This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d7fe471d4 Update es read query to not use body (#34792)
0d7fe471d4 is described below

commit 0d7fe471d428cd49b1eacaf84c8067796ca57fa7
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Nov 9 02:49:24 2023 -0800

    Update es read query to not use body (#34792)
---
 .../providers/elasticsearch/log/es_task_handler.py | 16 +++++++--------
 airflow/providers/elasticsearch/provider.yaml      |  2 +-
 generated/provider_dependencies.json               |  2 +-
 .../log/elasticmock/fake_elasticsearch.py          | 24 +++++++++++-----------
 .../log/elasticmock/utilities/__init__.py          |  6 +++---
 5 files changed, 24 insertions(+), 26 deletions(-)

diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index 33a323a958..f961f374f2 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -345,26 +345,24 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         :meta private:
         """
         query: dict[Any, Any] = {
-            "query": {
-                "bool": {
-                    "filter": [{"range": {self.offset_field: {"gt": int(offset)}}}],
-                    "must": [{"match_phrase": {"log_id": log_id}}],
-                }
+            "bool": {
+                "filter": [{"range": {self.offset_field: {"gt": int(offset)}}}],
+                "must": [{"match_phrase": {"log_id": log_id}}],
             }
         }
 
         try:
-            max_log_line = self.client.count(index=self.index_patterns, body=query)["count"]  # type: ignore
+            max_log_line = self.client.count(index=self.index_patterns, query=query)["count"]  # type: ignore
         except NotFoundError as e:
             self.log.exception("The target index pattern %s does not exist", self.index_patterns)
             raise e
 
         if max_log_line != 0:
             try:
-                query.update({"sort": [self.offset_field]})
-                res = self.client.search(  # type: ignore
+                res = self.client.search(
                     index=self.index_patterns,
-                    body=query,
+                    query=query,
+                    sort=[self.offset_field],
                     size=self.MAX_LINE_PER_PAGE,
                     from_=self.MAX_LINE_PER_PAGE * self.PAGE,
                 )
diff --git a/airflow/providers/elasticsearch/provider.yaml b/airflow/providers/elasticsearch/provider.yaml
index 2afcdffbb8..dc7e7829c1 100644
--- a/airflow/providers/elasticsearch/provider.yaml
+++ b/airflow/providers/elasticsearch/provider.yaml
@@ -57,7 +57,7 @@ versions:
 dependencies:
   - apache-airflow>=2.5.0
   - apache-airflow-providers-common-sql>=1.3.1
-  - elasticsearch>8,<9
+  - elasticsearch>=8.10,<9
 
 integrations:
   - integration-name: Elasticsearch
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index 67eb9f95a2..4d4d6cb90a 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -371,7 +371,7 @@
     "deps": [
       "apache-airflow-providers-common-sql>=1.3.1",
       "apache-airflow>=2.5.0",
-      "elasticsearch>8,<9"
+      "elasticsearch>=8.10,<9"
     ],
     "cross-providers-deps": [
       "common.sql"
diff --git a/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
index ff228bf77f..79aadae835 100644
--- a/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
+++ b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
@@ -325,8 +325,8 @@ class FakeElasticsearch(Elasticsearch):
         "track_scores",
         "version",
     )
-    def count(self, index=None, doc_type=None, body=None, params=None, headers=None):
-        searchable_indexes = self._normalize_index_to_list(index, body)
+    def count(self, index=None, doc_type=None, query=None, params=None, headers=None):
+        searchable_indexes = self._normalize_index_to_list(index, query=query)
         searchable_doc_types = self._normalize_doc_type_to_list(doc_type)
         i = 0
         for searchable_index in searchable_indexes:
@@ -372,10 +372,10 @@ class FakeElasticsearch(Elasticsearch):
         "track_scores",
         "version",
     )
-    def search(self, index=None, doc_type=None, body=None, params=None, headers=None):
-        searchable_indexes = self._normalize_index_to_list(index, body)
+    def search(self, index=None, doc_type=None, query=None, params=None, headers=None):
+        searchable_indexes = self._normalize_index_to_list(index, query=query)
 
-        matches = self._find_match(index, doc_type, body)
+        matches = self._find_match(index, doc_type, query=query)
 
         result = {
             "hits": {"total": len(matches), "max_score": 1.0},
@@ -442,11 +442,11 @@ class FakeElasticsearch(Elasticsearch):
             ]
         return result_dict
 
-    def _find_match(self, index, doc_type, body):
-        searchable_indexes = self._normalize_index_to_list(index, body)
+    def _find_match(self, index, doc_type, query):
+        searchable_indexes = self._normalize_index_to_list(index, query=query)
         searchable_doc_types = self._normalize_doc_type_to_list(doc_type)
 
-        must = body["query"]["bool"]["must"][0]  # only support one must
+        must = query["bool"]["must"][0]  # only support one must
 
         matches = []
         for searchable_index in searchable_indexes:
@@ -472,7 +472,7 @@ class FakeElasticsearch(Elasticsearch):
                     matches.append(document)
 
     # Check index(es) exists.
-    def _validate_search_targets(self, targets, body):
+    def _validate_search_targets(self, targets, query):
         # TODO: support allow_no_indices query parameter
         matches = set()
         for target in targets:
@@ -482,10 +482,10 @@ class FakeElasticsearch(Elasticsearch):
             elif "*" in target:
                 matches.update(fnmatch.filter(self.__documents_dict, target))
             elif target not in self.__documents_dict:
-                raise MissingIndexException(msg=f"IndexMissingException[[{target}] missing]", body=body)
+                raise MissingIndexException(msg=f"IndexMissingException[[{target}] missing]", query=query)
         return matches
 
-    def _normalize_index_to_list(self, index, body):
+    def _normalize_index_to_list(self, index, query):
         # Ensure to have a list of index
         if index is None:
             searchable_indexes = self.__documents_dict.keys()
@@ -498,7 +498,7 @@ class FakeElasticsearch(Elasticsearch):
             raise ValueError("Invalid param 'index'")
 
         generator = (target for index in searchable_indexes for target in index.split(","))
-        return list(self._validate_search_targets(generator, body))
+        return list(self._validate_search_targets(generator, query=query))
 
     @staticmethod
     def _normalize_doc_type_to_list(doc_type):
diff --git a/tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py b/tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py
index e145b81818..f5a6c14dba 100644
--- a/tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py
+++ b/tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py
@@ -213,12 +213,12 @@ def _escape(value):
 class MissingIndexException(NotFoundError):
     """Exception representing a missing index."""
 
-    def __init__(self, msg, body):
+    def __init__(self, msg, query):
         self.msg = msg
-        self.body = body
+        self.query = query
 
     def __str__(self):
-        return f"IndexMissingException[[{self.msg}] missing] with body {self.body}"
+        return f"IndexMissingException[[{self.msg}] missing] with query {self.query}"
 
 
 class SearchFailedException(NotFoundError):

Reply via email to