This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ad9d8d46b6 Upgrade Elasticsearch to 8 (#33135)
ad9d8d46b6 is described below
commit ad9d8d46b6ee3a7d8e6665d2a6f5c6660063f281
Author: Owen Leung <[email protected]>
AuthorDate: Wed Aug 9 00:05:45 2023 +0800
Upgrade Elasticsearch to 8 (#33135)
---
airflow/config_templates/config.yml | 92 -----------
airflow/providers/elasticsearch/CHANGELOG.rst | 6 +
.../providers/elasticsearch/log/es_task_handler.py | 19 +--
airflow/providers/elasticsearch/provider.yaml | 96 ++++++++++-
.../configurations-ref.rst | 18 +++
.../index.rst | 3 +-
docs/apache-airflow/configurations-ref.rst | 1 +
generated/provider_dependencies.json | 2 +-
.../elasticsearch/log/elasticmock/__init__.py | 44 ++++-
.../log/elasticmock/fake_elasticsearch.py | 26 ++-
.../log/elasticmock/utilities/__init__.py | 180 +++++++++++++++++++++
.../elasticsearch/log/test_es_task_handler.py | 10 +-
12 files changed, 368 insertions(+), 129 deletions(-)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index f61f6baca0..082d4d4d51 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2317,98 +2317,6 @@ kerberos:
type: boolean
example: ~
default: "True"
-elasticsearch:
- description: ~
- options:
- host:
- description: |
- Elasticsearch host
- version_added: 1.10.4
- type: string
- example: ~
- default: ""
- log_id_template:
- description: |
- Format of the log_id, which is used to query for a given tasks logs
- version_added: 1.10.4
- type: string
- example: ~
- is_template: true
- default: "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
- end_of_log_mark:
- description: |
- Used to mark the end of a log stream for a task
- version_added: 1.10.4
- type: string
- example: ~
- default: "end_of_log"
- frontend:
- description: |
- Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id
- Code will construct log_id using the log_id template from the argument above.
- NOTE: scheme will default to https if one is not provided
- version_added: 1.10.4
- type: string
- example: "http://localhost:5601/app/kibana#/discover\
- ?_a=(columns:!(message),query:(language:kuery,query:'log_id: \"{log_id}\"'),sort:!(log.offset,asc))"
- default: ""
- write_stdout:
- description: |
- Write the task logs to the stdout of the worker, rather than the default files
- version_added: 1.10.4
- type: string
- example: ~
- default: "False"
- json_format:
- description: |
- Instead of the default log formatter, write the log lines as JSON
- version_added: 1.10.4
- type: string
- example: ~
- default: "False"
- json_fields:
- description: |
- Log fields to also attach to the json output, if enabled
- version_added: 1.10.4
- type: string
- example: ~
- default: "asctime, filename, lineno, levelname, message"
- host_field:
- description: |
- The field where host name is stored (normally either `host` or `host.name`)
- version_added: 2.1.1
- type: string
- example: ~
- default: "host"
- offset_field:
- description: |
- The field where offset is stored (normally either `offset` or `log.offset`)
- version_added: 2.1.1
- type: string
- example: ~
- default: "offset"
- index_patterns:
- description: |
- Comma separated list of index patterns to use when searching for logs (default: `_all`).
- version_added: 2.6.0
- type: string
- example: something-*
- default: "_all"
-elasticsearch_configs:
- description: ~
- options:
- use_ssl:
- description: ~
- version_added: 1.10.5
- type: string
- example: ~
- default: "False"
- verify_certs:
- description: ~
- version_added: 1.10.5
- type: string
- example: ~
- default: "True"
sensors:
description: ~
options:
diff --git a/airflow/providers/elasticsearch/CHANGELOG.rst b/airflow/providers/elasticsearch/CHANGELOG.rst
index a7daf14753..b7882d0656 100644
--- a/airflow/providers/elasticsearch/CHANGELOG.rst
+++ b/airflow/providers/elasticsearch/CHANGELOG.rst
@@ -27,6 +27,12 @@
Changelog
---------
+.. note::
+ Upgrade to Elasticsearch 8. The ElasticsearchTaskHandler and ElasticsearchSQLHook now use the Elasticsearch 8 package.
+ As explained at https://elasticsearch-py.readthedocs.io/en/stable, Elasticsearch language clients are only backwards
+ compatible with default distributions, and even then without guarantees. We therefore recommend upgrading the
+ Elasticsearch database to version 8 to ensure compatibility with the language client.
+
5.0.0
.....
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index ccac90480a..03bfe247c5 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -30,7 +30,7 @@ from urllib.parse import quote
# Using `from elasticsearch import *` would break elasticsearch mocking used in unit test.
import elasticsearch
import pendulum
-from elasticsearch.exceptions import ElasticsearchException, NotFoundError
+from elasticsearch.exceptions import NotFoundError
from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
@@ -89,7 +89,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
json_fields: str,
host_field: str = "host",
offset_field: str = "offset",
- host: str = "localhost:9200",
+ host: str = "http://localhost:9200",
frontend: str = "localhost:5601",
index_patterns: str | None = conf.get("elasticsearch", "index_patterns", fallback="_all"),
es_kwargs: dict | None = conf.getsection("elasticsearch_configs"),
@@ -101,8 +101,8 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
super().__init__(base_log_folder, filename_template)
self.closed = False
- self.client = elasticsearch.Elasticsearch(host.split(";"), **es_kwargs) # type: ignore[attr-defined]
-
+ self.client = elasticsearch.Elasticsearch(host, **es_kwargs) # type: ignore[attr-defined]
+ # in airflow.cfg, host of elasticsearch has to be http://dockerhostXxxx:9200
if USE_PER_RUN_LOG_ID and log_id_template is not None:
warnings.warn(
"Passing log_id_template to ElasticsearchTaskHandler is deprecated and has no effect",
@@ -292,27 +292,24 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
}
try:
- max_log_line = self.client.count(index=self.index_patterns, body=query)["count"]
+ max_log_line = self.client.count(index=self.index_patterns, body=query)["count"] # type: ignore
except NotFoundError as e:
self.log.exception("The target index pattern %s does not exist", self.index_patterns)
raise e
- except ElasticsearchException as e:
- self.log.exception("Could not get current log size with log_id: %s", log_id)
- raise e
logs: list[Any] | ElasticSearchResponse = []
if max_log_line != 0:
try:
query.update({"sort": [self.offset_field]})
- res = self.client.search(
+ res = self.client.search( # type: ignore
index=self.index_patterns,
body=query,
size=self.MAX_LINE_PER_PAGE,
from_=self.MAX_LINE_PER_PAGE * self.PAGE,
)
logs = ElasticSearchResponse(self, res)
- except elasticsearch.exceptions.ElasticsearchException:
- self.log.exception("Could not read log with log_id: %s", log_id)
+ except Exception as err:
+ self.log.exception("Could not read log with log_id: %s. Exception: %s", log_id, err)
return logs
diff --git a/airflow/providers/elasticsearch/provider.yaml b/airflow/providers/elasticsearch/provider.yaml
index 2b6ffaabf2..b848c1647c 100644
--- a/airflow/providers/elasticsearch/provider.yaml
+++ b/airflow/providers/elasticsearch/provider.yaml
@@ -53,7 +53,7 @@ versions:
dependencies:
- apache-airflow>=2.4.0
- apache-airflow-providers-common-sql>=1.3.1
- - elasticsearch>7,<7.15.0
+ - elasticsearch>8,<9
integrations:
- integration-name: Elasticsearch
@@ -72,3 +72,97 @@ connection-types:
logging:
-
airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler
+
+config:
+ elasticsearch:
+ description: ~
+ options:
+ host:
+ description: |
+ Elasticsearch host
+ version_added: 1.10.4
+ type: string
+ example: ~
+ default: ""
+ log_id_template:
+ description: |
+ Format of the log_id, which is used to query for a given tasks logs
+ version_added: 1.10.4
+ type: string
+ example: ~
+ is_template: true
+ default: "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
+ end_of_log_mark:
+ description: |
+ Used to mark the end of a log stream for a task
+ version_added: 1.10.4
+ type: string
+ example: ~
+ default: "end_of_log"
+ frontend:
+ description: |
+ Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id
+ Code will construct log_id using the log_id template from the argument above.
+ NOTE: scheme will default to https if one is not provided
+ version_added: 1.10.4
+ type: string
+ example: "http://localhost:5601/app/kibana#/discover\
+ ?_a=(columns:!(message),query:(language:kuery,query:'log_id: \"{log_id}\"'),sort:!(log.offset,asc))"
+ default: ""
+ write_stdout:
+ description: |
+ Write the task logs to the stdout of the worker, rather than the default files
+ version_added: 1.10.4
+ type: string
+ example: ~
+ default: "False"
+ json_format:
+ description: |
+ Instead of the default log formatter, write the log lines as JSON
+ version_added: 1.10.4
+ type: string
+ example: ~
+ default: "False"
+ json_fields:
+ description: |
+ Log fields to also attach to the json output, if enabled
+ version_added: 1.10.4
+ type: string
+ example: ~
+ default: "asctime, filename, lineno, levelname, message"
+ host_field:
+ description: |
+ The field where host name is stored (normally either `host` or `host.name`)
+ version_added: 2.1.1
+ type: string
+ example: ~
+ default: "host"
+ offset_field:
+ description: |
+ The field where offset is stored (normally either `offset` or `log.offset`)
+ version_added: 2.1.1
+ type: string
+ example: ~
+ default: "offset"
+ index_patterns:
+ description: |
+ Comma separated list of index patterns to use when searching for logs (default: `_all`).
+ version_added: 2.6.0
+ type: string
+ example: something-*
+ default: "_all"
+ elasticsearch_configs:
+ description: ~
+ options:
+ http_compress:
+ description: ~
+ version_added: 1.10.5
+ type: string
+ example: ~
+ default: "False"
+ verify_certs:
+ description: ~
+ version_added: 1.10.5
+ type: string
+ example: ~
+ default: "True"
diff --git a/docs/apache-airflow-providers-elasticsearch/configurations-ref.rst b/docs/apache-airflow-providers-elasticsearch/configurations-ref.rst
new file mode 100644
index 0000000000..5885c9d91b
--- /dev/null
+++ b/docs/apache-airflow-providers-elasticsearch/configurations-ref.rst
@@ -0,0 +1,18 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+.. include:: ../exts/includes/providers-configurations-ref.rst
diff --git a/docs/apache-airflow-providers-elasticsearch/index.rst b/docs/apache-airflow-providers-elasticsearch/index.rst
index 5a4582412a..f2b38434d2 100644
--- a/docs/apache-airflow-providers-elasticsearch/index.rst
+++ b/docs/apache-airflow-providers-elasticsearch/index.rst
@@ -43,6 +43,7 @@
:maxdepth: 1
:caption: References
+ Configuration <configurations-ref>
Python API <_api/airflow/providers/elasticsearch/index>
.. toctree::
@@ -103,7 +104,7 @@ PIP package Version required
======================================= ==================
``apache-airflow`` ``>=2.4.0``
``apache-airflow-providers-common-sql`` ``>=1.3.1``
-``elasticsearch`` ``>7,<7.15.0``
+``elasticsearch`` ``>8,<9``
======================================= ==================
Cross provider package dependencies
diff --git a/docs/apache-airflow/configurations-ref.rst b/docs/apache-airflow/configurations-ref.rst
index f323ea31f0..c92a9975e3 100644
--- a/docs/apache-airflow/configurations-ref.rst
+++ b/docs/apache-airflow/configurations-ref.rst
@@ -41,6 +41,7 @@ in the provider's documentation. The pre-installed providers that you may want t
* :doc:`Configuration Reference for SMTP Provider <apache-airflow-providers-smtp:configurations-ref>`
* :doc:`Configuration Reference for IMAP Provider <apache-airflow-providers-imap:configurations-ref>`
* :doc:`Configuration Reference for OpenLineage Provider <apache-airflow-providers-openlineage:configurations-ref>`
+* :doc:`Configuration Reference for Elasticsearch Provider <apache-airflow-providers-elasticsearch:configurations-ref>`
.. note::
For more information see :doc:`/howto/set-config`.
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index 7300c0fc39..0bd446853e 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -358,7 +358,7 @@
"deps": [
"apache-airflow-providers-common-sql>=1.3.1",
"apache-airflow>=2.4.0",
- "elasticsearch>7,<7.15.0"
+ "elasticsearch>8,<9"
],
"cross-providers-deps": [
"common.sql"
diff --git a/tests/providers/elasticsearch/log/elasticmock/__init__.py b/tests/providers/elasticsearch/log/elasticmock/__init__.py
index 4f38d3d932..0884cff9ef 100644
--- a/tests/providers/elasticsearch/log/elasticmock/__init__.py
+++ b/tests/providers/elasticsearch/log/elasticmock/__init__.py
@@ -41,17 +41,55 @@ from __future__ import annotations
"""Elastic mock module used for testing"""
from functools import wraps
from unittest.mock import patch
-
-from elasticsearch.client.utils import _normalize_hosts
+from urllib.parse import unquote, urlparse
from .fake_elasticsearch import FakeElasticsearch
ELASTIC_INSTANCES: dict[str, FakeElasticsearch] = {}
+def _normalize_hosts(hosts):
+ """
+ Helper function to transform hosts argument to
+ :class:`~elasticsearch.Elasticsearch` to a list of dicts.
+ """
+ # if hosts are empty, just defer to defaults down the line
+ if hosts is None:
+ return [{}]
+
+ hosts = [hosts]
+
+ out = []
+
+ for host in hosts:
+ if "://" not in host:
+ host = f"//{host}"
+
+ parsed_url = urlparse(host)
+ h = {"host": parsed_url.hostname}
+
+ if parsed_url.port:
+ h["port"] = parsed_url.port
+
+ if parsed_url.scheme == "https":
+ h["port"] = parsed_url.port or 443
+ h["use_ssl"] = True
+
+ if parsed_url.username or parsed_url.password:
+ h["http_auth"] = f"{unquote(parsed_url.username)}:{unquote(parsed_url.password)}"
+
+ if parsed_url.path and parsed_url.path != "/":
+ h["url_prefix"] = parsed_url.path
+
+ out.append(h)
+ else:
+ out.append(host)
+ return out
+
+
def _get_elasticmock(hosts=None, *args, **kwargs):
host = _normalize_hosts(hosts)[0]
- elastic_key = f"{host.get('host', 'localhost')}:{host.get('port', 9200)}"
+ elastic_key = f"http://{host.get('host', 'localhost')}:{host.get('port', 9200)}"
if elastic_key in ELASTIC_INSTANCES:
connection = ELASTIC_INSTANCES.get(elastic_key)
diff --git a/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
index c4e25d290d..b37608232d 100644
--- a/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
+++ b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
@@ -20,10 +20,9 @@ import fnmatch
import json
from elasticsearch import Elasticsearch
-from elasticsearch.client.utils import query_params
from elasticsearch.exceptions import NotFoundError
-from .utilities import get_random_id
+from .utilities import MissingIndexException, get_random_id, query_params
#
# The MIT License (MIT)
@@ -53,7 +52,7 @@ class FakeElasticsearch(Elasticsearch):
__documents_dict = None
def __init__(self):
- super().__init__()
+ super().__init__("http://localhost:9200")
self.__documents_dict = {}
@query_params()
@@ -327,9 +326,8 @@ class FakeElasticsearch(Elasticsearch):
"version",
)
def count(self, index=None, doc_type=None, body=None, params=None, headers=None):
- searchable_indexes = self._normalize_index_to_list(index)
+ searchable_indexes = self._normalize_index_to_list(index, body)
searchable_doc_types = self._normalize_doc_type_to_list(doc_type)
-
i = 0
for searchable_index in searchable_indexes:
for document in self.__documents_dict[searchable_index]:
@@ -376,7 +374,7 @@ class FakeElasticsearch(Elasticsearch):
"version",
)
def search(self, index=None, doc_type=None, body=None, params=None, headers=None):
- searchable_indexes = self._normalize_index_to_list(index)
+ searchable_indexes = self._normalize_index_to_list(index, body)
matches = self._find_match(index, doc_type, body)
@@ -446,7 +444,7 @@ class FakeElasticsearch(Elasticsearch):
return result_dict
def _find_match(self, index, doc_type, body):
- searchable_indexes = self._normalize_index_to_list(index)
+ searchable_indexes = self._normalize_index_to_list(index, body)
searchable_doc_types = self._normalize_doc_type_to_list(doc_type)
must = body["query"]["bool"]["must"][0] # only support one must
@@ -477,19 +475,20 @@ class FakeElasticsearch(Elasticsearch):
matches.append(document)
# Check index(es) exists.
- def _validate_search_targets(self, targets):
+ def _validate_search_targets(self, targets, body):
# TODO: support allow_no_indices query parameter
matches = set()
for target in targets:
+ print(f"Loop over:::target = {target}")
if target == "_all" or target == "":
matches.update(self.__documents_dict)
elif "*" in target:
matches.update(fnmatch.filter(self.__documents_dict, target))
elif target not in self.__documents_dict:
- raise NotFoundError(404, f"IndexMissingException[[{target}] missing]")
+ raise MissingIndexException(msg=f"IndexMissingException[[{target}] missing]", body=body)
return matches
- def _normalize_index_to_list(self, index):
+ def _normalize_index_to_list(self, index, body):
# Ensure to have a list of index
if index is None:
searchable_indexes = self.__documents_dict.keys()
@@ -501,11 +500,8 @@ class FakeElasticsearch(Elasticsearch):
# Is it the correct exception to use ?
raise ValueError("Invalid param 'index'")
- return list(
- self._validate_search_targets(
- target for index in searchable_indexes for target in index.split(",")
- )
- )
+ generator = (target for index in searchable_indexes for target in index.split(","))
+ return list(self._validate_search_targets(generator, body))
@staticmethod
def _normalize_doc_type_to_list(doc_type):
diff --git a/tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py b/tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py
index ef142d6d98..cb2d91f4ce 100644
--- a/tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py
+++ b/tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py
@@ -39,13 +39,193 @@ from __future__ import annotations
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Utilities for Elastic mock"""
+import base64
import random
import string
+from datetime import date, datetime
+from functools import wraps
+
+from elasticsearch.exceptions import NotFoundError
DEFAULT_ELASTICSEARCH_ID_SIZE = 20
CHARSET_FOR_ELASTICSEARCH_ID = string.ascii_letters + string.digits
+GLOBAL_PARAMS = ("pretty", "human", "error_trace", "format", "filter_path")
def get_random_id(size=DEFAULT_ELASTICSEARCH_ID_SIZE):
"""Returns random if for elasticsearch"""
return "".join(random.choice(CHARSET_FOR_ELASTICSEARCH_ID) for _ in range(size))
+
+
+def query_params(*es_query_params, **kwargs):
+ """
+ Decorator that pops all accepted parameters from method's kwargs and puts
+ them in the params argument.
+ """
+ body_params = kwargs.pop("body_params", None)
+ body_only_params = set(body_params or ()) - set(es_query_params)
+ body_name = kwargs.pop("body_name", None)
+ body_required = kwargs.pop("body_required", False)
+ type_possible_in_params = "type" in es_query_params
+
+ assert not (body_name and body_params)
+
+ assert not (body_name and body_required)
+ assert not body_required or body_params
+
+ def _wrapper(func):
+ @wraps(func)
+ def _wrapped(*args, **kwargs):
+ params = (kwargs.pop("params", None) or {}).copy()
+ headers = {k.lower(): v for k, v in (kwargs.pop("headers", None) or {}).copy().items()}
+
+ if "opaque_id" in kwargs:
+ headers["x-opaque-id"] = kwargs.pop("opaque_id")
+
+ http_auth = kwargs.pop("http_auth", None)
+ api_key = kwargs.pop("api_key", None)
+
+ using_body_kwarg = kwargs.get("body", None) is not None
+ using_positional_args = args and len(args) > 1
+
+ if type_possible_in_params:
+ doc_type_in_params = params and "doc_type" in params
+ doc_type_in_kwargs = "doc_type" in kwargs
+
+ if doc_type_in_params:
+ params["type"] = params.pop("doc_type")
+ if doc_type_in_kwargs:
+ kwargs["type"] = kwargs.pop("doc_type")
+
+ if using_body_kwarg or using_positional_args:
+ body_only_params_in_use = body_only_params.intersection(kwargs)
+ if body_only_params_in_use:
+ params_prose = "', '".join(sorted(body_only_params_in_use))
+ plural_params = len(body_only_params_in_use) > 1
+
+ raise TypeError(
+ f"The '{params_prose}' parameter{'s' if plural_params else ''} "
+ f"{'are' if plural_params else 'is'} only serialized in the "
+ f"request body and can't be combined with the 'body' parameter. "
+ f"Either stop using the 'body' parameter and use keyword-arguments "
+ f"only or move the specified parameters into the 'body'. "
+ f"See https://github.com/elastic/elasticsearch-py/issues/1698 "
+ f"for more information"
+ )
+
+ elif set(body_params or ()).intersection(kwargs):
+ body = {}
+ for param in body_params:
+ value = kwargs.pop(param, None)
+ if value is not None:
+ body[param.rstrip("_")] = value
+ kwargs["body"] = body
+
+ elif body_required:
+ kwargs["body"] = {}
+
+ if body_name:
+ if body_name in kwargs:
+ if using_body_kwarg:
+ raise TypeError(
+ f"Can't use '{body_name}' and 'body' parameters together"
+ f" because '{body_name}' is an alias for 'body'. "
+ f"Instead you should only use the '{body_name}' "
+ f"parameter. See https://github.com/elastic/elasticsearch-py/issues/1698 "
+ f"for more information"
+ )
+ kwargs["body"] = kwargs.pop(body_name)
+
+ if http_auth is not None and api_key is not None:
+ raise ValueError("Only one of 'http_auth' and 'api_key' may be passed at a time")
+ elif http_auth is not None:
+ headers["authorization"] = f"Basic {_base64_auth_header(http_auth)}"
+ elif api_key is not None:
+ headers["authorization"] = f"ApiKey {_base64_auth_header(api_key)}"
+
+ for p in es_query_params + GLOBAL_PARAMS:
+ if p in kwargs:
+ v = kwargs.pop(p)
+ if v is not None:
+ params[p] = _escape(v)
+
+ for p in ("ignore", "request_timeout"):
+ if p in kwargs:
+ params[p] = kwargs.pop(p)
+ return func(*args, params=params, headers=headers, **kwargs)
+
+ return _wrapped
+
+ return _wrapper
+
+
+def to_str(x, encoding="ascii"):
+ if not isinstance(x, str):
+ return x.decode(encoding)
+ return x
+
+
+def to_bytes(x, encoding="ascii"):
+ if not isinstance(x, bytes):
+ return x.encode(encoding)
+ return x
+
+
+def _base64_auth_header(auth_value):
+ """Takes either a 2-tuple or a base64-encoded string
+ and returns a base64-encoded string to be used
+ as an HTTP authorization header.
+ """
+ if isinstance(auth_value, (list, tuple)):
+ auth_value = base64.b64encode(to_bytes(":".join(auth_value)))
+ return to_str(auth_value)
+
+
+def _escape(value):
+ """
+ Escape a single value of a URL string or a query parameter. If it is a list
+ or tuple, turn it into a comma-separated string first.
+ """
+
+ # make sequences into comma-separated strings
+ if isinstance(value, (list, tuple)):
+ value = ",".join(value)
+
+ # dates and datetimes into isoformat
+ elif isinstance(value, (date, datetime)):
+ value = value.isoformat()
+
+ # make bools into true/false strings
+ elif isinstance(value, bool):
+ value = str(value).lower()
+
+ # don't decode bytestrings
+ elif isinstance(value, bytes):
+ return value
+
+ # encode strings to utf-8
+ if isinstance(value, str):
+ return value.encode("utf-8")
+
+ return str(value)
+
+
+class MissingIndexException(NotFoundError):
+ """Exception representing a missing index."""
+
+ def __init__(self, msg, body):
+ self.msg = msg
+ self.body = body
+
+ def __str__(self):
+ return f"IndexMissingException[[{self.msg}] missing] with body {self.body}"
+
+
+class SearchFailedException(NotFoundError):
+ """Exception representing a search failure."""
+
+ def __init__(self, msg):
+ self.msg = msg
+
+ def __str__(self):
+ return f"SearchFailedException: {self.msg}"
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
index 93137ff2b2..7ae894f22a 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -29,7 +29,6 @@ from urllib.parse import quote
import elasticsearch
import pendulum
import pytest
-from elasticsearch.exceptions import ElasticsearchException
from airflow.configuration import conf
from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse
@@ -40,6 +39,7 @@ from airflow.utils.timezone import datetime
from tests.test_utils.db import clear_db_dags, clear_db_runs
from .elasticmock import elasticmock
+from .elasticmock.utilities import SearchFailedException
def get_ti(dag_id, task_id, execution_date, create_task_instance):
@@ -94,7 +94,7 @@ class TestElasticsearchTaskHandler:
offset_field=self.offset_field,
)
- self.es = elasticsearch.Elasticsearch(hosts=[{"host": "localhost", "port": 9200}])
+ self.es = elasticsearch.Elasticsearch("http://localhost:9200")
self.index_name = "test_index"
self.doc_type = "log"
self.test_message = "some random stuff"
@@ -132,7 +132,7 @@ class TestElasticsearchTaskHandler:
def test_client_with_config(self):
es_conf = dict(conf.getsection("elasticsearch_configs"))
expected_dict = {
- "use_ssl": False,
+ "http_compress": False,
"verify_certs": True,
}
assert es_conf == expected_dict
@@ -210,7 +210,7 @@ class TestElasticsearchTaskHandler:
def test_read_with_missing_index(self, ti):
ts = pendulum.now()
with mock.patch.object(self.es_task_handler, "index_patterns", new="nonexistent,test_*"):
- with pytest.raises(elasticsearch.exceptions.NotFoundError, match=r".*nonexistent.*"):
+ with pytest.raises(elasticsearch.exceptions.NotFoundError, match=r"IndexMissingException.*"):
self.es_task_handler.read(
ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False}
)
@@ -365,7 +365,7 @@ class TestElasticsearchTaskHandler:
def test_read_raises(self, ti):
with mock.patch.object(self.es_task_handler.log, "exception") as mock_exception:
with mock.patch.object(self.es_task_handler.client, "search") as mock_execute:
- mock_execute.side_effect = ElasticsearchException("Failed to read")
+ mock_execute.side_effect = SearchFailedException("Failed to read")
logs, metadatas = self.es_task_handler.read(ti, 1)
assert mock_exception.call_count == 1
args, kwargs = mock_exception.call_args