This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 38bd24b7f99 ElasticsearchTaskHandler: Request only required source
fields for task logs (#64562)
38bd24b7f99 is described below
commit 38bd24b7f9998c98da73869f0ede0eb35ea874d9
Author: Jorge Rocamora <[email protected]>
AuthorDate: Mon Apr 6 21:50:47 2026 +0200
ElasticsearchTaskHandler: Request only required source fields for task logs
(#64562)
---
.../providers/elasticsearch/log/es_task_handler.py | 34 +++++++++++++--
.../unit/elasticsearch/log/test_es_task_handler.py | 48 ++++++++++++++++++++++
2 files changed, 78 insertions(+), 4 deletions(-)
diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
index 4df2b0ddd7d..754974a76ce 100644
--- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -26,7 +26,7 @@ import shutil
import sys
import time
from collections import defaultdict
-from collections.abc import Callable
+from collections.abc import Callable, Iterable
from operator import attrgetter
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, cast
@@ -82,6 +82,11 @@ USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")
TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger",
"error_detail", "message", "levelname"]
+def _deduplicate_fields(fields: Iterable[str]) -> list[str]:
+ """Return non-empty field names in order without duplicates."""
+ return list(dict.fromkeys(field for field in fields if field))
+
+
def _format_error_detail(error_detail: Any) -> str | None:
"""Render the structured ``error_detail`` written by the Airflow 3
supervisor as a traceback string."""
if not error_detail:
@@ -318,6 +323,10 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
def _read_grouped_logs(self):
return True
+ def _get_es_source_includes(self) -> list[str]:
+ extra_fields = self.json_fields if self.json_format and not
AIRFLOW_V_3_0_PLUS else []
+ return self.io._get_source_includes(extra_fields=extra_fields)
+
def _read(
self, ti: TaskInstance, try_number: int, metadata: LogMetadata | None
= None
) -> tuple[EsLogMsgType, LogMetadata]:
@@ -339,7 +348,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
offset = metadata["offset"]
log_id = _render_log_id(self.log_id_template, ti, try_number)
- response = self.io._es_read(log_id, offset, ti)
+ response = self.io._es_read(log_id, offset, ti,
source_includes=self._get_es_source_includes())
# TODO: Can we skip group logs by host ?
if response is not None and response.hits:
logs_by_host = self.io._group_logs_by_host(response)
@@ -629,6 +638,12 @@ class ElasticsearchRemoteLogIO(LoggingMixin): # noqa: D101
self._doc_type_map: dict[Any, Any] = {}
self._doc_type: list[Any] = []
+ def _get_source_includes(self, extra_fields: Iterable[str] = ()) ->
list[str]:
+ """Return the Elasticsearch source fields to include when reading task
logs."""
+ return _deduplicate_fields(
+ ["@timestamp", *TASK_LOG_FIELDS, self.host_field,
self.offset_field, *extra_fields]
+ )
+
def upload(self, path: os.PathLike | str, ti: RuntimeTI):
"""Write the log to ElasticSearch."""
path = Path(path)
@@ -693,7 +708,7 @@ class ElasticsearchRemoteLogIO(LoggingMixin): # noqa: D101
log_id = _render_log_id(self.log_id_template, ti, ti.try_number) #
type: ignore[arg-type]
self.log.info("Reading log %s from Elasticsearch", log_id)
offset = 0
- response = self._es_read(log_id, offset, ti)
+ response = self._es_read(log_id, offset, ti,
source_includes=self._get_source_includes())
if response is not None and response.hits:
logs_by_host = self._group_logs_by_host(response)
else:
@@ -720,7 +735,14 @@ class ElasticsearchRemoteLogIO(LoggingMixin): # noqa: D101
return header, message
- def _es_read(self, log_id: str, offset: int | str, ti: RuntimeTI) ->
ElasticSearchResponse | None:
+ def _es_read(
+ self,
+ log_id: str,
+ offset: int | str,
+ ti: RuntimeTI,
+ *,
+ source_includes: Iterable[str] | None = None,
+ ) -> ElasticSearchResponse | None:
"""
Return the logs matching log_id in Elasticsearch and next offset or ''.
@@ -730,6 +752,9 @@ class ElasticsearchRemoteLogIO(LoggingMixin): # noqa: D101
:meta private:
"""
+ source_includes = _deduplicate_fields(
+ self._get_source_includes() if source_includes is None else
source_includes
+ )
query: dict[Any, Any] = {
"bool": {
"filter": [{"range": {self.offset_field: {"gt":
int(offset)}}}],
@@ -745,6 +770,7 @@ class ElasticsearchRemoteLogIO(LoggingMixin): # noqa: D101
sort=[self.offset_field],
size=self.MAX_LINE_PER_PAGE,
from_=self.MAX_LINE_PER_PAGE * self.PAGE,
+ source_includes=source_includes,
)
except NotFoundError:
self.log.exception("The target index pattern %s does not exist",
index_patterns)
diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
index c69b6f3b36b..0ab4a8e2ed4 100644
--- a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
+++ b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
@@ -35,6 +35,7 @@ from airflow.providers.common.compat.sdk import conf
from airflow.providers.elasticsearch.log.es_json_formatter import
ElasticsearchJSONFormatter
from airflow.providers.elasticsearch.log.es_response import
ElasticSearchResponse
from airflow.providers.elasticsearch.log.es_task_handler import (
+ TASK_LOG_FIELDS,
VALID_ES_CONFIG_KEYS,
ElasticsearchRemoteLogIO,
ElasticsearchTaskHandler,
@@ -538,6 +539,23 @@ class TestElasticsearchTaskHandler:
filename_template=None,
)
+ def
test_get_es_source_includes_legacy_json_format_includes_json_fields(self):
+ self.es_task_handler.json_format = True
+
+ with
patch("airflow.providers.elasticsearch.log.es_task_handler.AIRFLOW_V_3_0_PLUS",
False):
+ fields = self.es_task_handler._get_es_source_includes()
+
+ assert fields == [
+ "@timestamp",
+ *TASK_LOG_FIELDS,
+ self.host_field,
+ self.offset_field,
+ "asctime",
+ "filename",
+ "lineno",
+ "exc_text",
+ ]
+
class TestTaskHandlerHelpers:
def test_safe_attrgetter(self):
@@ -634,6 +652,30 @@ class TestElasticsearchRemoteLogIO:
assert [line["offset"] for line in json_log_lines] == [1, 2, 3]
assert all(line["log_id"] == log_id for line in json_log_lines)
+ def test_get_source_includes(self):
+ assert self.elasticsearch_io._get_source_includes() == [
+ "@timestamp",
+ *TASK_LOG_FIELDS,
+ self.elasticsearch_io.host_field,
+ self.elasticsearch_io.offset_field,
+ ]
+
+ def test_get_source_includes_with_custom_host_and_offset_fields(self):
+ self.elasticsearch_io.host_field = "host.name"
+ self.elasticsearch_io.offset_field = "log.offset"
+
+ assert self.elasticsearch_io._get_source_includes() == [
+ "@timestamp",
+ *TASK_LOG_FIELDS,
+ "host.name",
+ "log.offset",
+ ]
+
+ def
test_get_source_includes_deduplicates_when_offset_overlaps_task_fields(self):
+ self.elasticsearch_io.offset_field = "timestamp" # already in
TASK_LOG_FIELDS
+ fields = self.elasticsearch_io._get_source_includes()
+ assert fields.count("timestamp") == 1
+
def test_es_read_builds_expected_query(self, ti):
self.elasticsearch_io.client = Mock()
self.elasticsearch_io.client.search.return_value =
_build_es_search_response(
@@ -661,6 +703,12 @@ class TestElasticsearchRemoteLogIO:
sort=[self.elasticsearch_io.offset_field],
size=self.elasticsearch_io.MAX_LINE_PER_PAGE,
from_=0,
+ source_includes=[
+ "@timestamp",
+ *TASK_LOG_FIELDS,
+ self.elasticsearch_io.host_field,
+ self.elasticsearch_io.offset_field,
+ ],
)
assert response is not None
assert response.hits[0].event == "hello"