This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 96eefca6d2f fix: elasticsearch / opensearch logging exception details
are missing in task log tab (#63739)
96eefca6d2f is described below
commit 96eefca6d2f59205df9b71f31e70cda7d73ffc0d
Author: Subham <[email protected]>
AuthorDate: Sun Mar 22 20:33:15 2026 +0530
fix: elasticsearch / opensearch logging exception details are missing in
task log tab (#63739)
---
.../src/airflow/example_dags/example_failed_dag.py | 39 ++++
.../test_remote_logging_elasticsearch.py | 42 ++++
.../providers/elasticsearch/log/es_task_handler.py | 57 ++++-
.../elasticsearch/log/test_es_remote_log_io.py | 33 ++-
.../unit/elasticsearch/log/test_es_task_handler.py | 256 +++++++++++++++++++++
.../providers/opensearch/log/os_task_handler.py | 54 ++++-
.../unit/opensearch/log/test_os_task_handler.py | 217 +++++++++++++++++
7 files changed, 686 insertions(+), 12 deletions(-)
diff --git a/airflow-core/src/airflow/example_dags/example_failed_dag.py
b/airflow-core/src/airflow/example_dags/example_failed_dag.py
new file mode 100644
index 00000000000..5b8cae60f27
--- /dev/null
+++ b/airflow-core/src/airflow/example_dags/example_failed_dag.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Example DAG demonstrating task failure to generate exception traces in
logs."""
+
+from __future__ import annotations
+
+import pendulum
+
+from airflow.sdk import DAG, task
+
+
@task
def fail_task():
    """A task that always fails to generate error_detail."""
    # Deliberate failure: the supervisor records the resulting traceback as
    # structured ``error_detail``, which the ES/OS log handlers must surface.
    raise RuntimeError("This is a test exception for stacktrace rendering")


# Single-run example DAG used by the remote-logging e2e tests; triggering it
# produces a failed task instance whose logs contain an exception trace.
with DAG(
    "example_failed_dag",
    schedule="@once",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,  # a backfill would add no value for a failure fixture
    tags=["example"],
) as dag:
    fail_task()
diff --git
a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/test_remote_logging_elasticsearch.py
b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/test_remote_logging_elasticsearch.py
index 5c07a04203a..c17f5109f80 100644
---
a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/test_remote_logging_elasticsearch.py
+++
b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/test_remote_logging_elasticsearch.py
@@ -101,3 +101,45 @@ class TestRemoteLoggingElasticsearch:
assert any(self.expected_message in event for event in events), (
f"Expected task logs to contain {self.expected_message!r}, got
events: {events}"
)
+
+ def test_remote_logging_elasticsearch_error_detail(self):
+ """Test that log error_detail is retrieved correctly from
Elasticsearch."""
+ dag_id = "example_failed_dag"
+ task_id = "fail_task"
+
+ self.airflow_client.un_pause_dag(dag_id)
+ resp = self.airflow_client.trigger_dag(
+ dag_id,
+ json={"logical_date": datetime.now(timezone.utc).isoformat()},
+ )
+ run_id = resp["dag_run_id"]
+ state = self.airflow_client.wait_for_dag_run(dag_id=dag_id,
run_id=run_id)
+
+ assert state == "failed"
+
+ # Logs might take some time to appear in ES
+ task_logs_content = []
+ for _ in range(self.max_retries):
+ task_logs_resp = self.airflow_client.get_task_logs(
+ dag_id=dag_id,
+ task_id=task_id,
+ run_id=run_id,
+ )
+ task_logs_content = task_logs_resp.get("content", [])
+ # Search for the log entry with error_detail
+ if any("error_detail" in item for item in task_logs_content if
isinstance(item, dict)):
+ break
+ time.sleep(self.retry_interval_in_seconds)
+
+ error_entries = [
+ item for item in task_logs_content if isinstance(item, dict) and
"error_detail" in item
+ ]
+ assert len(error_entries) > 0, (
+ f"Expected error_detail in logs, but none found. Logs:
{task_logs_content}"
+ )
+
+ error_detail = error_entries[0]["error_detail"]
+ assert isinstance(error_detail, list), f"Expected error_detail to be a
list, got {type(error_detail)}"
+ assert len(error_detail) > 0, "Expected error_detail to have at least
one exception"
+ assert error_detail[0]["exc_type"] == "RuntimeError"
+ assert "This is a test exception for stacktrace rendering" in
error_detail[0]["exc_value"]
diff --git
a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
index 60080658ae9..f063749e523 100644
---
a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
+++
b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -79,7 +79,56 @@ LOG_LINE_DEFAULTS = {"exc_text": "", "stack_info": ""}
# not exist, the task handler should use the log_id_template attribute instead.
USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")
-TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger"]
+TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger",
"error_detail", "message", "levelname"]
+
+
+def _format_error_detail(error_detail: Any) -> str | None:
+ """Render the structured ``error_detail`` written by the Airflow 3
supervisor as a traceback string."""
+ if not error_detail:
+ return None
+ if not isinstance(error_detail, list):
+ return str(error_detail)
+
+ lines: list[str] = ["Traceback (most recent call last):"]
+ for exc_info in error_detail:
+ if not isinstance(exc_info, dict):
+ lines.append(str(exc_info))
+ continue
+ if exc_info.get("is_cause"):
+ lines.append("\nThe above exception was the direct cause of the
following exception:\n")
+ lines.append("Traceback (most recent call last):")
+ for frame in exc_info.get("frames", []):
+ lines.append(
+ f' File "{frame.get("filename", "<unknown>")}", line
{frame.get("lineno", "?")}, in {frame.get("name", "<unknown>")}'
+ )
+ exc_type = exc_info.get("exc_type", "")
+ exc_value = exc_info.get("exc_value", "")
+ if exc_type:
+ lines.append(f"{exc_type}: {exc_value}" if exc_value else exc_type)
+ return "\n".join(lines)
+
+
def _build_log_fields(hit_dict: dict[str, Any]) -> dict[str, Any]:
    """Filter an ES hit to ``TASK_LOG_FIELDS`` and ensure compatibility with StructuredLogMessage."""
    fields = {
        key: value
        for key, value in hit_dict.items()
        if key == "@timestamp" or key.lower() in TASK_LOG_FIELDS
    }

    # Normalise alternative spellings onto the canonical field names without
    # clobbering an explicitly provided canonical field.
    for alias, canonical in (("@timestamp", "timestamp"), ("levelname", "level")):
        if alias in fields and canonical not in fields:
            fields[canonical] = fields.pop(alias)

    # Airflow 3's StructuredLogMessage requires an 'event' attribute; fall back
    # to the classic 'message' field (consumed, so it is not duplicated).
    if "event" not in fields:
        fields["event"] = fields.pop("message", "")

    # An empty error_detail carries no information - drop it entirely.
    if not fields.get("error_detail", True):
        del fields["error_detail"]
    return fields
+
VALID_ES_CONFIG_KEYS =
set(inspect.signature(elasticsearch.Elasticsearch.__init__).parameters.keys())
# Remove `self` from the valid set of kwargs
@@ -356,9 +405,7 @@ class ElasticsearchTaskHandler(FileTaskHandler,
ExternalLoggingMixin, LoggingMix
# Flatten all hits, filter to only desired fields, and
construct StructuredLogMessage objects
message = header + [
- StructuredLogMessage(
- **{k: v for k, v in hit.to_dict().items() if k.lower()
in TASK_LOG_FIELDS}
- )
+ StructuredLogMessage(**_build_log_fields(hit.to_dict()))
for hits in logs_by_host.values()
for hit in hits
]
@@ -668,7 +715,7 @@ class ElasticsearchRemoteLogIO(LoggingMixin): # noqa: D101
# Structured log messages
for hits in logs_by_host.values():
for hit in hits:
- filtered = {k: v for k, v in hit.to_dict().items() if
k.lower() in TASK_LOG_FIELDS}
+ filtered = _build_log_fields(hit.to_dict())
message.append(json.dumps(filtered))
return header, message
diff --git
a/providers/elasticsearch/tests/integration/elasticsearch/log/test_es_remote_log_io.py
b/providers/elasticsearch/tests/integration/elasticsearch/log/test_es_remote_log_io.py
index aa8e91f812d..89a12a3a39b 100644
---
a/providers/elasticsearch/tests/integration/elasticsearch/log/test_es_remote_log_io.py
+++
b/providers/elasticsearch/tests/integration/elasticsearch/log/test_es_remote_log_io.py
@@ -24,7 +24,7 @@ from unittest.mock import patch
import elasticsearch
import pytest
-from airflow.providers.elasticsearch.log.es_task_handler import
ElasticsearchRemoteLogIO
+from airflow.providers.elasticsearch.log.es_task_handler import
ElasticsearchRemoteLogIO, _render_log_id
# The ES service hostname as defined in
scripts/ci/docker-compose/integration-elasticsearch.yml
ES_HOST = "http://elasticsearch:9200"
@@ -101,8 +101,8 @@ class TestElasticsearchRemoteLogIOIntegration:
expected_messages = ["start", "processing", "end"]
for expected, log_message in zip(expected_messages, log_messages):
log_entry = json.loads(log_message)
- assert "message" in log_entry
- assert log_entry["message"] == expected
+ assert "event" in log_entry
+ assert log_entry["event"] == expected
def test_read_missing_log(self, ti):
"""Verify that a missing log returns the expected error message.
@@ -118,3 +118,30 @@ class TestElasticsearchRemoteLogIOIntegration:
assert log_source_info == []
assert len(log_messages) == 1
assert "not found in Elasticsearch" in log_messages[0]
+
+ def test_read_error_detail_integration(self, ti):
+ """Verify that error_detail is correctly retrieved and formatted in
integration tests."""
+ # Manually index a log entry with error_detail
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [{"filename": "/opt/airflow/dags/fail.py", "lineno":
13, "name": "log_and_raise"}],
+ "exc_type": "RuntimeError",
+ "exc_value": "Woopsie. Something went wrong.",
+ }
+ ]
+ body = {
+ "event": "Task failed with exception",
+ "log_id": _render_log_id(self.elasticsearch_io.log_id_template,
ti, ti.try_number),
+ "offset": 1,
+ "error_detail": error_detail,
+ }
+ self.elasticsearch_io.client.index(index=self.target_index, body=body)
+ self.elasticsearch_io.client.indices.refresh(index=self.target_index)
+
+ log_source_info, log_messages = self.elasticsearch_io.read("", ti)
+
+ assert len(log_messages) == 1
+ log_entry = json.loads(log_messages[0])
+ assert "error_detail" in log_entry
+ assert log_entry["error_detail"] == error_detail
diff --git
a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
index 6698f27efa3..8d6a0ba616c 100644
---
a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
+++
b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
@@ -1060,3 +1060,259 @@ class TestElasticsearchRemoteLogIO:
assert log_source_info == []
assert f"*** Log {log_id} not found in Elasticsearch" in
log_messages[0]
mocked_count.assert_called_once()
+
+ def test_read_error_detail(self, ti):
+ """Verify that error_detail is correctly retrieved and formatted."""
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [{"filename": "/opt/airflow/dags/fail.py", "lineno":
13, "name": "log_and_raise"}],
+ "exc_type": "RuntimeError",
+ "exc_value": "Woopsie. Something went wrong.",
+ }
+ ]
+ body = {
+ "event": "Task failed with exception",
+ "log_id": _render_log_id(self.elasticsearch_io.log_id_template,
ti, ti.try_number),
+ "offset": 1,
+ "error_detail": error_detail,
+ }
+
+ from airflow.providers.elasticsearch.log.es_response import Hit
+
+ mock_hit = Hit({"_source": body})
+ with (
+ patch.object(self.elasticsearch_io, "_es_read") as mock_es_read,
+ patch.object(
+ self.elasticsearch_io,
+ "_group_logs_by_host",
+ return_value={"http://localhost:9200": [mock_hit]},
+ ),
+ ):
+ mock_es_read.return_value = mock.MagicMock()
+ mock_es_read.return_value.hits = [mock_hit]
+
+ log_source_info, log_messages = self.elasticsearch_io.read("", ti)
+
+ assert len(log_messages) == 1
+ log_entry = json.loads(log_messages[0])
+ assert "error_detail" in log_entry
+ assert log_entry["error_detail"] == error_detail
+
+
+# ---------------------------------------------------------------------------
+# Tests for the error_detail helpers (issue #63736)
+# ---------------------------------------------------------------------------
+
+
+class TestFormatErrorDetail:
+ """Unit tests for _format_error_detail."""
+
+ def test_returns_none_for_empty(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
+
+ assert _format_error_detail(None) is None
+ assert _format_error_detail([]) is None
+
+ def test_returns_string_for_non_list(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
+
+ assert _format_error_detail("raw string") == "raw string"
+
+ def test_formats_single_exception(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
+
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [
+ {"filename": "/app/task.py", "lineno": 13, "name":
"log_and_raise"},
+ ],
+ "exc_type": "RuntimeError",
+ "exc_value": "Something went wrong.",
+ "exceptions": [],
+ "is_group": False,
+ }
+ ]
+ result = _format_error_detail(error_detail)
+ assert result is not None
+ assert "Traceback (most recent call last):" in result
+ assert 'File "/app/task.py", line 13, in log_and_raise' in result
+ assert "RuntimeError: Something went wrong." in result
+
+ def test_formats_chained_exceptions(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
+
+ error_detail = [
+ {
+ "is_cause": True,
+ "frames": [{"filename": "/a.py", "lineno": 1, "name": "foo"}],
+ "exc_type": "ValueError",
+ "exc_value": "original",
+ "exceptions": [],
+ },
+ {
+ "is_cause": False,
+ "frames": [{"filename": "/b.py", "lineno": 2, "name": "bar"}],
+ "exc_type": "RuntimeError",
+ "exc_value": "wrapped",
+ "exceptions": [],
+ },
+ ]
+ result = _format_error_detail(error_detail)
+ assert result is not None
+ assert "direct cause" in result
+ assert "ValueError: original" in result
+ assert "RuntimeError: wrapped" in result
+
+ def test_exc_type_without_value(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
+
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [],
+ "exc_type": "StopIteration",
+ "exc_value": "",
+ }
+ ]
+ result = _format_error_detail(error_detail)
+ assert result is not None
+ assert result.endswith("StopIteration")
+
+ def test_non_dict_items_are_stringified(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
+
+ result = _format_error_detail(["unexpected string item"])
+ assert result is not None
+ assert "unexpected string item" in result
+
+
+class TestBuildStructuredLogFields:
+ """Unit tests for _build_log_fields."""
+
+ def test_filters_to_allowed_fields(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
+
+ hit = {"event": "hello", "level": "info", "unknown_field": "should be
dropped"}
+ result = _build_log_fields(hit)
+ assert "event" in result
+ assert "level" in result
+ assert "unknown_field" not in result
+
+ def test_message_mapped_to_event(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
+
+ hit = {"message": "plain message", "timestamp": "2024-01-01T00:00:00Z"}
+ fields = _build_log_fields(hit)
+ assert fields["event"] == "plain message"
+ assert "message" not in fields # Ensure it is popped if used as event
+
+ def test_message_preserved_if_event_exists(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
+
+ hit = {"event": "structured event", "message": "plain message"}
+ fields = _build_log_fields(hit)
+ assert fields["event"] == "structured event"
+ # message is preserved if it's in TASK_LOG_FIELDS and doesn't collide
with event
+ assert fields["message"] == "plain message"
+
+ def test_levelname_mapped_to_level(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
+
+ hit = {"event": "msg", "levelname": "ERROR"}
+ result = _build_log_fields(hit)
+ assert result["level"] == "ERROR"
+ assert "levelname" not in result
+
+ def test_at_timestamp_mapped_to_timestamp(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
+
+ hit = {"event": "msg", "@timestamp": "2024-01-01T00:00:00Z"}
+ result = _build_log_fields(hit)
+ assert result["timestamp"] == "2024-01-01T00:00:00Z"
+ assert "@timestamp" not in result
+
+ def test_error_detail_is_kept_as_list(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
+
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [{"filename": "/dag.py", "lineno": 10, "name":
"run"}],
+ "exc_type": "RuntimeError",
+ "exc_value": "Woopsie.",
+ }
+ ]
+ hit = {
+ "event": "Task failed with exception",
+ "error_detail": error_detail,
+ }
+ result = _build_log_fields(hit)
+ assert result["error_detail"] == error_detail
+
+ def test_error_detail_dropped_when_empty(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
+
+ hit = {"event": "msg", "error_detail": []}
+ result = _build_log_fields(hit)
+ assert "error_detail" not in result
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="StructuredLogMessage
only exists in Airflow 3+")
+ @elasticmock
+ def test_read_includes_error_detail_in_structured_message(self):
+ """End-to-end: a hit with error_detail should surface it in the
returned StructuredLogMessage."""
+ from airflow.providers.elasticsearch.log.es_task_handler import
ElasticsearchTaskHandler
+
+ local_log_location = "local/log/location"
+ handler = ElasticsearchTaskHandler(
+ base_log_folder=local_log_location,
+ end_of_log_mark="end_of_log\n",
+ write_stdout=False,
+ json_format=False,
+ json_fields="asctime,filename,lineno,levelname,message,exc_text",
+ )
+
+ es = elasticsearch.Elasticsearch("http://localhost:9200")
+ log_id = "test_dag-test_task-test_run--1-1"
+ body = {
+ "event": "Task failed with exception",
+ "log_id": log_id,
+ "offset": 1,
+ "error_detail": [
+ {
+ "is_cause": False,
+ "frames": [
+ {"filename": "/opt/airflow/dags/fail.py", "lineno":
13, "name": "log_and_raise"}
+ ],
+ "exc_type": "RuntimeError",
+ "exc_value": "Woopsie. Something went wrong.",
+ }
+ ],
+ }
+ es.index(index="test_index", doc_type="log", body=body, id=1)
+
+ # Patch the IO layer to return our fake document
+ mock_hit_dict = body.copy()
+
+ from airflow.providers.elasticsearch.log.es_response import
ElasticSearchResponse, Hit
+
+ mock_hit = Hit({"_source": mock_hit_dict})
+ mock_response = mock.MagicMock(spec=ElasticSearchResponse)
+ mock_response.hits = [mock_hit]
+ mock_response.__iter__ = mock.Mock(return_value=iter([mock_hit]))
+ mock_response.__bool__ = mock.Mock(return_value=True)
+ mock_response.__getitem__ = mock.Mock(return_value=mock_hit)
+
+ with mock.patch.object(handler.io, "_es_read",
return_value=mock_response):
+ with mock.patch.object(handler.io, "_group_logs_by_host",
return_value={"localhost": [mock_hit]}):
+ # Build StructuredLogMessages
+ from airflow.providers.elasticsearch.log.es_task_handler
import _build_log_fields
+ from airflow.utils.log.file_task_handler import
StructuredLogMessage
+
+ fields = _build_log_fields(mock_hit.to_dict())
+ msg = StructuredLogMessage(**fields)
+
+ assert msg.event == "Task failed with exception"
+ assert hasattr(msg, "error_detail")
+ assert msg.error_detail == body["error_detail"]
diff --git
a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
index c76980e5106..05f0ff90cbf 100644
---
a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
+++
b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
@@ -58,7 +58,55 @@ else:
USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")
LOG_LINE_DEFAULTS = {"exc_text": "", "stack_info": ""}
-TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger"]
+TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger",
"error_detail", "message", "levelname"]
+
+
+def _format_error_detail(error_detail: Any) -> str | None:
+ """Render the structured ``error_detail`` written by the Airflow 3
supervisor as a traceback string."""
+ if not error_detail:
+ return None
+ if not isinstance(error_detail, list):
+ return str(error_detail)
+
+ lines: list[str] = ["Traceback (most recent call last):"]
+ for exc_info in error_detail:
+ if not isinstance(exc_info, dict):
+ lines.append(str(exc_info))
+ continue
+ if exc_info.get("is_cause"):
+ lines.append("\nThe above exception was the direct cause of the
following exception:\n")
+ lines.append("Traceback (most recent call last):")
+ for frame in exc_info.get("frames", []):
+ lines.append(
+ f' File "{frame.get("filename", "<unknown>")}", line
{frame.get("lineno", "?")}, in {frame.get("name", "<unknown>")}'
+ )
+ exc_type = exc_info.get("exc_type", "")
+ exc_value = exc_info.get("exc_value", "")
+ if exc_type:
+ lines.append(f"{exc_type}: {exc_value}" if exc_value else exc_type)
+ return "\n".join(lines)
+
+
def _build_log_fields(hit_dict: dict[str, Any]) -> dict[str, Any]:
    """Filter an OpenSearch hit to ``TASK_LOG_FIELDS`` and ensure compatibility with StructuredLogMessage."""
    fields = {
        key: value
        for key, value in hit_dict.items()
        if key == "@timestamp" or key.lower() in TASK_LOG_FIELDS
    }

    # Normalise alternative spellings onto the canonical field names without
    # clobbering an explicitly provided canonical field.
    for alias, canonical in (("@timestamp", "timestamp"), ("levelname", "level")):
        if alias in fields and canonical not in fields:
            fields[canonical] = fields.pop(alias)

    # Airflow 3's StructuredLogMessage requires an 'event' attribute; fall back
    # to the classic 'message' field (consumed, so it is not duplicated).
    if "event" not in fields:
        fields["event"] = fields.pop("message", "")

    # An empty error_detail carries no information - drop it entirely.
    if not fields.get("error_detail", True):
        del fields["error_detail"]
    return fields
def getattr_nested(obj, item, default):
@@ -416,9 +464,7 @@ class OpensearchTaskHandler(FileTaskHandler,
ExternalLoggingMixin, LoggingMixin)
# Flatten all hits, filter to only desired fields, and
construct StructuredLogMessage objects
message = header + [
- StructuredLogMessage(
- **{k: v for k, v in hit.to_dict().items() if k.lower()
in TASK_LOG_FIELDS}
- )
+ StructuredLogMessage(**_build_log_fields(hit.to_dict()))
for hits in logs_by_host.values()
for hit in hits
]
diff --git
a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py
b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py
index 4dc46c1d89b..15aba25ae8b 100644
--- a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py
+++ b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py
@@ -568,3 +568,220 @@ def test_retrieve_config_keys():
# http_compress comes from config value
assert "http_compress" in args_from_config
assert "self" not in args_from_config
+
+
+# ---------------------------------------------------------------------------
+# Tests for the error_detail helpers (issue #63736)
+# ---------------------------------------------------------------------------
+
+
+class TestFormatErrorDetail:
+ """Unit tests for _format_error_detail."""
+
+ def test_returns_none_for_empty(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_format_error_detail
+
+ assert _format_error_detail(None) is None
+ assert _format_error_detail([]) is None
+
+ def test_returns_string_for_non_list(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_format_error_detail
+
+ assert _format_error_detail("raw string") == "raw string"
+
+ def test_formats_single_exception(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_format_error_detail
+
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [
+ {"filename": "/app/task.py", "lineno": 13, "name":
"log_and_raise"},
+ ],
+ "exc_type": "RuntimeError",
+ "exc_value": "Something went wrong.",
+ "exceptions": [],
+ "is_group": False,
+ }
+ ]
+ result = _format_error_detail(error_detail)
+ assert result is not None
+ assert "Traceback (most recent call last):" in result
+ assert 'File "/app/task.py", line 13, in log_and_raise' in result
+ assert "RuntimeError: Something went wrong." in result
+
+ def test_formats_chained_exceptions(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_format_error_detail
+
+ error_detail = [
+ {
+ "is_cause": True,
+ "frames": [{"filename": "/a.py", "lineno": 1, "name": "foo"}],
+ "exc_type": "ValueError",
+ "exc_value": "original",
+ "exceptions": [],
+ },
+ {
+ "is_cause": False,
+ "frames": [{"filename": "/b.py", "lineno": 2, "name": "bar"}],
+ "exc_type": "RuntimeError",
+ "exc_value": "wrapped",
+ "exceptions": [],
+ },
+ ]
+ result = _format_error_detail(error_detail)
+ assert result is not None
+ assert "direct cause" in result
+ assert "ValueError: original" in result
+ assert "RuntimeError: wrapped" in result
+
+ def test_exc_type_without_value(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_format_error_detail
+
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [],
+ "exc_type": "StopIteration",
+ "exc_value": "",
+ }
+ ]
+ result = _format_error_detail(error_detail)
+ assert result is not None
+ assert result.endswith("StopIteration")
+
+ def test_non_dict_items_are_stringified(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_format_error_detail
+
+ result = _format_error_detail(["unexpected string item"])
+ assert result is not None
+ assert "unexpected string item" in result
+
+
+class TestBuildLogFields:
+ """Unit tests for _build_log_fields."""
+
+ def test_filters_to_allowed_fields(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+
+ hit = {"event": "hello", "level": "info", "unknown_field": "should be
dropped"}
+ result = _build_log_fields(hit)
+ assert "event" in result
+ assert "level" in result
+ assert "unknown_field" not in result
+
+ def test_message_mapped_to_event(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+
+ hit = {"message": "plain message", "timestamp": "2024-01-01T00:00:00Z"}
+ fields = _build_log_fields(hit)
+ assert fields["event"] == "plain message"
+ assert "message" not in fields # Ensure it is popped if used as event
+
+ def test_message_preserved_if_event_exists(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+
+ hit = {"event": "structured event", "message": "plain message"}
+ fields = _build_log_fields(hit)
+ assert fields["event"] == "structured event"
+ # message is preserved if it's in TASK_LOG_FIELDS and doesn't collide
with event
+ assert fields["message"] == "plain message"
+
+ def test_levelname_mapped_to_level(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+
+ hit = {"event": "msg", "levelname": "ERROR"}
+ result = _build_log_fields(hit)
+ assert result["level"] == "ERROR"
+ assert "levelname" not in result
+
+ def test_at_timestamp_mapped_to_timestamp(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+
+ hit = {"event": "msg", "@timestamp": "2024-01-01T00:00:00Z"}
+ result = _build_log_fields(hit)
+ assert result["timestamp"] == "2024-01-01T00:00:00Z"
+ assert "@timestamp" not in result
+
+ def test_error_detail_is_kept_as_list(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [{"filename": "/dag.py", "lineno": 10, "name":
"run"}],
+ "exc_type": "RuntimeError",
+ "exc_value": "Woopsie.",
+ }
+ ]
+ hit = {
+ "event": "Task failed with exception",
+ "error_detail": error_detail,
+ }
+ result = _build_log_fields(hit)
+ assert result["error_detail"] == error_detail
+
+ def test_error_detail_dropped_when_empty(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+
+ hit = {"event": "msg", "error_detail": []}
+ result = _build_log_fields(hit)
+ assert "error_detail" not in result
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="StructuredLogMessage
only exists in Airflow 3+")
+ def test_read_includes_error_detail_in_structured_message(self):
+ """End-to-end: a hit with error_detail should surface it in the
returned StructuredLogMessage."""
+ from airflow.providers.opensearch.log.os_task_handler import
OpensearchTaskHandler
+
+ local_log_location = "local/log/location"
+ handler = OpensearchTaskHandler(
+ base_log_folder=local_log_location,
+ end_of_log_mark="end_of_log\n",
+ write_stdout=False,
+ json_format=False,
+ json_fields="asctime,filename,lineno,levelname,message,exc_text",
+ host="localhost",
+ port=9200,
+ username="admin",
+ password="password",
+ )
+
+ log_id = "test_dag-test_task-test_run--1-1"
+ body = {
+ "event": "Task failed with exception",
+ "log_id": log_id,
+ "offset": 1,
+ "error_detail": [
+ {
+ "is_cause": False,
+ "frames": [
+ {"filename": "/opt/airflow/dags/fail.py", "lineno":
13, "name": "log_and_raise"}
+ ],
+ "exc_type": "RuntimeError",
+ "exc_value": "Woopsie. Something went wrong.",
+ }
+ ],
+ }
+
+ # Instead of firing up an OpenSearch client, we patch the IO and
response class
+ mock_hit_dict = body.copy()
+ from airflow.providers.opensearch.log.os_response import Hit,
OpensearchResponse
+
+ mock_hit = Hit({"_source": mock_hit_dict})
+ mock_response = mock.MagicMock(spec=OpensearchResponse)
+ mock_response.hits = [mock_hit]
+ mock_response.__iter__ = mock.Mock(return_value=iter([mock_hit]))
+ mock_response.__bool__ = mock.Mock(return_value=True)
+ mock_response.__getitem__ = mock.Mock(return_value=mock_hit)
+
+ with mock.patch.object(handler, "_os_read",
return_value=mock_response):
+ with mock.patch.object(handler, "_group_logs_by_host",
return_value={"localhost": [mock_hit]}):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+ from airflow.utils.log.file_task_handler import
StructuredLogMessage
+
+ fields = _build_log_fields(mock_hit.to_dict())
+ msg = StructuredLogMessage(**fields)
+
+ assert msg.event == "Task failed with exception"
+ assert hasattr(msg, "error_detail")
+ assert msg.error_detail == body["error_detail"]