This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ad9d8d46b6 Upgrade Elasticsearch to 8 (#33135)
ad9d8d46b6 is described below

commit ad9d8d46b6ee3a7d8e6665d2a6f5c6660063f281
Author: Owen Leung <[email protected]>
AuthorDate: Wed Aug 9 00:05:45 2023 +0800

    Upgrade Elasticsearch to 8 (#33135)
---
 airflow/config_templates/config.yml                |  92 -----------
 airflow/providers/elasticsearch/CHANGELOG.rst      |   6 +
 .../providers/elasticsearch/log/es_task_handler.py |  19 +--
 airflow/providers/elasticsearch/provider.yaml      |  96 ++++++++++-
 .../configurations-ref.rst                         |  18 +++
 .../index.rst                                      |   3 +-
 docs/apache-airflow/configurations-ref.rst         |   1 +
 generated/provider_dependencies.json               |   2 +-
 .../elasticsearch/log/elasticmock/__init__.py      |  44 ++++-
 .../log/elasticmock/fake_elasticsearch.py          |  26 ++-
 .../log/elasticmock/utilities/__init__.py          | 180 +++++++++++++++++++++
 .../elasticsearch/log/test_es_task_handler.py      |  10 +-
 12 files changed, 368 insertions(+), 129 deletions(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index f61f6baca0..082d4d4d51 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2317,98 +2317,6 @@ kerberos:
       type: boolean
       example: ~
       default: "True"
-elasticsearch:
-  description: ~
-  options:
-    host:
-      description: |
-        Elasticsearch host
-      version_added: 1.10.4
-      type: string
-      example: ~
-      default: ""
-    log_id_template:
-      description: |
-        Format of the log_id, which is used to query for a given tasks logs
-      version_added: 1.10.4
-      type: string
-      example: ~
-      is_template: true
-      default: "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
-    end_of_log_mark:
-      description: |
-        Used to mark the end of a log stream for a task
-      version_added: 1.10.4
-      type: string
-      example: ~
-      default: "end_of_log"
-    frontend:
-      description: |
-        Qualified URL for an elasticsearch frontend (like Kibana) with a 
template argument for log_id
-        Code will construct log_id using the log_id template from the argument 
above.
-        NOTE: scheme will default to https if one is not provided
-      version_added: 1.10.4
-      type: string
-      example: "http://localhost:5601/app/kibana#/discover\
-        ?_a=(columns:!(message),query:(language:kuery,query:'log_id: 
\"{log_id}\"'),sort:!(log.offset,asc))"
-      default: ""
-    write_stdout:
-      description: |
-        Write the task logs to the stdout of the worker, rather than the 
default files
-      version_added: 1.10.4
-      type: string
-      example: ~
-      default: "False"
-    json_format:
-      description: |
-        Instead of the default log formatter, write the log lines as JSON
-      version_added: 1.10.4
-      type: string
-      example: ~
-      default: "False"
-    json_fields:
-      description: |
-        Log fields to also attach to the json output, if enabled
-      version_added: 1.10.4
-      type: string
-      example: ~
-      default: "asctime, filename, lineno, levelname, message"
-    host_field:
-      description: |
-        The field where host name is stored (normally either `host` or 
`host.name`)
-      version_added: 2.1.1
-      type: string
-      example: ~
-      default: "host"
-    offset_field:
-      description: |
-        The field where offset is stored (normally either `offset` or 
`log.offset`)
-      version_added: 2.1.1
-      type: string
-      example: ~
-      default: "offset"
-    index_patterns:
-      description: |
-        Comma separated list of index patterns to use when searching for logs 
(default: `_all`).
-      version_added: 2.6.0
-      type: string
-      example: something-*
-      default: "_all"
-elasticsearch_configs:
-  description: ~
-  options:
-    use_ssl:
-      description: ~
-      version_added: 1.10.5
-      type: string
-      example: ~
-      default: "False"
-    verify_certs:
-      description: ~
-      version_added: 1.10.5
-      type: string
-      example: ~
-      default: "True"
 sensors:
   description: ~
   options:
diff --git a/airflow/providers/elasticsearch/CHANGELOG.rst 
b/airflow/providers/elasticsearch/CHANGELOG.rst
index a7daf14753..b7882d0656 100644
--- a/airflow/providers/elasticsearch/CHANGELOG.rst
+++ b/airflow/providers/elasticsearch/CHANGELOG.rst
@@ -27,6 +27,12 @@
 Changelog
 ---------
 
+.. note::
+  Upgrade to Elasticsearch 8. The ElasticsearchTaskHandler & 
ElasticsearchSQLHook will now use the Elasticsearch 8 package.
+  As explained at https://elasticsearch-py.readthedocs.io/en/stable , 
Elasticsearch language clients are only backwards
+  compatible with default distributions and without guarantees made, so we 
recommend upgrading the version of the
+  Elasticsearch database to 8 to ensure compatibility with the language client.
+
 5.0.0
 .....
 
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py 
b/airflow/providers/elasticsearch/log/es_task_handler.py
index ccac90480a..03bfe247c5 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -30,7 +30,7 @@ from urllib.parse import quote
 # Using `from elasticsearch import *` would break elasticsearch mocking used 
in unit test.
 import elasticsearch
 import pendulum
-from elasticsearch.exceptions import ElasticsearchException, NotFoundError
+from elasticsearch.exceptions import NotFoundError
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowProviderDeprecationWarning
@@ -89,7 +89,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMix
         json_fields: str,
         host_field: str = "host",
         offset_field: str = "offset",
-        host: str = "localhost:9200",
+        host: str = "http://localhost:9200",
         frontend: str = "localhost:5601",
         index_patterns: str | None = conf.get("elasticsearch", 
"index_patterns", fallback="_all"),
         es_kwargs: dict | None = conf.getsection("elasticsearch_configs"),
@@ -101,8 +101,8 @@ class ElasticsearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMix
         super().__init__(base_log_folder, filename_template)
         self.closed = False
 
-        self.client = elasticsearch.Elasticsearch(host.split(";"), 
**es_kwargs)  # type: ignore[attr-defined]
-
+        self.client = elasticsearch.Elasticsearch(host, **es_kwargs)  # type: 
ignore[attr-defined]
+        # in airflow.cfg, host of elasticsearch has to be 
http://dockerhostXxxx:9200
         if USE_PER_RUN_LOG_ID and log_id_template is not None:
             warnings.warn(
                 "Passing log_id_template to ElasticsearchTaskHandler is 
deprecated and has no effect",
@@ -292,27 +292,24 @@ class ElasticsearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMix
         }
 
         try:
-            max_log_line = self.client.count(index=self.index_patterns, 
body=query)["count"]
+            max_log_line = self.client.count(index=self.index_patterns, 
body=query)["count"]  # type: ignore
         except NotFoundError as e:
             self.log.exception("The target index pattern %s does not exist", 
self.index_patterns)
             raise e
-        except ElasticsearchException as e:
-            self.log.exception("Could not get current log size with log_id: 
%s", log_id)
-            raise e
 
         logs: list[Any] | ElasticSearchResponse = []
         if max_log_line != 0:
             try:
                 query.update({"sort": [self.offset_field]})
-                res = self.client.search(
+                res = self.client.search(  # type: ignore
                     index=self.index_patterns,
                     body=query,
                     size=self.MAX_LINE_PER_PAGE,
                     from_=self.MAX_LINE_PER_PAGE * self.PAGE,
                 )
                 logs = ElasticSearchResponse(self, res)
-            except elasticsearch.exceptions.ElasticsearchException:
-                self.log.exception("Could not read log with log_id: %s", 
log_id)
+            except Exception as err:
+                self.log.exception("Could not read log with log_id: %s. 
Exception: %s", log_id, err)
 
         return logs
 
diff --git a/airflow/providers/elasticsearch/provider.yaml 
b/airflow/providers/elasticsearch/provider.yaml
index 2b6ffaabf2..b848c1647c 100644
--- a/airflow/providers/elasticsearch/provider.yaml
+++ b/airflow/providers/elasticsearch/provider.yaml
@@ -53,7 +53,7 @@ versions:
 dependencies:
   - apache-airflow>=2.4.0
   - apache-airflow-providers-common-sql>=1.3.1
-  - elasticsearch>7,<7.15.0
+  - elasticsearch>8,<9
 
 integrations:
   - integration-name: Elasticsearch
@@ -72,3 +72,97 @@ connection-types:
 
 logging:
   - 
airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler
+
+config:
+  elasticsearch:
+    description: ~
+    options:
+      host:
+        description: |
+          Elasticsearch host
+        version_added: 1.10.4
+        type: string
+        example: ~
+        default: ""
+      log_id_template:
+        description: |
+          Format of the log_id, which is used to query for a given tasks logs
+        version_added: 1.10.4
+        type: string
+        example: ~
+        is_template: true
+        default: "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
+      end_of_log_mark:
+        description: |
+          Used to mark the end of a log stream for a task
+        version_added: 1.10.4
+        type: string
+        example: ~
+        default: "end_of_log"
+      frontend:
+        description: |
+          Qualified URL for an elasticsearch frontend (like Kibana) with a 
template argument for log_id
+          Code will construct log_id using the log_id template from the 
argument above.
+          NOTE: scheme will default to https if one is not provided
+        version_added: 1.10.4
+        type: string
+        example: "http://localhost:5601/app/kibana#/discover\
+          ?_a=(columns:!(message),query:(language:kuery,query:'log_id: 
\"{log_id}\"'),sort:!(log.offset,asc))"
+        default: ""
+      write_stdout:
+        description: |
+          Write the task logs to the stdout of the worker, rather than the 
default files
+        version_added: 1.10.4
+        type: string
+        example: ~
+        default: "False"
+      json_format:
+        description: |
+          Instead of the default log formatter, write the log lines as JSON
+        version_added: 1.10.4
+        type: string
+        example: ~
+        default: "False"
+      json_fields:
+        description: |
+          Log fields to also attach to the json output, if enabled
+        version_added: 1.10.4
+        type: string
+        example: ~
+        default: "asctime, filename, lineno, levelname, message"
+      host_field:
+        description: |
+          The field where host name is stored (normally either `host` or 
`host.name`)
+        version_added: 2.1.1
+        type: string
+        example: ~
+        default: "host"
+      offset_field:
+        description: |
+          The field where offset is stored (normally either `offset` or 
`log.offset`)
+        version_added: 2.1.1
+        type: string
+        example: ~
+        default: "offset"
+      index_patterns:
+        description: |
+          Comma separated list of index patterns to use when searching for 
logs (default: `_all`).
+        version_added: 2.6.0
+        type: string
+        example: something-*
+        default: "_all"
+  elasticsearch_configs:
+    description: ~
+    options:
+      http_compress:
+        description: ~
+        version_added: 1.10.5
+        type: string
+        example: ~
+        default: "False"
+      verify_certs:
+        description: ~
+        version_added: 1.10.5
+        type: string
+        example: ~
+        default: "True"
diff --git a/docs/apache-airflow-providers-elasticsearch/configurations-ref.rst 
b/docs/apache-airflow-providers-elasticsearch/configurations-ref.rst
new file mode 100644
index 0000000000..5885c9d91b
--- /dev/null
+++ b/docs/apache-airflow-providers-elasticsearch/configurations-ref.rst
@@ -0,0 +1,18 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. include:: ../exts/includes/providers-configurations-ref.rst
diff --git a/docs/apache-airflow-providers-elasticsearch/index.rst 
b/docs/apache-airflow-providers-elasticsearch/index.rst
index 5a4582412a..f2b38434d2 100644
--- a/docs/apache-airflow-providers-elasticsearch/index.rst
+++ b/docs/apache-airflow-providers-elasticsearch/index.rst
@@ -43,6 +43,7 @@
     :maxdepth: 1
     :caption: References
 
+    Configuration <configurations-ref>
     Python API <_api/airflow/providers/elasticsearch/index>
 
 .. toctree::
@@ -103,7 +104,7 @@ PIP package                              Version required
 =======================================  ==================
 ``apache-airflow``                       ``>=2.4.0``
 ``apache-airflow-providers-common-sql``  ``>=1.3.1``
-``elasticsearch``                        ``>7,<7.15.0``
+``elasticsearch``                        ``>8,<9``
 =======================================  ==================
 
 Cross provider package dependencies
diff --git a/docs/apache-airflow/configurations-ref.rst 
b/docs/apache-airflow/configurations-ref.rst
index f323ea31f0..c92a9975e3 100644
--- a/docs/apache-airflow/configurations-ref.rst
+++ b/docs/apache-airflow/configurations-ref.rst
@@ -41,6 +41,7 @@ in the provider's documentation. The pre-installed providers 
that you may want t
 * :doc:`Configuration Reference for SMTP Provider 
<apache-airflow-providers-smtp:configurations-ref>`
 * :doc:`Configuration Reference for IMAP Provider 
<apache-airflow-providers-imap:configurations-ref>`
 * :doc:`Configuration Reference for OpenLineage Provider 
<apache-airflow-providers-openlineage:configurations-ref>`
+* :doc:`Configuration Reference for Elasticsearch Provider 
<apache-airflow-providers-elasticsearch:configurations-ref>`
 
 .. note::
     For more information see :doc:`/howto/set-config`.
diff --git a/generated/provider_dependencies.json 
b/generated/provider_dependencies.json
index 7300c0fc39..0bd446853e 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -358,7 +358,7 @@
     "deps": [
       "apache-airflow-providers-common-sql>=1.3.1",
       "apache-airflow>=2.4.0",
-      "elasticsearch>7,<7.15.0"
+      "elasticsearch>8,<9"
     ],
     "cross-providers-deps": [
       "common.sql"
diff --git a/tests/providers/elasticsearch/log/elasticmock/__init__.py 
b/tests/providers/elasticsearch/log/elasticmock/__init__.py
index 4f38d3d932..0884cff9ef 100644
--- a/tests/providers/elasticsearch/log/elasticmock/__init__.py
+++ b/tests/providers/elasticsearch/log/elasticmock/__init__.py
@@ -41,17 +41,55 @@ from __future__ import annotations
 """Elastic mock module used for testing"""
 from functools import wraps
 from unittest.mock import patch
-
-from elasticsearch.client.utils import _normalize_hosts
+from urllib.parse import unquote, urlparse
 
 from .fake_elasticsearch import FakeElasticsearch
 
 ELASTIC_INSTANCES: dict[str, FakeElasticsearch] = {}
 
 
+def _normalize_hosts(hosts):
+    """
+    Helper function to transform hosts argument to
+    :class:`~elasticsearch.Elasticsearch` to a list of dicts.
+    """
+    # if hosts are empty, just defer to defaults down the line
+    if hosts is None:
+        return [{}]
+
+    hosts = [hosts]
+
+    out = []
+
+    for host in hosts:
+        if "://" not in host:
+            host = f"//{host}"
+
+        parsed_url = urlparse(host)
+        h = {"host": parsed_url.hostname}
+
+        if parsed_url.port:
+            h["port"] = parsed_url.port
+
+        if parsed_url.scheme == "https":
+            h["port"] = parsed_url.port or 443
+            h["use_ssl"] = True
+
+        if parsed_url.username or parsed_url.password:
+            h["http_auth"] = 
f"{unquote(parsed_url.username)}:{unquote(parsed_url.password)}"
+
+        if parsed_url.path and parsed_url.path != "/":
+            h["url_prefix"] = parsed_url.path
+
+        out.append(h)
+    else:
+        out.append(host)
+    return out
+
+
 def _get_elasticmock(hosts=None, *args, **kwargs):
     host = _normalize_hosts(hosts)[0]
-    elastic_key = f"{host.get('host', 'localhost')}:{host.get('port', 9200)}"
+    elastic_key = f"http://{host.get('host', 'localhost')}:{host.get('port', 
9200)}"
 
     if elastic_key in ELASTIC_INSTANCES:
         connection = ELASTIC_INSTANCES.get(elastic_key)
diff --git 
a/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py 
b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
index c4e25d290d..b37608232d 100644
--- a/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
+++ b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
@@ -20,10 +20,9 @@ import fnmatch
 import json
 
 from elasticsearch import Elasticsearch
-from elasticsearch.client.utils import query_params
 from elasticsearch.exceptions import NotFoundError
 
-from .utilities import get_random_id
+from .utilities import MissingIndexException, get_random_id, query_params
 
 #
 # The MIT License (MIT)
@@ -53,7 +52,7 @@ class FakeElasticsearch(Elasticsearch):
     __documents_dict = None
 
     def __init__(self):
-        super().__init__()
+        super().__init__("http://localhost:9200")
         self.__documents_dict = {}
 
     @query_params()
@@ -327,9 +326,8 @@ class FakeElasticsearch(Elasticsearch):
         "version",
     )
     def count(self, index=None, doc_type=None, body=None, params=None, 
headers=None):
-        searchable_indexes = self._normalize_index_to_list(index)
+        searchable_indexes = self._normalize_index_to_list(index, body)
         searchable_doc_types = self._normalize_doc_type_to_list(doc_type)
-
         i = 0
         for searchable_index in searchable_indexes:
             for document in self.__documents_dict[searchable_index]:
@@ -376,7 +374,7 @@ class FakeElasticsearch(Elasticsearch):
         "version",
     )
     def search(self, index=None, doc_type=None, body=None, params=None, 
headers=None):
-        searchable_indexes = self._normalize_index_to_list(index)
+        searchable_indexes = self._normalize_index_to_list(index, body)
 
         matches = self._find_match(index, doc_type, body)
 
@@ -446,7 +444,7 @@ class FakeElasticsearch(Elasticsearch):
         return result_dict
 
     def _find_match(self, index, doc_type, body):
-        searchable_indexes = self._normalize_index_to_list(index)
+        searchable_indexes = self._normalize_index_to_list(index, body)
         searchable_doc_types = self._normalize_doc_type_to_list(doc_type)
 
         must = body["query"]["bool"]["must"][0]  # only support one must
@@ -477,19 +475,20 @@ class FakeElasticsearch(Elasticsearch):
                     matches.append(document)
 
     # Check index(es) exists.
-    def _validate_search_targets(self, targets):
+    def _validate_search_targets(self, targets, body):
         # TODO: support allow_no_indices query parameter
         matches = set()
         for target in targets:
+            print(f"Loop over:::target = {target}")
             if target == "_all" or target == "":
                 matches.update(self.__documents_dict)
             elif "*" in target:
                 matches.update(fnmatch.filter(self.__documents_dict, target))
             elif target not in self.__documents_dict:
-                raise NotFoundError(404, f"IndexMissingException[[{target}] 
missing]")
+                raise 
MissingIndexException(msg=f"IndexMissingException[[{target}] missing]", 
body=body)
         return matches
 
-    def _normalize_index_to_list(self, index):
+    def _normalize_index_to_list(self, index, body):
         # Ensure to have a list of index
         if index is None:
             searchable_indexes = self.__documents_dict.keys()
@@ -501,11 +500,8 @@ class FakeElasticsearch(Elasticsearch):
             # Is it the correct exception to use ?
             raise ValueError("Invalid param 'index'")
 
-        return list(
-            self._validate_search_targets(
-                target for index in searchable_indexes for target in 
index.split(",")
-            )
-        )
+        generator = (target for index in searchable_indexes for target in 
index.split(","))
+        return list(self._validate_search_targets(generator, body))
 
     @staticmethod
     def _normalize_doc_type_to_list(doc_type):
diff --git 
a/tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py 
b/tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py
index ef142d6d98..cb2d91f4ce 100644
--- a/tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py
+++ b/tests/providers/elasticsearch/log/elasticmock/utilities/__init__.py
@@ -39,13 +39,193 @@ from __future__ import annotations
 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 # SOFTWARE.
 """Utilities for Elastic mock"""
+import base64
 import random
 import string
+from datetime import date, datetime
+from functools import wraps
+
+from elasticsearch.exceptions import NotFoundError
 
 DEFAULT_ELASTICSEARCH_ID_SIZE = 20
 CHARSET_FOR_ELASTICSEARCH_ID = string.ascii_letters + string.digits
+GLOBAL_PARAMS = ("pretty", "human", "error_trace", "format", "filter_path")
 
 
 def get_random_id(size=DEFAULT_ELASTICSEARCH_ID_SIZE):
     """Returns random if for elasticsearch"""
     return "".join(random.choice(CHARSET_FOR_ELASTICSEARCH_ID) for _ in 
range(size))
+
+
+def query_params(*es_query_params, **kwargs):
+    """
+    Decorator that pops all accepted parameters from method's kwargs and puts
+    them in the params argument.
+    """
+    body_params = kwargs.pop("body_params", None)
+    body_only_params = set(body_params or ()) - set(es_query_params)
+    body_name = kwargs.pop("body_name", None)
+    body_required = kwargs.pop("body_required", False)
+    type_possible_in_params = "type" in es_query_params
+
+    assert not (body_name and body_params)
+
+    assert not (body_name and body_required)
+    assert not body_required or body_params
+
+    def _wrapper(func):
+        @wraps(func)
+        def _wrapped(*args, **kwargs):
+            params = (kwargs.pop("params", None) or {}).copy()
+            headers = {k.lower(): v for k, v in (kwargs.pop("headers", None) 
or {}).copy().items()}
+
+            if "opaque_id" in kwargs:
+                headers["x-opaque-id"] = kwargs.pop("opaque_id")
+
+            http_auth = kwargs.pop("http_auth", None)
+            api_key = kwargs.pop("api_key", None)
+
+            using_body_kwarg = kwargs.get("body", None) is not None
+            using_positional_args = args and len(args) > 1
+
+            if type_possible_in_params:
+                doc_type_in_params = params and "doc_type" in params
+                doc_type_in_kwargs = "doc_type" in kwargs
+
+                if doc_type_in_params:
+                    params["type"] = params.pop("doc_type")
+                if doc_type_in_kwargs:
+                    kwargs["type"] = kwargs.pop("doc_type")
+
+            if using_body_kwarg or using_positional_args:
+                body_only_params_in_use = body_only_params.intersection(kwargs)
+                if body_only_params_in_use:
+                    params_prose = "', '".join(sorted(body_only_params_in_use))
+                    plural_params = len(body_only_params_in_use) > 1
+
+                    raise TypeError(
+                        f"The '{params_prose}' parameter{'s' if plural_params 
else ''} "
+                        f"{'are' if plural_params else 'is'} only serialized 
in the "
+                        f"request body and can't be combined with the 'body' 
parameter. "
+                        f"Either stop using the 'body' parameter and use 
keyword-arguments "
+                        f"only or move the specified parameters into the 
'body'. "
+                        f"See 
https://github.com/elastic/elasticsearch-py/issues/1698 "
+                        f"for more information"
+                    )
+
+            elif set(body_params or ()).intersection(kwargs):
+                body = {}
+                for param in body_params:
+                    value = kwargs.pop(param, None)
+                    if value is not None:
+                        body[param.rstrip("_")] = value
+                kwargs["body"] = body
+
+            elif body_required:
+                kwargs["body"] = {}
+
+            if body_name:
+                if body_name in kwargs:
+                    if using_body_kwarg:
+                        raise TypeError(
+                            f"Can't use '{body_name}' and 'body' parameters 
together"
+                            f" because '{body_name}' is an alias for 'body'. "
+                            f"Instead you should only use the '{body_name}' "
+                            f"parameter. See 
https://github.com/elastic/elasticsearch-py/issues/1698 "
+                            f"for more information"
+                        )
+                    kwargs["body"] = kwargs.pop(body_name)
+
+            if http_auth is not None and api_key is not None:
+                raise ValueError("Only one of 'http_auth' and 'api_key' may be 
passed at a time")
+            elif http_auth is not None:
+                headers["authorization"] = f"Basic 
{_base64_auth_header(http_auth)}"
+            elif api_key is not None:
+                headers["authorization"] = f"ApiKey 
{_base64_auth_header(api_key)}"
+
+            for p in es_query_params + GLOBAL_PARAMS:
+                if p in kwargs:
+                    v = kwargs.pop(p)
+                    if v is not None:
+                        params[p] = _escape(v)
+
+            for p in ("ignore", "request_timeout"):
+                if p in kwargs:
+                    params[p] = kwargs.pop(p)
+            return func(*args, params=params, headers=headers, **kwargs)
+
+        return _wrapped
+
+    return _wrapper
+
+
+def to_str(x, encoding="ascii"):
+    if not isinstance(x, str):
+        return x.decode(encoding)
+    return x
+
+
+def to_bytes(x, encoding="ascii"):
+    if not isinstance(x, bytes):
+        return x.encode(encoding)
+    return x
+
+
+def _base64_auth_header(auth_value):
+    """Takes either a 2-tuple or a base64-encoded string
+    and returns a base64-encoded string to be used
+    as an HTTP authorization header.
+    """
+    if isinstance(auth_value, (list, tuple)):
+        auth_value = base64.b64encode(to_bytes(":".join(auth_value)))
+    return to_str(auth_value)
+
+
+def _escape(value):
+    """
+    Escape a single value of a URL string or a query parameter. If it is a list
+    or tuple, turn it into a comma-separated string first.
+    """
+
+    # make sequences into comma-separated strings
+    if isinstance(value, (list, tuple)):
+        value = ",".join(value)
+
+    # dates and datetimes into isoformat
+    elif isinstance(value, (date, datetime)):
+        value = value.isoformat()
+
+    # make bools into true/false strings
+    elif isinstance(value, bool):
+        value = str(value).lower()
+
+    # don't decode bytestrings
+    elif isinstance(value, bytes):
+        return value
+
+    # encode strings to utf-8
+    if isinstance(value, str):
+        return value.encode("utf-8")
+
+    return str(value)
+
+
+class MissingIndexException(NotFoundError):
+    """Exception representing a missing index."""
+
+    def __init__(self, msg, body):
+        self.msg = msg
+        self.body = body
+
+    def __str__(self):
+        return f"IndexMissingException[[{self.msg}] missing] with body 
{self.body}"
+
+
+class SearchFailedException(NotFoundError):
+    """Exception representing a search failure."""
+
+    def __init__(self, msg):
+        self.msg = msg
+
+    def __str__(self):
+        return f"SearchFailedException: {self.msg}"
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py 
b/tests/providers/elasticsearch/log/test_es_task_handler.py
index 93137ff2b2..7ae894f22a 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -29,7 +29,6 @@ from urllib.parse import quote
 import elasticsearch
 import pendulum
 import pytest
-from elasticsearch.exceptions import ElasticsearchException
 
 from airflow.configuration import conf
 from airflow.providers.elasticsearch.log.es_response import 
ElasticSearchResponse
@@ -40,6 +39,7 @@ from airflow.utils.timezone import datetime
 from tests.test_utils.db import clear_db_dags, clear_db_runs
 
 from .elasticmock import elasticmock
+from .elasticmock.utilities import SearchFailedException
 
 
 def get_ti(dag_id, task_id, execution_date, create_task_instance):
@@ -94,7 +94,7 @@ class TestElasticsearchTaskHandler:
             offset_field=self.offset_field,
         )
 
-        self.es = elasticsearch.Elasticsearch(hosts=[{"host": "localhost", 
"port": 9200}])
+        self.es = elasticsearch.Elasticsearch("http://localhost:9200")
         self.index_name = "test_index"
         self.doc_type = "log"
         self.test_message = "some random stuff"
@@ -132,7 +132,7 @@ class TestElasticsearchTaskHandler:
     def test_client_with_config(self):
         es_conf = dict(conf.getsection("elasticsearch_configs"))
         expected_dict = {
-            "use_ssl": False,
+            "http_compress": False,
             "verify_certs": True,
         }
         assert es_conf == expected_dict
@@ -210,7 +210,7 @@ class TestElasticsearchTaskHandler:
     def test_read_with_missing_index(self, ti):
         ts = pendulum.now()
         with mock.patch.object(self.es_task_handler, "index_patterns", 
new="nonexistent,test_*"):
-            with pytest.raises(elasticsearch.exceptions.NotFoundError, 
match=r".*nonexistent.*"):
+            with pytest.raises(elasticsearch.exceptions.NotFoundError, 
match=r"IndexMissingException.*"):
                 self.es_task_handler.read(
                     ti, 1, {"offset": 0, "last_log_timestamp": str(ts), 
"end_of_log": False}
                 )
@@ -365,7 +365,7 @@ class TestElasticsearchTaskHandler:
     def test_read_raises(self, ti):
         with mock.patch.object(self.es_task_handler.log, "exception") as 
mock_exception:
             with mock.patch.object(self.es_task_handler.client, "search") as 
mock_execute:
-                mock_execute.side_effect = ElasticsearchException("Failed to 
read")
+                mock_execute.side_effect = SearchFailedException("Failed to 
read")
                 logs, metadatas = self.es_task_handler.read(ti, 1)
             assert mock_exception.call_count == 1
             args, kwargs = mock_exception.call_args

Reply via email to