This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b28c90354f Deprecate the 2 non-official elasticsearch libraries (#31920)
b28c90354f is described below
commit b28c90354f110bd598ddce193cf82cb1416adbc8
Author: Owen Leung <[email protected]>
AuthorDate: Sat Jun 24 06:44:51 2023 +0800
Deprecate the 2 non-official elasticsearch libraries (#31920)
---------
Co-authored-by: eladkal <[email protected]>
---
airflow/providers/elasticsearch/CHANGELOG.rst | 6 ++
.../providers/elasticsearch/hooks/elasticsearch.py | 39 ++++++-
.../providers/elasticsearch/log/es_task_handler.py | 116 ++++++++++++++++-----
airflow/providers/elasticsearch/provider.yaml | 8 +-
docker_tests/test_prod_image.py | 2 +-
.../index.rst | 2 -
docs/spelling_wordlist.txt | 1 +
generated/provider_dependencies.json | 2 -
tests/models/test_dag.py | 4 +-
.../elasticsearch/log/test_es_task_handler.py | 6 +-
10 files changed, 143 insertions(+), 43 deletions(-)
diff --git a/airflow/providers/elasticsearch/CHANGELOG.rst b/airflow/providers/elasticsearch/CHANGELOG.rst
index 0b281154c6..e0f2aa0894 100644
--- a/airflow/providers/elasticsearch/CHANGELOG.rst
+++ b/airflow/providers/elasticsearch/CHANGELOG.rst
@@ -24,6 +24,12 @@
Changelog
---------
+5.0.0
+.....
+
+.. note::
+ Deprecate non-official elasticsearch libraries. Only the official elasticsearch library is now used
+
4.5.1
.....
diff --git a/airflow/providers/elasticsearch/hooks/elasticsearch.py b/airflow/providers/elasticsearch/hooks/elasticsearch.py
index 1a008617bd..bc55762a15 100644
--- a/airflow/providers/elasticsearch/hooks/elasticsearch.py
+++ b/airflow/providers/elasticsearch/hooks/elasticsearch.py
@@ -20,9 +20,9 @@ from __future__ import annotations
import warnings
from functools import cached_property
from typing import Any
+from urllib import parse
from elasticsearch import Elasticsearch
-from es.elastic.api import Connection as ESConnection, connect
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.hooks.base import BaseHook
@@ -30,6 +30,43 @@ from airflow.models.connection import Connection as AirflowConnection
from airflow.providers.common.sql.hooks.sql import DbApiHook
+def connect(
+ host: str = "localhost",
+ port: int = 9200,
+ user: str | None = None,
+ password: str | None = None,
+ scheme: str = "http",
+ **kwargs: Any,
+) -> ESConnection:
+ return ESConnection(host, port, user, password, scheme, **kwargs)
+
+
+class ESConnection:
+ """wrapper class for elasticsearch.Elasticsearch."""
+
+ def __init__(
+ self,
+ host: str = "localhost",
+ port: int = 9200,
+ user: str | None = None,
+ password: str | None = None,
+ scheme: str = "http",
+ **kwargs: Any,
+ ):
+ self.host = host
+ self.port = port
+ self.user = user
+ self.password = password
+ self.scheme = scheme
+ self.kwargs = kwargs
+ netloc = f"{host}:{port}"
+ self.url = parse.urlunparse((scheme, netloc, "/", None, None, None))
+ if user and password:
+ self.es = Elasticsearch(self.url, http_auth=(user, password), **self.kwargs)
+ else:
+ self.es = Elasticsearch(self.url, **self.kwargs)
+
+
class ElasticsearchSQLHook(DbApiHook):
"""
Interact with Elasticsearch through the elasticsearch-dbapi.
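For orientation, a minimal usage sketch of the new connect()/ESConnection wrapper added above; the host, port, and the live-cluster call are illustrative assumptions, not part of this commit:

    # Sketch only: assumes a reachable Elasticsearch node at localhost:9200.
    from airflow.providers.elasticsearch.hooks.elasticsearch import connect

    conn = connect(host="localhost", port=9200, scheme="http")
    print(conn.url)        # -> "http://localhost:9200/"
    info = conn.es.info()  # conn.es is the underlying elasticsearch.Elasticsearch client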
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index cb949566d6..8c8847fe1a 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -30,7 +30,7 @@ from urllib.parse import quote
# Using `from elasticsearch import *` would break elasticsearch mocking used in unit test.
import elasticsearch
import pendulum
-from elasticsearch_dsl import Search
+from elasticsearch.exceptions import ElasticsearchException, NotFoundError
from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
@@ -52,6 +52,34 @@ EsLogMsgType = List[Tuple[str, str]]
USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")
+class Log:
+ """wrapper class to mimic the attributes in Search class used in
elasticsearch_dsl.Search."""
+
+ def __init__(self, offset):
+ self.offset = offset
+
+
+class ElasticSearchResponse:
+ """wrapper class to mimic the Search class used in
elasticsearch_dsl.Search."""
+
+ def __init__(self, **kwargs):
+ # Store all provided keyword arguments as attributes of this object
+ for key, value in kwargs.items():
+ if key == "log":
+ setattr(self, key, Log(**value))
+ else:
+ setattr(self, key, value)
+
+ def to_dict(self):
+ result = {}
+ for key in self.__dict__.keys():
+ if key == "log":
+ result[key] = self.__dict__[key].__dict__
+ else:
+ result[key] = self.__dict__[key]
+ return result
+
+
class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
"""
ElasticsearchTaskHandler is a python log handler that
@@ -209,12 +237,9 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
offset = metadata["offset"]
log_id = self._render_log_id(ti, try_number)
-
logs = self.es_read(log_id, offset, metadata)
logs_by_host = self._group_logs_by_host(logs)
-
next_offset = offset if not logs else attrgetter(self.offset_field)(logs[-1])
-
# Ensure a string here. Large offset numbers will get JSON.parsed incorrectly
# on the client. Sending as a string prevents this issue.
# https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER
@@ -259,7 +284,6 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
return "\n".join(self._format_msg(lines[i]) for i in
range(log_range))
message = [(host, concat_logs(hosted_log)) for host, hosted_log in logs_by_host.items()]
-
return message, metadata
def _format_msg(self, log_line):
@@ -287,31 +311,43 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
:param metadata: log metadata, used for streaming log download.
"""
# Offset is the unique key for sorting logs given log_id.
- search = (
- Search(index=self.index_patterns, using=self.client)
- .query("match_phrase", log_id=log_id)
- .sort(self.offset_field)
- )
+ query = {
+ "query": {
+ "bool": {
+ "must": [
+ {"match_phrase": {"log_id": log_id}},
+ {"range": {self.offset_field: {"gt": int(offset)}}},
+ ]
+ }
+ },
+ "sort": [{self.offset_field: {"order": "asc"}}],
+ }
- search = search.filter("range", **{self.offset_field: {"gt": int(offset)}})
- max_log_line = search.count()
- if "download_logs" in metadata and metadata["download_logs"] and
"max_offset" not in metadata:
- try:
- if max_log_line > 0:
- metadata["max_offset"] = attrgetter(self.offset_field)(
- search[max_log_line - 1].execute()[-1]
- )
- else:
- metadata["max_offset"] = 0
- except Exception:
- self.log.exception("Could not get current log size with
log_id: %s", log_id)
+ try:
+ max_log_line = self.client.count(index=self.index_patterns, body=query)["count"]
+ except NotFoundError as e:
+ self.log.exception("The target index pattern %s does not exist",
self.index_patterns)
+ raise e
+ except ElasticsearchException as e:
+ self.log.exception("Could not get current log size with log_id:
%s", log_id)
+ raise e
logs = []
if max_log_line != 0:
try:
-
- logs = search[self.MAX_LINE_PER_PAGE * self.PAGE : self.MAX_LINE_PER_PAGE].execute()
- except Exception:
+ res = self.client.search(
+ index=self.index_patterns,
+ body=query,
+ size=self.MAX_LINE_PER_PAGE,
+ from_=self.MAX_LINE_PER_PAGE * self.PAGE,
+ )
+ logs = [
+ ElasticSearchResponse(
+ **unwrap_response(response),
+ )
+ for response in res["hits"]["hits"]
+ ]
+ except elasticsearch.exceptions.ElasticsearchException:
self.log.exception("Could not read log with log_id: %s",
log_id)
return logs
@@ -429,3 +465,33 @@ def getattr_nested(obj, item, default):
return attrgetter(item)(obj)
except AttributeError:
return default
+
+
+def unwrap_response(res):
+ source = res["_source"]
+ transformed = {
+ "log_id": source.get("log_id"),
+ "message": source.get("message"),
+ "meta": {
+ "id": res.get("_id"),
+ "index": res.get("_index"),
+ "version": res.get("_version"),
+ "headers": res.get("_headers"),
+ },
+ }
+ if "offset" in source:
+ transformed["offset"] = source["offset"]
+ if "asctime" in source:
+ transformed["asctime"] = source["asctime"]
+ if "filename" in source:
+ transformed["filename"] = source["filename"]
+ if "host" in source:
+ transformed["host"] = source["host"]
+ if "levelname" in source:
+ transformed["levelname"] = source["levelname"]
+ if "lineno" in source:
+ transformed["lineno"] = source["lineno"]
+ if "log" in source:
+ transformed["log"] = source["log"]
+
+ return transformed
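To make the new response shape concrete, a small sketch of a raw hit flowing through unwrap_response() and ElasticSearchResponse; all field values here are invented for illustration:

    # Sketch only: a hand-written hit in the shape returned by client.search().
    hit = {
        "_id": "1",
        "_index": "airflow-logs",
        "_source": {"log_id": "example-log-id", "message": "hello", "offset": 1, "log": {"offset": 1}},
    }
    doc = ElasticSearchResponse(**unwrap_response(hit))
    assert doc.log.offset == 1                  # Log mimics elasticsearch_dsl attribute access
    assert doc.to_dict()["message"] == "hello"  # round-trips back to a plain dict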
diff --git a/airflow/providers/elasticsearch/provider.yaml b/airflow/providers/elasticsearch/provider.yaml
index b7b34a497b..2b6ffaabf2 100644
--- a/airflow/providers/elasticsearch/provider.yaml
+++ b/airflow/providers/elasticsearch/provider.yaml
@@ -23,6 +23,7 @@ description: |
suspended: false
versions:
+ - 5.0.0
- 4.5.1
- 4.5.0
- 4.4.0
@@ -52,14 +53,7 @@ versions:
dependencies:
- apache-airflow>=2.4.0
- apache-airflow-providers-common-sql>=1.3.1
- # We cannot use elasticsearch 8 yet. The elasticsearch-dsl is not compatible with it.
- # elasticsearch>=7.15.0 breaks our tests. The elasticsearch 7 is an old version that is not
- # supported anymore. We should move to elasticsearch 8 as it
- # likely requires getting rid of elasticsearch-dsl or waiting until there is a compatible version
- # We can also try to replace the <7.15.0 with <8.0.0 but we need to solve the test failures first
- elasticsearch>7,<7.15.0
- - elasticsearch-dbapi
- - elasticsearch-dsl>=5.0.0
integrations:
- integration-name: Elasticsearch
diff --git a/docker_tests/test_prod_image.py b/docker_tests/test_prod_image.py
index e76f04dfd8..46cba5fbfa 100644
--- a/docker_tests/test_prod_image.py
+++ b/docker_tests/test_prod_image.py
@@ -127,7 +127,7 @@ class TestPythonPackages:
"cncf.kubernetes": ["kubernetes", "cryptography"],
"dask": ["cloudpickle", "distributed"],
"docker": ["docker"],
- "elasticsearch": ["elasticsearch", "es.elastic", "elasticsearch_dsl"],
+ "elasticsearch": ["elasticsearch"],
"google": [
"OpenSSL",
# "google.ads", Remove google ads as it is vendored in google
provider now
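The import-check idea behind this map can be sketched as follows; the snippet is illustrative, not the test suite's actual code ("es" is the top-level package of the removed elasticsearch-dbapi):

    # Sketch only: after this change the image should ship the official client
    # and no longer ship the deprecated packages.
    import importlib.util

    assert importlib.util.find_spec("elasticsearch") is not None
    for removed in ("es", "elasticsearch_dsl"):
        assert importlib.util.find_spec(removed) is None, f"{removed} should be gone"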
diff --git a/docs/apache-airflow-providers-elasticsearch/index.rst b/docs/apache-airflow-providers-elasticsearch/index.rst
index 24451a7921..9d9f0b24cb 100644
--- a/docs/apache-airflow-providers-elasticsearch/index.rst
+++ b/docs/apache-airflow-providers-elasticsearch/index.rst
@@ -92,8 +92,6 @@ PIP package Version required
``apache-airflow`` ``>=2.4.0``
``apache-airflow-providers-common-sql`` ``>=1.3.1``
``elasticsearch`` ``>7,<7.15.0``
-``elasticsearch-dbapi``
-``elasticsearch-dsl`` ``>=5.0.0``
======================================= ==================
Cross provider package dependencies
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index babb506f41..d2a759e931 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -486,6 +486,7 @@ Drivy
dropdown
druidHook
ds
+dsl
Dsn
dsn
dttm
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index 8d14eb5806..2a1ae40e3f 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -336,8 +336,6 @@
"deps": [
"apache-airflow-providers-common-sql>=1.3.1",
"apache-airflow>=2.4.0",
- "elasticsearch-dbapi",
- "elasticsearch-dsl>=5.0.0",
"elasticsearch>7,<7.15.0"
],
"cross-providers-deps": [
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index fa2bcd7052..3c739c115f 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3642,7 +3642,7 @@ class TestTaskClearingSetupTeardownBehavior:
with s2 >> t2:
BaseOperator(task_id="w2")
BaseOperator(task_id="w3")
- # to_do: implement tests
+ # todo: implement tests
def test_get_flat_relative_ids_with_setup_nested_no_ctx_mgr(self):
"""Let's test some gnarlier cases here"""
@@ -3773,7 +3773,7 @@ class TestTaskClearingSetupTeardownBehavior:
g2_group_teardown = dag.task_dict["g2.group_teardown"]
with pytest.raises(Exception):
- # fix_me
+ # fixme
# the line `dag_setup >> tg >> dag_teardown` should be equivalent to
# dag_setup >> group_setup; w3 >> dag_teardown
# i.e. not group_teardown >> dag_teardown
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
index e3ba8f47ae..d402cec8aa 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -29,6 +29,7 @@ from urllib.parse import quote
import elasticsearch
import pendulum
import pytest
+from elasticsearch.exceptions import ElasticsearchException
from airflow.configuration import conf
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler, getattr_nested
@@ -341,13 +342,12 @@ class TestElasticsearchTaskHandler:
def test_read_raises(self, ti):
with mock.patch.object(self.es_task_handler.log, "exception") as mock_exception:
- with mock.patch("elasticsearch_dsl.Search.execute") as
mock_execute:
- mock_execute.side_effect = Exception("Failed to read")
+ with mock.patch.object(self.es_task_handler.client, "search") as mock_execute:
+ mock_execute.side_effect = ElasticsearchException("Failed to read")
logs, metadatas = self.es_task_handler.read(ti, 1)
assert mock_exception.call_count == 1
args, kwargs = mock_exception.call_args
assert "Could not read log with log_id:" in args[0]
-
assert 1 == len(logs)
assert len(logs) == len(metadatas)
assert [[]] == logs
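A companion test for the new NotFoundError branch in es_read could follow the same mocking pattern; this is a hypothetical sketch, not part of this commit:

    # Hypothetical: es_read re-raises NotFoundError when the index pattern is missing.
    from elasticsearch.exceptions import NotFoundError

    def test_es_read_missing_index_raises(self):
        with mock.patch.object(self.es_task_handler.client, "count") as mock_count:
            mock_count.side_effect = NotFoundError
            with pytest.raises(NotFoundError):
                self.es_task_handler.es_read("example-log-id", 1, {})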