Copilot commented on code in PR #63739:
URL: https://github.com/apache/airflow/pull/63739#discussion_r2994964944
##########
providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py:
##########
@@ -1060,3 +1060,259 @@ def test_read_with_missing_log(self, mocked_count, ti):
assert log_source_info == []
assert f"*** Log {log_id} not found in Elasticsearch" in
log_messages[0]
mocked_count.assert_called_once()
+
+ def test_read_error_detail(self, ti):
+ """Verify that error_detail is correctly retrieved and formatted."""
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [{"filename": "/opt/airflow/dags/fail.py", "lineno":
13, "name": "log_and_raise"}],
+ "exc_type": "RuntimeError",
+ "exc_value": "Woopsie. Something went wrong.",
+ }
+ ]
+ body = {
+ "event": "Task failed with exception",
+ "log_id": _render_log_id(self.elasticsearch_io.log_id_template,
ti, ti.try_number),
+ "offset": 1,
+ "error_detail": error_detail,
+ }
+
+ from airflow.providers.elasticsearch.log.es_response import Hit
+
+ mock_hit = Hit({"_source": body})
+ with (
+ patch.object(self.elasticsearch_io, "_es_read") as mock_es_read,
+ patch.object(
+ self.elasticsearch_io,
+ "_group_logs_by_host",
+ return_value={"http://localhost:9200": [mock_hit]},
+ ),
+ ):
+ mock_es_read.return_value = mock.MagicMock()
+ mock_es_read.return_value.hits = [mock_hit]
+
+ log_source_info, log_messages = self.elasticsearch_io.read("", ti)
+
+ assert len(log_messages) == 1
+ log_entry = json.loads(log_messages[0])
+ assert "error_detail" in log_entry
+ assert log_entry["error_detail"] == error_detail
+
+
+# ---------------------------------------------------------------------------
+# Tests for the error_detail helpers (issue #63736)
+# ---------------------------------------------------------------------------
+
+
+class TestFormatErrorDetail:
+ """Unit tests for _format_error_detail."""
+
+ def test_returns_none_for_empty(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
+
+ assert _format_error_detail(None) is None
+ assert _format_error_detail([]) is None
+
+ def test_returns_string_for_non_list(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
+
+ assert _format_error_detail("raw string") == "raw string"
+
+ def test_formats_single_exception(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
+
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [
+ {"filename": "/app/task.py", "lineno": 13, "name":
"log_and_raise"},
+ ],
+ "exc_type": "RuntimeError",
+ "exc_value": "Something went wrong.",
+ "exceptions": [],
+ "is_group": False,
+ }
+ ]
+ result = _format_error_detail(error_detail)
+ assert result is not None
+ assert "Traceback (most recent call last):" in result
+ assert 'File "/app/task.py", line 13, in log_and_raise' in result
+ assert "RuntimeError: Something went wrong." in result
+
+ def test_formats_chained_exceptions(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
+
+ error_detail = [
+ {
+ "is_cause": True,
+ "frames": [{"filename": "/a.py", "lineno": 1, "name": "foo"}],
+ "exc_type": "ValueError",
+ "exc_value": "original",
+ "exceptions": [],
+ },
+ {
+ "is_cause": False,
+ "frames": [{"filename": "/b.py", "lineno": 2, "name": "bar"}],
+ "exc_type": "RuntimeError",
+ "exc_value": "wrapped",
+ "exceptions": [],
+ },
+ ]
+ result = _format_error_detail(error_detail)
+ assert result is not None
+ assert "direct cause" in result
+ assert "ValueError: original" in result
+ assert "RuntimeError: wrapped" in result
+
+ def test_exc_type_without_value(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
+
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [],
+ "exc_type": "StopIteration",
+ "exc_value": "",
+ }
+ ]
+ result = _format_error_detail(error_detail)
+ assert result is not None
+ assert result.endswith("StopIteration")
+
+ def test_non_dict_items_are_stringified(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
+
+ result = _format_error_detail(["unexpected string item"])
+ assert result is not None
+ assert "unexpected string item" in result
+
+
+class TestBuildStructuredLogFields:
+ """Unit tests for _build_log_fields."""
+
+ def test_filters_to_allowed_fields(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
+
+ hit = {"event": "hello", "level": "info", "unknown_field": "should be
dropped"}
+ result = _build_log_fields(hit)
+ assert "event" in result
+ assert "level" in result
+ assert "unknown_field" not in result
+
+ def test_message_mapped_to_event(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
+
+ hit = {"message": "plain message", "timestamp": "2024-01-01T00:00:00Z"}
+ fields = _build_log_fields(hit)
+ assert fields["event"] == "plain message"
+ assert "message" not in fields # Ensure it is popped if used as event
+
+ def test_message_preserved_if_event_exists(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
+
+ hit = {"event": "structured event", "message": "plain message"}
+ fields = _build_log_fields(hit)
+ assert fields["event"] == "structured event"
+ # message is preserved if it's in TASK_LOG_FIELDS and doesn't collide
with event
+ assert fields["message"] == "plain message"
+
+ def test_levelname_mapped_to_level(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
+
+ hit = {"event": "msg", "levelname": "ERROR"}
+ result = _build_log_fields(hit)
+ assert result["level"] == "ERROR"
+ assert "levelname" not in result
+
+ def test_at_timestamp_mapped_to_timestamp(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
+
+ hit = {"event": "msg", "@timestamp": "2024-01-01T00:00:00Z"}
+ result = _build_log_fields(hit)
+ assert result["timestamp"] == "2024-01-01T00:00:00Z"
+ assert "@timestamp" not in result
+
+ def test_error_detail_is_kept_as_list(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
+
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [{"filename": "/dag.py", "lineno": 10, "name":
"run"}],
+ "exc_type": "RuntimeError",
+ "exc_value": "Woopsie.",
+ }
+ ]
+ hit = {
+ "event": "Task failed with exception",
+ "error_detail": error_detail,
+ }
+ result = _build_log_fields(hit)
+ assert result["error_detail"] == error_detail
+
+ def test_error_detail_dropped_when_empty(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
+
+ hit = {"event": "msg", "error_detail": []}
+ result = _build_log_fields(hit)
+ assert "error_detail" not in result
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="StructuredLogMessage
only exists in Airflow 3+")
+ @elasticmock
+ def test_read_includes_error_detail_in_structured_message(self):
+ """End-to-end: a hit with error_detail should surface it in the
returned StructuredLogMessage."""
+ from airflow.providers.elasticsearch.log.es_task_handler import
ElasticsearchTaskHandler
+
Review Comment:
This test is labeled “end-to-end”, patches the IO layer, and even indexes a
document, but it never calls the handler’s read path; it only constructs a
`StructuredLogMessage` from `_build_log_fields`. Either exercise
`handler._read(...)`/`handler.read(...)` (so the test actually guards the
regression) or rename it to a pure unit test of
`_build_log_fields`/`StructuredLogMessage` construction.
##########
providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py:
##########
@@ -568,3 +568,220 @@ def test_retrieve_config_keys():
# http_compress comes from config value
assert "http_compress" in args_from_config
assert "self" not in args_from_config
+
+
+# ---------------------------------------------------------------------------
+# Tests for the error_detail helpers (issue #63736)
+# ---------------------------------------------------------------------------
+
+
+class TestFormatErrorDetail:
+ """Unit tests for _format_error_detail."""
+
+ def test_returns_none_for_empty(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_format_error_detail
+
+ assert _format_error_detail(None) is None
+ assert _format_error_detail([]) is None
+
+ def test_returns_string_for_non_list(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_format_error_detail
+
+ assert _format_error_detail("raw string") == "raw string"
+
+ def test_formats_single_exception(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_format_error_detail
+
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [
+ {"filename": "/app/task.py", "lineno": 13, "name":
"log_and_raise"},
+ ],
+ "exc_type": "RuntimeError",
+ "exc_value": "Something went wrong.",
+ "exceptions": [],
+ "is_group": False,
+ }
+ ]
+ result = _format_error_detail(error_detail)
+ assert result is not None
+ assert "Traceback (most recent call last):" in result
+ assert 'File "/app/task.py", line 13, in log_and_raise' in result
+ assert "RuntimeError: Something went wrong." in result
+
+ def test_formats_chained_exceptions(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_format_error_detail
+
+ error_detail = [
+ {
+ "is_cause": True,
+ "frames": [{"filename": "/a.py", "lineno": 1, "name": "foo"}],
+ "exc_type": "ValueError",
+ "exc_value": "original",
+ "exceptions": [],
+ },
+ {
+ "is_cause": False,
+ "frames": [{"filename": "/b.py", "lineno": 2, "name": "bar"}],
+ "exc_type": "RuntimeError",
+ "exc_value": "wrapped",
+ "exceptions": [],
+ },
+ ]
+ result = _format_error_detail(error_detail)
+ assert result is not None
+ assert "direct cause" in result
+ assert "ValueError: original" in result
+ assert "RuntimeError: wrapped" in result
+
+ def test_exc_type_without_value(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_format_error_detail
+
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [],
+ "exc_type": "StopIteration",
+ "exc_value": "",
+ }
+ ]
+ result = _format_error_detail(error_detail)
+ assert result is not None
+ assert result.endswith("StopIteration")
+
+ def test_non_dict_items_are_stringified(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_format_error_detail
+
+ result = _format_error_detail(["unexpected string item"])
+ assert result is not None
+ assert "unexpected string item" in result
+
+
+class TestBuildLogFields:
+ """Unit tests for _build_log_fields."""
+
+ def test_filters_to_allowed_fields(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+
+ hit = {"event": "hello", "level": "info", "unknown_field": "should be
dropped"}
+ result = _build_log_fields(hit)
+ assert "event" in result
+ assert "level" in result
+ assert "unknown_field" not in result
+
+ def test_message_mapped_to_event(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+
+ hit = {"message": "plain message", "timestamp": "2024-01-01T00:00:00Z"}
+ fields = _build_log_fields(hit)
+ assert fields["event"] == "plain message"
+ assert "message" not in fields # Ensure it is popped if used as event
+
+ def test_message_preserved_if_event_exists(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+
+ hit = {"event": "structured event", "message": "plain message"}
+ fields = _build_log_fields(hit)
+ assert fields["event"] == "structured event"
+ # message is preserved if it's in TASK_LOG_FIELDS and doesn't collide
with event
+ assert fields["message"] == "plain message"
+
+ def test_levelname_mapped_to_level(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+
+ hit = {"event": "msg", "levelname": "ERROR"}
+ result = _build_log_fields(hit)
+ assert result["level"] == "ERROR"
+ assert "levelname" not in result
+
+ def test_at_timestamp_mapped_to_timestamp(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+
+ hit = {"event": "msg", "@timestamp": "2024-01-01T00:00:00Z"}
+ result = _build_log_fields(hit)
+ assert result["timestamp"] == "2024-01-01T00:00:00Z"
+ assert "@timestamp" not in result
+
+ def test_error_detail_is_kept_as_list(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [{"filename": "/dag.py", "lineno": 10, "name":
"run"}],
+ "exc_type": "RuntimeError",
+ "exc_value": "Woopsie.",
+ }
+ ]
+ hit = {
+ "event": "Task failed with exception",
+ "error_detail": error_detail,
+ }
+ result = _build_log_fields(hit)
+ assert result["error_detail"] == error_detail
+
+ def test_error_detail_dropped_when_empty(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+
+ hit = {"event": "msg", "error_detail": []}
+ result = _build_log_fields(hit)
+ assert "error_detail" not in result
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="StructuredLogMessage
only exists in Airflow 3+")
+ def test_read_includes_error_detail_in_structured_message(self):
+ """End-to-end: a hit with error_detail should surface it in the
returned StructuredLogMessage."""
+ from airflow.providers.opensearch.log.os_task_handler import
OpensearchTaskHandler
+
+ local_log_location = "local/log/location"
+ handler = OpensearchTaskHandler(
+ base_log_folder=local_log_location,
+ end_of_log_mark="end_of_log\n",
+ write_stdout=False,
+ json_format=False,
+ json_fields="asctime,filename,lineno,levelname,message,exc_text",
+ host="localhost",
+ port=9200,
+ username="admin",
+ password="password",
+ )
+
+ log_id = "test_dag-test_task-test_run--1-1"
+ body = {
+ "event": "Task failed with exception",
+ "log_id": log_id,
+ "offset": 1,
+ "error_detail": [
+ {
+ "is_cause": False,
+ "frames": [
+ {"filename": "/opt/airflow/dags/fail.py", "lineno":
13, "name": "log_and_raise"}
+ ],
+ "exc_type": "RuntimeError",
+ "exc_value": "Woopsie. Something went wrong.",
+ }
+ ],
+ }
+
+ # Instead of firing up an OpenSearch client, we patch the IO and
response class
+ mock_hit_dict = body.copy()
+ from airflow.providers.opensearch.log.os_response import Hit,
OpensearchResponse
+
+ mock_hit = Hit({"_source": mock_hit_dict})
+ mock_response = mock.MagicMock(spec=OpensearchResponse)
+ mock_response.hits = [mock_hit]
+ mock_response.__iter__ = mock.Mock(return_value=iter([mock_hit]))
+ mock_response.__bool__ = mock.Mock(return_value=True)
+ mock_response.__getitem__ = mock.Mock(return_value=mock_hit)
Review Comment:
The test creates several `mock.Mock()` instances without a spec (e.g., for
`__iter__`, `__bool__`, `__getitem__`). Unspec'd mocks can silently accept
incorrect attribute usage and make the test less meaningful; prefer using
`MagicMock(spec=...)`/`create_autospec(...)` or configuring these behaviors on
the existing `MagicMock(spec=OpensearchResponse)` instance without introducing
unspec'd mocks.
```suggestion
mock_response.__iter__.return_value = iter([mock_hit])
mock_response.__bool__.return_value = True
mock_response.__getitem__.return_value = mock_hit
```
##########
airflow-core/src/airflow/example_dags/example_failed_dag.py:
##########
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Example DAG demonstrating task failure to generate exception traces in
logs."""
+
+from __future__ import annotations
+
+import pendulum
+
+from airflow.sdk import DAG, task
+
+
+@task
+def fail_task():
+ """A task that always fails to generate error_detail."""
+ raise RuntimeError("This is a test exception for stacktrace rendering")
+
+
+with DAG(
+ "example_failed_dag",
+ schedule="@once",
Review Comment:
This example DAG is designed to always fail, but it is scheduled as `@once`.
In environments where example DAGs are enabled, unpausing it can immediately
create a scheduled failing run and add noise. Consider making it manual-only
(e.g., `schedule=None`) and relying on the test to trigger it explicitly, or
otherwise ensuring it won’t auto-run by default.
```suggestion
schedule=None,
```
##########
providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py:
##########
@@ -1060,3 +1060,259 @@ def test_read_with_missing_log(self, mocked_count, ti):
assert log_source_info == []
assert f"*** Log {log_id} not found in Elasticsearch" in
log_messages[0]
mocked_count.assert_called_once()
+
+ def test_read_error_detail(self, ti):
+ """Verify that error_detail is correctly retrieved and formatted."""
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [{"filename": "/opt/airflow/dags/fail.py", "lineno":
13, "name": "log_and_raise"}],
+ "exc_type": "RuntimeError",
+ "exc_value": "Woopsie. Something went wrong.",
+ }
+ ]
+ body = {
+ "event": "Task failed with exception",
+ "log_id": _render_log_id(self.elasticsearch_io.log_id_template,
ti, ti.try_number),
+ "offset": 1,
+ "error_detail": error_detail,
+ }
+
+ from airflow.providers.elasticsearch.log.es_response import Hit
+
+ mock_hit = Hit({"_source": body})
+ with (
+ patch.object(self.elasticsearch_io, "_es_read") as mock_es_read,
+ patch.object(
+ self.elasticsearch_io,
+ "_group_logs_by_host",
+ return_value={"http://localhost:9200": [mock_hit]},
+ ),
+ ):
+ mock_es_read.return_value = mock.MagicMock()
+ mock_es_read.return_value.hits = [mock_hit]
+
+ log_source_info, log_messages = self.elasticsearch_io.read("", ti)
+
+ assert len(log_messages) == 1
+ log_entry = json.loads(log_messages[0])
+ assert "error_detail" in log_entry
+ assert log_entry["error_detail"] == error_detail
+
+
+# ---------------------------------------------------------------------------
+# Tests for the error_detail helpers (issue #63736)
+# ---------------------------------------------------------------------------
+
+
+class TestFormatErrorDetail:
+ """Unit tests for _format_error_detail."""
+
+ def test_returns_none_for_empty(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
+
+ assert _format_error_detail(None) is None
+ assert _format_error_detail([]) is None
+
+ def test_returns_string_for_non_list(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
+
+ assert _format_error_detail("raw string") == "raw string"
+
+ def test_formats_single_exception(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
+
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [
+ {"filename": "/app/task.py", "lineno": 13, "name":
"log_and_raise"},
+ ],
+ "exc_type": "RuntimeError",
+ "exc_value": "Something went wrong.",
+ "exceptions": [],
+ "is_group": False,
+ }
+ ]
+ result = _format_error_detail(error_detail)
+ assert result is not None
+ assert "Traceback (most recent call last):" in result
+ assert 'File "/app/task.py", line 13, in log_and_raise' in result
+ assert "RuntimeError: Something went wrong." in result
+
+ def test_formats_chained_exceptions(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
+
+ error_detail = [
+ {
+ "is_cause": True,
+ "frames": [{"filename": "/a.py", "lineno": 1, "name": "foo"}],
+ "exc_type": "ValueError",
+ "exc_value": "original",
+ "exceptions": [],
+ },
+ {
+ "is_cause": False,
+ "frames": [{"filename": "/b.py", "lineno": 2, "name": "bar"}],
+ "exc_type": "RuntimeError",
+ "exc_value": "wrapped",
+ "exceptions": [],
+ },
+ ]
+ result = _format_error_detail(error_detail)
+ assert result is not None
+ assert "direct cause" in result
+ assert "ValueError: original" in result
+ assert "RuntimeError: wrapped" in result
+
+ def test_exc_type_without_value(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
+
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [],
+ "exc_type": "StopIteration",
+ "exc_value": "",
+ }
+ ]
+ result = _format_error_detail(error_detail)
+ assert result is not None
+ assert result.endswith("StopIteration")
+
+ def test_non_dict_items_are_stringified(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
+
+ result = _format_error_detail(["unexpected string item"])
+ assert result is not None
+ assert "unexpected string item" in result
+
+
+class TestBuildStructuredLogFields:
+ """Unit tests for _build_log_fields."""
+
+ def test_filters_to_allowed_fields(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
+
+ hit = {"event": "hello", "level": "info", "unknown_field": "should be
dropped"}
+ result = _build_log_fields(hit)
+ assert "event" in result
+ assert "level" in result
+ assert "unknown_field" not in result
+
+ def test_message_mapped_to_event(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
+
+ hit = {"message": "plain message", "timestamp": "2024-01-01T00:00:00Z"}
+ fields = _build_log_fields(hit)
+ assert fields["event"] == "plain message"
+ assert "message" not in fields # Ensure it is popped if used as event
+
+ def test_message_preserved_if_event_exists(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
+
+ hit = {"event": "structured event", "message": "plain message"}
+ fields = _build_log_fields(hit)
+ assert fields["event"] == "structured event"
+ # message is preserved if it's in TASK_LOG_FIELDS and doesn't collide
with event
+ assert fields["message"] == "plain message"
+
+ def test_levelname_mapped_to_level(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
+
+ hit = {"event": "msg", "levelname": "ERROR"}
+ result = _build_log_fields(hit)
+ assert result["level"] == "ERROR"
+ assert "levelname" not in result
+
+ def test_at_timestamp_mapped_to_timestamp(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
+
+ hit = {"event": "msg", "@timestamp": "2024-01-01T00:00:00Z"}
+ result = _build_log_fields(hit)
+ assert result["timestamp"] == "2024-01-01T00:00:00Z"
+ assert "@timestamp" not in result
+
+ def test_error_detail_is_kept_as_list(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
+
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [{"filename": "/dag.py", "lineno": 10, "name":
"run"}],
+ "exc_type": "RuntimeError",
+ "exc_value": "Woopsie.",
+ }
+ ]
+ hit = {
+ "event": "Task failed with exception",
+ "error_detail": error_detail,
+ }
+ result = _build_log_fields(hit)
+ assert result["error_detail"] == error_detail
+
+ def test_error_detail_dropped_when_empty(self):
+ from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
+
+ hit = {"event": "msg", "error_detail": []}
+ result = _build_log_fields(hit)
+ assert "error_detail" not in result
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="StructuredLogMessage
only exists in Airflow 3+")
+ @elasticmock
+ def test_read_includes_error_detail_in_structured_message(self):
+ """End-to-end: a hit with error_detail should surface it in the
returned StructuredLogMessage."""
+ from airflow.providers.elasticsearch.log.es_task_handler import
ElasticsearchTaskHandler
+
+ local_log_location = "local/log/location"
+ handler = ElasticsearchTaskHandler(
+ base_log_folder=local_log_location,
+ end_of_log_mark="end_of_log\n",
+ write_stdout=False,
+ json_format=False,
+ json_fields="asctime,filename,lineno,levelname,message,exc_text",
+ )
+
+ es = elasticsearch.Elasticsearch("http://localhost:9200")
+ log_id = "test_dag-test_task-test_run--1-1"
+ body = {
+ "event": "Task failed with exception",
+ "log_id": log_id,
+ "offset": 1,
+ "error_detail": [
+ {
+ "is_cause": False,
+ "frames": [
+ {"filename": "/opt/airflow/dags/fail.py", "lineno":
13, "name": "log_and_raise"}
+ ],
+ "exc_type": "RuntimeError",
+ "exc_value": "Woopsie. Something went wrong.",
+ }
+ ],
+ }
+ es.index(index="test_index", doc_type="log", body=body, id=1)
+
+ # Patch the IO layer to return our fake document
+ mock_hit_dict = body.copy()
+
+ from airflow.providers.elasticsearch.log.es_response import
ElasticSearchResponse, Hit
+
+ mock_hit = Hit({"_source": mock_hit_dict})
+ mock_response = mock.MagicMock(spec=ElasticSearchResponse)
+ mock_response.hits = [mock_hit]
+ mock_response.__iter__ = mock.Mock(return_value=iter([mock_hit]))
+ mock_response.__bool__ = mock.Mock(return_value=True)
+ mock_response.__getitem__ = mock.Mock(return_value=mock_hit)
Review Comment:
This test uses several `mock.Mock()` assignments for magic methods
(`__iter__`, `__bool__`, `__getitem__`) without a spec. To avoid mocks that
accept any attribute access, prefer `create_autospec`/`MagicMock(spec=...)` (or
configure iteration/boolean behavior directly on the existing
`MagicMock(spec=ElasticSearchResponse)`).
```suggestion
mock_response.__iter__.return_value = iter([mock_hit])
mock_response.__bool__.return_value = True
mock_response.__getitem__.return_value = mock_hit
```
##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -79,7 +79,56 @@
# not exist, the task handler should use the log_id_template attribute instead.
USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")
-TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger"]
+TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger",
"error_detail", "message", "levelname"]
+
+
+def _format_error_detail(error_detail: Any) -> str | None:
+ """Render the structured ``error_detail`` written by the Airflow 3
supervisor as a traceback string."""
+ if not error_detail:
+ return None
+ if not isinstance(error_detail, list):
+ return str(error_detail)
Review Comment:
`_format_error_detail` is introduced here but is not used anywhere in the
Elasticsearch task handler (only referenced by tests). This means the PR
currently only surfaces `error_detail` as structured data, but never renders it
into a traceback string as the description implies; either integrate this
formatting into the structured output or drop the unused helper/tests.
##########
providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py:
##########
@@ -1060,3 +1060,259 @@ def test_read_with_missing_log(self, mocked_count, ti):
assert log_source_info == []
assert f"*** Log {log_id} not found in Elasticsearch" in
log_messages[0]
mocked_count.assert_called_once()
+
+ def test_read_error_detail(self, ti):
+ """Verify that error_detail is correctly retrieved and formatted."""
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [{"filename": "/opt/airflow/dags/fail.py", "lineno":
13, "name": "log_and_raise"}],
+ "exc_type": "RuntimeError",
+ "exc_value": "Woopsie. Something went wrong.",
+ }
+ ]
+ body = {
+ "event": "Task failed with exception",
+ "log_id": _render_log_id(self.elasticsearch_io.log_id_template,
ti, ti.try_number),
+ "offset": 1,
+ "error_detail": error_detail,
+ }
+
+ from airflow.providers.elasticsearch.log.es_response import Hit
+
+ mock_hit = Hit({"_source": body})
+ with (
+ patch.object(self.elasticsearch_io, "_es_read") as mock_es_read,
+ patch.object(
+ self.elasticsearch_io,
+ "_group_logs_by_host",
+ return_value={"http://localhost:9200": [mock_hit]},
+ ),
+ ):
+ mock_es_read.return_value = mock.MagicMock()
Review Comment:
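`mock_es_read.return_value` is set to `mock.MagicMock()` without a spec,
which can hide attribute/behavior mismatches in the test. Prefer
`MagicMock(spec=ElasticSearchResponse)` (or `create_autospec`) and configure
only the attributes used (`hits`, iteration, etc.). Note that the test's local
import currently brings in only `Hit` from `es_response`, so make sure
`ElasticSearchResponse` is imported as well when applying the suggestion.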
```suggestion
mock_es_read.return_value = mock.MagicMock(spec=ElasticSearchResponse)
```
##########
providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py:
##########
@@ -58,7 +58,55 @@
USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")
LOG_LINE_DEFAULTS = {"exc_text": "", "stack_info": ""}
-TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger"]
+TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger",
"error_detail", "message", "levelname"]
+
+
+def _format_error_detail(error_detail: Any) -> str | None:
+ """Render the structured ``error_detail`` written by the Airflow 3
supervisor as a traceback string."""
+ if not error_detail:
+ return None
+ if not isinstance(error_detail, list):
+ return str(error_detail)
Review Comment:
`_format_error_detail` is introduced here but is not called anywhere in the
OpenSearch task handler (only referenced by tests). As a result, the structured
`error_detail` is never rendered into a human-readable traceback string as
described in the PR; either wire this into the structured log output (e.g.,
populate an `exc_text`/similar field or append to `event`) or remove the unused
helper/tests to avoid dead code.
##########
providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py:
##########
@@ -568,3 +568,220 @@ def test_retrieve_config_keys():
# http_compress comes from config value
assert "http_compress" in args_from_config
assert "self" not in args_from_config
+
+
+# ---------------------------------------------------------------------------
+# Tests for the error_detail helpers (issue #63736)
+# ---------------------------------------------------------------------------
+
+
+class TestFormatErrorDetail:
+ """Unit tests for _format_error_detail."""
+
+ def test_returns_none_for_empty(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_format_error_detail
+
+ assert _format_error_detail(None) is None
+ assert _format_error_detail([]) is None
+
+ def test_returns_string_for_non_list(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_format_error_detail
+
+ assert _format_error_detail("raw string") == "raw string"
+
+ def test_formats_single_exception(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_format_error_detail
+
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [
+ {"filename": "/app/task.py", "lineno": 13, "name":
"log_and_raise"},
+ ],
+ "exc_type": "RuntimeError",
+ "exc_value": "Something went wrong.",
+ "exceptions": [],
+ "is_group": False,
+ }
+ ]
+ result = _format_error_detail(error_detail)
+ assert result is not None
+ assert "Traceback (most recent call last):" in result
+ assert 'File "/app/task.py", line 13, in log_and_raise' in result
+ assert "RuntimeError: Something went wrong." in result
+
+ def test_formats_chained_exceptions(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_format_error_detail
+
+ error_detail = [
+ {
+ "is_cause": True,
+ "frames": [{"filename": "/a.py", "lineno": 1, "name": "foo"}],
+ "exc_type": "ValueError",
+ "exc_value": "original",
+ "exceptions": [],
+ },
+ {
+ "is_cause": False,
+ "frames": [{"filename": "/b.py", "lineno": 2, "name": "bar"}],
+ "exc_type": "RuntimeError",
+ "exc_value": "wrapped",
+ "exceptions": [],
+ },
+ ]
+ result = _format_error_detail(error_detail)
+ assert result is not None
+ assert "direct cause" in result
+ assert "ValueError: original" in result
+ assert "RuntimeError: wrapped" in result
+
+ def test_exc_type_without_value(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_format_error_detail
+
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [],
+ "exc_type": "StopIteration",
+ "exc_value": "",
+ }
+ ]
+ result = _format_error_detail(error_detail)
+ assert result is not None
+ assert result.endswith("StopIteration")
+
+ def test_non_dict_items_are_stringified(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_format_error_detail
+
+ result = _format_error_detail(["unexpected string item"])
+ assert result is not None
+ assert "unexpected string item" in result
+
+
+class TestBuildLogFields:
+ """Unit tests for _build_log_fields."""
+
+ def test_filters_to_allowed_fields(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+
+ hit = {"event": "hello", "level": "info", "unknown_field": "should be
dropped"}
+ result = _build_log_fields(hit)
+ assert "event" in result
+ assert "level" in result
+ assert "unknown_field" not in result
+
+ def test_message_mapped_to_event(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+
+ hit = {"message": "plain message", "timestamp": "2024-01-01T00:00:00Z"}
+ fields = _build_log_fields(hit)
+ assert fields["event"] == "plain message"
+ assert "message" not in fields # Ensure it is popped if used as event
+
+ def test_message_preserved_if_event_exists(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+
+ hit = {"event": "structured event", "message": "plain message"}
+ fields = _build_log_fields(hit)
+ assert fields["event"] == "structured event"
+ # message is preserved if it's in TASK_LOG_FIELDS and doesn't collide
with event
+ assert fields["message"] == "plain message"
+
+ def test_levelname_mapped_to_level(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+
+ hit = {"event": "msg", "levelname": "ERROR"}
+ result = _build_log_fields(hit)
+ assert result["level"] == "ERROR"
+ assert "levelname" not in result
+
+ def test_at_timestamp_mapped_to_timestamp(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+
+ hit = {"event": "msg", "@timestamp": "2024-01-01T00:00:00Z"}
+ result = _build_log_fields(hit)
+ assert result["timestamp"] == "2024-01-01T00:00:00Z"
+ assert "@timestamp" not in result
+
+ def test_error_detail_is_kept_as_list(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [{"filename": "/dag.py", "lineno": 10, "name":
"run"}],
+ "exc_type": "RuntimeError",
+ "exc_value": "Woopsie.",
+ }
+ ]
+ hit = {
+ "event": "Task failed with exception",
+ "error_detail": error_detail,
+ }
+ result = _build_log_fields(hit)
+ assert result["error_detail"] == error_detail
+
+ def test_error_detail_dropped_when_empty(self):
+ from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
+
+ hit = {"event": "msg", "error_detail": []}
+ result = _build_log_fields(hit)
+ assert "error_detail" not in result
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="StructuredLogMessage
only exists in Airflow 3+")
+ def test_read_includes_error_detail_in_structured_message(self):
+ """End-to-end: a hit with error_detail should surface it in the
returned StructuredLogMessage."""
+ from airflow.providers.opensearch.log.os_task_handler import
OpensearchTaskHandler
Review Comment:
This test is named/documented as “end-to-end” and patches
`_os_read`/`_group_logs_by_host`, but it never calls the handler’s read path
(it only builds a `StructuredLogMessage` directly from `_build_log_fields`).
Either call `handler._read(...)`/`handler.read(...)` to exercise the
integration, or rename/simplify the test to reflect what it actually validates.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]