This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 30994d9b258 Optimize `ElasticsearchTaskHandler` by removing redundant `count()` call before `search()` (#64372)
30994d9b258 is described below

commit 30994d9b25879cb0cfd6340430abf523fdc393ea
Author: Jorge Rocamora <[email protected]>
AuthorDate: Fri Apr 3 12:49:37 2026 +0200

    Optimize `ElasticsearchTaskHandler` by removing redundant `count()` call before `search()` (#64372)
---
 .../providers/elasticsearch/log/es_task_handler.py | 34 +++++++++++-----------
 .../unit/elasticsearch/log/test_es_task_handler.py | 11 +++----
 2 files changed, 21 insertions(+), 24 deletions(-)

diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
index a6ce2df705a..4df2b0ddd7d 100644
--- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -739,25 +739,25 @@ class ElasticsearchRemoteLogIO(LoggingMixin):  # noqa: D101
 
         index_patterns = self._get_index_patterns(ti)
         try:
-            max_log_line = self.client.count(index=index_patterns, 
query=query)["count"]
-        except NotFoundError as e:
+            res = self.client.search(
+                index=index_patterns,
+                query=query,
+                sort=[self.offset_field],
+                size=self.MAX_LINE_PER_PAGE,
+                from_=self.MAX_LINE_PER_PAGE * self.PAGE,
+            )
+        except NotFoundError:
             self.log.exception("The target index pattern %s does not exist", 
index_patterns)
-            raise e
-
-        if max_log_line != 0:
-            try:
-                res = self.client.search(
-                    index=index_patterns,
-                    query=query,
-                    sort=[self.offset_field],
-                    size=self.MAX_LINE_PER_PAGE,
-                    from_=self.MAX_LINE_PER_PAGE * self.PAGE,
-                )
-                return ElasticSearchResponse(self, res)
-            except Exception as err:
-                self.log.exception("Could not read log with log_id: %s. 
Exception: %s", log_id, err)
+            raise
+        except Exception:
+            self.log.exception("Could not read log with log_id: %s", log_id)
+            return None
 
-        return None
+        # Short-circuit on empty hits to avoid constructing 
ElasticSearchResponse unnecessarily.
+        if not res.get("hits", {}).get("hits"):
+            return None
+
+        return ElasticSearchResponse(self, res)
 
     def _get_index_patterns(self, ti: RuntimeTI | None) -> str:
         """
diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
index 6b11c6068a5..c69b6f3b36b 100644
--- a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
+++ b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
@@ -636,7 +636,6 @@ class TestElasticsearchRemoteLogIO:
 
     def test_es_read_builds_expected_query(self, ti):
         self.elasticsearch_io.client = Mock()
-        self.elasticsearch_io.client.count.return_value = {"count": 1}
         self.elasticsearch_io.client.search.return_value = 
_build_es_search_response(
             {
                 "event": "hello",
@@ -655,7 +654,7 @@ class TestElasticsearchRemoteLogIO:
 
         response = self.elasticsearch_io._es_read(log_id, 2, ti)
 
-        
self.elasticsearch_io.client.count.assert_called_once_with(index="airflow-logs-*",
 query=query)
+        self.elasticsearch_io.client.count.assert_not_called()
         self.elasticsearch_io.client.search.assert_called_once_with(
             index="airflow-logs-*",
             query=query,
@@ -666,19 +665,18 @@ class TestElasticsearchRemoteLogIO:
         assert response is not None
         assert response.hits[0].event == "hello"
 
-    def test_es_read_returns_none_when_count_is_zero(self, ti):
+    def test_es_read_returns_none_when_search_returns_empty(self, ti):
         self.elasticsearch_io.client = Mock()
-        self.elasticsearch_io.client.count.return_value = {"count": 0}
+        self.elasticsearch_io.client.search.return_value = 
_build_es_search_response()
 
         log_id = _render_log_id(self.elasticsearch_io.log_id_template, ti, 
ti.try_number)
         response = self.elasticsearch_io._es_read(log_id, 0, ti)
 
         assert response is None
-        self.elasticsearch_io.client.search.assert_not_called()
 
     def test_es_read_propagates_missing_index(self, ti):
         self.elasticsearch_io.client = Mock()
-        self.elasticsearch_io.client.count.side_effect = 
elasticsearch.exceptions.NotFoundError(
+        self.elasticsearch_io.client.search.side_effect = 
elasticsearch.exceptions.NotFoundError(
             404,
             "IndexMissingException[[missing] missing]",
             {},
@@ -690,7 +688,6 @@ class TestElasticsearchRemoteLogIO:
 
     def test_es_read_logs_and_returns_none_on_search_error(self, ti):
         self.elasticsearch_io.client = Mock()
-        self.elasticsearch_io.client.count.return_value = {"count": 1}
         self.elasticsearch_io.client.search.side_effect = RuntimeError("boom")
 
         log_id = _render_log_id(self.elasticsearch_io.log_id_template, ti, 
ti.try_number)

Reply via email to