This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 38bd24b7f99 ElasticsearchTaskHandler: Request only required source fields for task logs (#64562)
38bd24b7f99 is described below

commit 38bd24b7f9998c98da73869f0ede0eb35ea874d9
Author: Jorge Rocamora <[email protected]>
AuthorDate: Mon Apr 6 21:50:47 2026 +0200

    ElasticsearchTaskHandler: Request only required source fields for task logs (#64562)
---
 .../providers/elasticsearch/log/es_task_handler.py | 34 +++++++++++++--
 .../unit/elasticsearch/log/test_es_task_handler.py | 48 ++++++++++++++++++++++
 2 files changed, 78 insertions(+), 4 deletions(-)

diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
index 4df2b0ddd7d..754974a76ce 100644
--- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -26,7 +26,7 @@ import shutil
 import sys
 import time
 from collections import defaultdict
-from collections.abc import Callable
+from collections.abc import Callable, Iterable
 from operator import attrgetter
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Literal, cast
@@ -82,6 +82,11 @@ USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")
 TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger", "error_detail", "message", "levelname"]
 
 
+def _deduplicate_fields(fields: Iterable[str]) -> list[str]:
+    """Return non-empty field names in order without duplicates."""
+    return list(dict.fromkeys(field for field in fields if field))
+
+
 def _format_error_detail(error_detail: Any) -> str | None:
     """Render the structured ``error_detail`` written by the Airflow 3 
supervisor as a traceback string."""
     if not error_detail:
@@ -318,6 +323,10 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
     def _read_grouped_logs(self):
         return True
 
+    def _get_es_source_includes(self) -> list[str]:
+        extra_fields = self.json_fields if self.json_format and not AIRFLOW_V_3_0_PLUS else []
+        return self.io._get_source_includes(extra_fields=extra_fields)
+
     def _read(
         self, ti: TaskInstance, try_number: int, metadata: LogMetadata | None = None
     ) -> tuple[EsLogMsgType, LogMetadata]:
@@ -339,7 +348,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
 
         offset = metadata["offset"]
         log_id = _render_log_id(self.log_id_template, ti, try_number)
-        response = self.io._es_read(log_id, offset, ti)
+        response = self.io._es_read(log_id, offset, ti, source_includes=self._get_es_source_includes())
         # TODO: Can we skip group logs by host ?
         if response is not None and response.hits:
             logs_by_host = self.io._group_logs_by_host(response)
@@ -629,6 +638,12 @@ class ElasticsearchRemoteLogIO(LoggingMixin):  # noqa: D101
         self._doc_type_map: dict[Any, Any] = {}
         self._doc_type: list[Any] = []
 
+    def _get_source_includes(self, extra_fields: Iterable[str] = ()) -> list[str]:
+        """Return the Elasticsearch source fields to include when reading task logs."""
+        return _deduplicate_fields(
+            ["@timestamp", *TASK_LOG_FIELDS, self.host_field, self.offset_field, *extra_fields]
+        )
+
     def upload(self, path: os.PathLike | str, ti: RuntimeTI):
         """Write the log to ElasticSearch."""
         path = Path(path)
@@ -693,7 +708,7 @@ class ElasticsearchRemoteLogIO(LoggingMixin):  # noqa: D101
         log_id = _render_log_id(self.log_id_template, ti, ti.try_number)  # type: ignore[arg-type]
         self.log.info("Reading log %s from Elasticsearch", log_id)
         offset = 0
-        response = self._es_read(log_id, offset, ti)
+        response = self._es_read(log_id, offset, ti, source_includes=self._get_source_includes())
         if response is not None and response.hits:
             logs_by_host = self._group_logs_by_host(response)
         else:
@@ -720,7 +735,14 @@ class ElasticsearchRemoteLogIO(LoggingMixin):  # noqa: D101
 
         return header, message
 
-    def _es_read(self, log_id: str, offset: int | str, ti: RuntimeTI) -> ElasticSearchResponse | None:
+    def _es_read(
+        self,
+        log_id: str,
+        offset: int | str,
+        ti: RuntimeTI,
+        *,
+        source_includes: Iterable[str] | None = None,
+    ) -> ElasticSearchResponse | None:
         """
         Return the logs matching log_id in Elasticsearch and next offset or ''.
 
@@ -730,6 +752,9 @@ class ElasticsearchRemoteLogIO(LoggingMixin):  # noqa: D101
 
         :meta private:
         """
+        source_includes = _deduplicate_fields(
+            self._get_source_includes() if source_includes is None else source_includes
+        )
         query: dict[Any, Any] = {
             "bool": {
                 "filter": [{"range": {self.offset_field: {"gt": 
int(offset)}}}],
@@ -745,6 +770,7 @@ class ElasticsearchRemoteLogIO(LoggingMixin):  # noqa: D101
                 sort=[self.offset_field],
                 size=self.MAX_LINE_PER_PAGE,
                 from_=self.MAX_LINE_PER_PAGE * self.PAGE,
+                source_includes=source_includes,
             )
         except NotFoundError:
             self.log.exception("The target index pattern %s does not exist", index_patterns)
diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
index c69b6f3b36b..0ab4a8e2ed4 100644
--- a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
+++ b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
@@ -35,6 +35,7 @@ from airflow.providers.common.compat.sdk import conf
 from airflow.providers.elasticsearch.log.es_json_formatter import ElasticsearchJSONFormatter
 from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse
 from airflow.providers.elasticsearch.log.es_task_handler import (
+    TASK_LOG_FIELDS,
     VALID_ES_CONFIG_KEYS,
     ElasticsearchRemoteLogIO,
     ElasticsearchTaskHandler,
@@ -538,6 +539,23 @@ class TestElasticsearchTaskHandler:
             filename_template=None,
         )
 
+    def test_get_es_source_includes_legacy_json_format_includes_json_fields(self):
+        self.es_task_handler.json_format = True
+
+        with patch("airflow.providers.elasticsearch.log.es_task_handler.AIRFLOW_V_3_0_PLUS", False):
+            fields = self.es_task_handler._get_es_source_includes()
+            fields = self.es_task_handler._get_es_source_includes()
+
+        assert fields == [
+            "@timestamp",
+            *TASK_LOG_FIELDS,
+            self.host_field,
+            self.offset_field,
+            "asctime",
+            "filename",
+            "lineno",
+            "exc_text",
+        ]
+
 
 class TestTaskHandlerHelpers:
     def test_safe_attrgetter(self):
@@ -634,6 +652,30 @@ class TestElasticsearchRemoteLogIO:
         assert [line["offset"] for line in json_log_lines] == [1, 2, 3]
         assert all(line["log_id"] == log_id for line in json_log_lines)
 
+    def test_get_source_includes(self):
+        assert self.elasticsearch_io._get_source_includes() == [
+            "@timestamp",
+            *TASK_LOG_FIELDS,
+            self.elasticsearch_io.host_field,
+            self.elasticsearch_io.offset_field,
+        ]
+
+    def test_get_source_includes_with_custom_host_and_offset_fields(self):
+        self.elasticsearch_io.host_field = "host.name"
+        self.elasticsearch_io.offset_field = "log.offset"
+
+        assert self.elasticsearch_io._get_source_includes() == [
+            "@timestamp",
+            *TASK_LOG_FIELDS,
+            "host.name",
+            "log.offset",
+        ]
+
+    def test_get_source_includes_deduplicates_when_offset_overlaps_task_fields(self):
+        self.elasticsearch_io.offset_field = "timestamp"  # already in TASK_LOG_FIELDS
+        fields = self.elasticsearch_io._get_source_includes()
+        assert fields.count("timestamp") == 1
+
     def test_es_read_builds_expected_query(self, ti):
         self.elasticsearch_io.client = Mock()
         self.elasticsearch_io.client.search.return_value = _build_es_search_response(
@@ -661,6 +703,12 @@ class TestElasticsearchRemoteLogIO:
             sort=[self.elasticsearch_io.offset_field],
             size=self.elasticsearch_io.MAX_LINE_PER_PAGE,
             from_=0,
+            source_includes=[
+                "@timestamp",
+                *TASK_LOG_FIELDS,
+                self.elasticsearch_io.host_field,
+                self.elasticsearch_io.offset_field,
+            ],
         )
         assert response is not None
         assert response.hits[0].event == "hello"

Reply via email to