This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 30994d9b258 Optimize `ElasticsearchTaskHandler` by removing redundant
`count()` call before `search()` (#64372)
30994d9b258 is described below
commit 30994d9b25879cb0cfd6340430abf523fdc393ea
Author: Jorge Rocamora <[email protected]>
AuthorDate: Fri Apr 3 12:49:37 2026 +0200
Optimize `ElasticsearchTaskHandler` by removing redundant `count()` call
before `search()` (#64372)
---
.../providers/elasticsearch/log/es_task_handler.py | 34 +++++++++++-----------
.../unit/elasticsearch/log/test_es_task_handler.py | 11 +++----
2 files changed, 21 insertions(+), 24 deletions(-)
diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
index a6ce2df705a..4df2b0ddd7d 100644
--- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -739,25 +739,25 @@ class ElasticsearchRemoteLogIO(LoggingMixin): # noqa: D101
index_patterns = self._get_index_patterns(ti)
try:
- max_log_line = self.client.count(index=index_patterns,
query=query)["count"]
- except NotFoundError as e:
+ res = self.client.search(
+ index=index_patterns,
+ query=query,
+ sort=[self.offset_field],
+ size=self.MAX_LINE_PER_PAGE,
+ from_=self.MAX_LINE_PER_PAGE * self.PAGE,
+ )
+ except NotFoundError:
self.log.exception("The target index pattern %s does not exist",
index_patterns)
- raise e
-
- if max_log_line != 0:
- try:
- res = self.client.search(
- index=index_patterns,
- query=query,
- sort=[self.offset_field],
- size=self.MAX_LINE_PER_PAGE,
- from_=self.MAX_LINE_PER_PAGE * self.PAGE,
- )
- return ElasticSearchResponse(self, res)
- except Exception as err:
- self.log.exception("Could not read log with log_id: %s.
Exception: %s", log_id, err)
+ raise
+ except Exception:
+ self.log.exception("Could not read log with log_id: %s", log_id)
+ return None
- return None
+ # Short-circuit on empty hits to avoid constructing
ElasticSearchResponse unnecessarily.
+ if not res.get("hits", {}).get("hits"):
+ return None
+
+ return ElasticSearchResponse(self, res)
def _get_index_patterns(self, ti: RuntimeTI | None) -> str:
"""
diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
index 6b11c6068a5..c69b6f3b36b 100644
--- a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
+++ b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
@@ -636,7 +636,6 @@ class TestElasticsearchRemoteLogIO:
def test_es_read_builds_expected_query(self, ti):
self.elasticsearch_io.client = Mock()
- self.elasticsearch_io.client.count.return_value = {"count": 1}
self.elasticsearch_io.client.search.return_value =
_build_es_search_response(
{
"event": "hello",
@@ -655,7 +654,7 @@ class TestElasticsearchRemoteLogIO:
response = self.elasticsearch_io._es_read(log_id, 2, ti)
-
self.elasticsearch_io.client.count.assert_called_once_with(index="airflow-logs-*",
query=query)
+ self.elasticsearch_io.client.count.assert_not_called()
self.elasticsearch_io.client.search.assert_called_once_with(
index="airflow-logs-*",
query=query,
@@ -666,19 +665,18 @@ class TestElasticsearchRemoteLogIO:
assert response is not None
assert response.hits[0].event == "hello"
- def test_es_read_returns_none_when_count_is_zero(self, ti):
+ def test_es_read_returns_none_when_search_returns_empty(self, ti):
self.elasticsearch_io.client = Mock()
- self.elasticsearch_io.client.count.return_value = {"count": 0}
+ self.elasticsearch_io.client.search.return_value =
_build_es_search_response()
log_id = _render_log_id(self.elasticsearch_io.log_id_template, ti,
ti.try_number)
response = self.elasticsearch_io._es_read(log_id, 0, ti)
assert response is None
- self.elasticsearch_io.client.search.assert_not_called()
def test_es_read_propagates_missing_index(self, ti):
self.elasticsearch_io.client = Mock()
- self.elasticsearch_io.client.count.side_effect =
elasticsearch.exceptions.NotFoundError(
+ self.elasticsearch_io.client.search.side_effect =
elasticsearch.exceptions.NotFoundError(
404,
"IndexMissingException[[missing] missing]",
{},
@@ -690,7 +688,6 @@ class TestElasticsearchRemoteLogIO:
def test_es_read_logs_and_returns_none_on_search_error(self, ti):
self.elasticsearch_io.client = Mock()
- self.elasticsearch_io.client.count.return_value = {"count": 1}
self.elasticsearch_io.client.search.side_effect = RuntimeError("boom")
log_id = _render_log_id(self.elasticsearch_io.log_id_template, ti,
ti.try_number)