This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0d7fe471d4 Update es read query to not use body (#34792)
0d7fe471d4 is described below
commit 0d7fe471d428cd49b1eacaf84c8067796ca57fa7
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Nov 9 02:49:24 2023 -0800
Update es read query to not use body (#34792)
---
.../providers/elasticsearch/log/es_task_handler.py | 16 +++++++--------
airflow/providers/elasticsearch/provider.yaml | 2 +-
generated/provider_dependencies.json | 2 +-
.../log/elasticmock/fake_elasticsearch.py | 24 +++++++++++-----------
.../log/elasticmock/utilities/__init__.py | 6 +++---
5 files changed, 24 insertions(+), 26 deletions(-)
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index 33a323a958..f961f374f2 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -345,26 +345,24 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
:meta private:
"""
query: dict[Any, Any] = {
- "query": {
- "bool": {
- "filter": [{"range": {self.offset_field: {"gt":
int(offset)}}}],
- "must": [{"match_phrase": {"log_id": log_id}}],
- }
+ "bool": {
+ "filter": [{"range": {self.offset_field: {"gt":
int(offset)}}}],
+ "must": [{"match_phrase": {"log_id": log_id}}],
}
}
try:
- max_log_line = self.client.count(index=self.index_patterns,
body=query)["count"] # type: ignore
+ max_log_line = self.client.count(index=self.index_patterns,
query=query)["count"] # type: ignore
except NotFoundError as e:
self.log.exception("The target index pattern %s does not exist",
self.index_patterns)
raise e
if max_log_line != 0:
try:
- query.update({"sort": [self.offset_field]})
- res = self.client.search( # type: ignore
+ res = self.client.search(
index=self.index_patterns,
- body=query,
+ query=query,
+ sort=[self.offset_field],
size=self.MAX_LINE_PER_PAGE,
from_=self.MAX_LINE_PER_PAGE * self.PAGE,
)
diff --git a/airflow/providers/elasticsearch/provider.yaml b/airflow/providers/elasticsearch/provider.yaml
index 2afcdffbb8..dc7e7829c1 100644
--- a/airflow/providers/elasticsearch/provider.yaml
+++ b/airflow/providers/elasticsearch/provider.yaml
@@ -57,7 +57,7 @@ versions:
dependencies:
- apache-airflow>=2.5.0
- apache-airflow-providers-common-sql>=1.3.1
- - elasticsearch>8,<9
+ - elasticsearch>=8.10,<9
integrations:
- integration-name: Elasticsearch
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index 67eb9f95a2..4d4d6cb90a 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -371,7 +371,7 @@
"deps": [
"apache-airflow-providers-common-sql>=1.3.1",
"apache-airflow>=2.5.0",
- "elasticsearch>8,<9"
+ "elasticsearch>=8.10,<9"
],
"cross-providers-deps": [
"common.sql"
diff --git a/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
index ff228bf77f..79aadae835 100644
--- a/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
+++ b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
@@ -325,8 +325,8 @@ class FakeElasticsearch(Elasticsearch):
"track_scores",
"version",
)
- def count(self, index=None, doc_type=None, body=None, params=None,
headers=None):
- searchable_indexes = self._normalize_index_to_list(index, body)
+ def count(self, index=None, doc_type=None, query=None, params=None,
headers=None):
+ searchable_indexes = self._normalize_index_to_list(index, query=query)
searchable_doc_types = self._normalize_doc_type_to_list(doc_type)
i = 0
for searchable_index in searchable_indexes:
@@ -372,10 +372,10 @@ class FakeElasticsearch(Elasticsearch):
"track_scores",
"version",
)
- def search(self, index=None, doc_type=None, body=None, params=None,
headers=None):
- searchable_indexes = self._normalize_index_to_list(index, body)
+ def search(self, index=None, doc_type=None, query=None, params=None,
headers=None):
+ searchable_indexes = self._normalize_index_to_list(index, query=query)
- matches = self._find_match(index, doc_type, body)
+ matches = self._find_match(index, doc_type, query=query)
result = {
"hits": {"total": len(matches), "max_score": 1.0},
@@ -442,11 +442,11 @@ class FakeElasticsearch(Elasticsearch):
]
return result_dict
- def _find_match(self, index, doc_type, body):
- searchable_indexes = self._normalize_index_to_list(index, body)
+ def _find_match(self, index, doc_type, query):
+ searchable_indexes = self._normalize_index_to_list(index, query=query)
searchable_doc_types = self._normalize_doc_type_to_list(doc_type)
- must = body["query"]["bool"]["must"][0] # only support one must
+ must = query["bool"]["must"][0] # only support one must
matches = []
for searchable_index in searchable_indexes:
@@ -472,7 +472,7 @@ class FakeElasticsearch(Elasticsearch):
matches.append(document)
# Check index(es) exists.
- def _validate_search_targets(self, targets, body):
+ def _validate_search_targets(self, targets, query):
# TODO: support allow_no_indices query parameter
matches = set()
for target in targets:
@@ -482,10 +482,10 @@ class FakeElasticsearch(Elasticsearch):
elif "*" in target:
matches.update(fnmatch.filter(self.__documents_dict, target))
elif target not in self.__documents_dict:
- raise
MissingIndexException(msg=f"IndexMissingException[[{target}] missing]",
body=body)
+ raise
MissingIndexException(msg=f"IndexMissingException[[{target}] missing]",
query=query)
return matches
- def _normalize_index_to_list(self, index, body):
+ def _normalize_index_to_list(self, index, query):
# Ensure to have a list of index
if index is None:
searchable_indexes = self.__documents_dict.keys()
@@ -498,7 +498,7 @@ class FakeElasticsearch(Elasticsearch):
raise ValueError("Invalid param 'index'")
generator = (target for index in searchable_indexes for target in
index.split(","))
- return list(self._validate_search_targets(generator, body))
+ return list(self._validate_search_targets(generator, query=query))
@staticmethod
def _normalize_doc_type_to_list(doc_type):
diff --git a/tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py b/tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py
index e145b81818..f5a6c14dba 100644
--- a/tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py
+++ b/tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py
@@ -213,12 +213,12 @@ def _escape(value):
class MissingIndexException(NotFoundError):
"""Exception representing a missing index."""
- def __init__(self, msg, body):
+ def __init__(self, msg, query):
self.msg = msg
- self.body = body
+ self.query = query
def __str__(self):
- return f"IndexMissingException[[{self.msg}] missing] with body
{self.body}"
+ return f"IndexMissingException[[{self.msg}] missing] with query
{self.query}"
class SearchFailedException(NotFoundError):