This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 41e611929e Fix Failing ES Remote Logging (#32438)
41e611929e is described below
commit 41e611929eadcbbc746d956647967c82f40fae4a
Author: Owen Leung <[email protected]>
AuthorDate: Sat Jul 15 14:30:04 2023 +0800
Fix Failing ES Remote Logging (#32438)
* Update es_task_handler.py
---
airflow/providers/elasticsearch/log/es_response.py | 157 ++++++++++++++++++
.../providers/elasticsearch/log/es_task_handler.py | 180 ++++++++++++---------
.../log/elasticmock/fake_elasticsearch.py | 114 ++++++++++++-
.../elasticsearch/log/test_es_task_handler.py | 22 +++
4 files changed, 397 insertions(+), 76 deletions(-)
diff --git a/airflow/providers/elasticsearch/log/es_response.py b/airflow/providers/elasticsearch/log/es_response.py
new file mode 100644
index 0000000000..9a13847a82
--- /dev/null
+++ b/airflow/providers/elasticsearch/log/es_response.py
@@ -0,0 +1,157 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+
+def _wrap(val):
+    if isinstance(val, dict):
+        return AttributeDict(val)
+    return val
+
+
+class AttributeList:
+    """Helper class to provide attribute like access to List objects."""
+
+    def __init__(self, _list):
+        if not isinstance(_list, list):
+            _list = list(_list)
+        self._l_ = _list
+
+    def __getitem__(self, k):
+        val = self._l_[k]
+        if isinstance(val, slice):
+            return AttributeList(val)
+        return _wrap(val)
+
+    def __iter__(self):
+        return map(lambda i: _wrap(i), self._l_)
+
+    def __bool__(self):
+        return bool(self._l_)
+
+
+class AttributeDict:
+    """Helper class to provide attribute like access to Dictionary objects."""
+
+    def __init__(self, d):
+        super().__setattr__("_d_", d)
+
+    def __getattr__(self, attr_name):
+        try:
+            return self.__getitem__(attr_name)
+        except KeyError:
+            raise AttributeError(f"{self.__class__.__name__!r} object has no attribute {attr_name!r}")
+
+    def __getitem__(self, key):
+        return _wrap(self._d_[key])
+
+    def to_dict(self):
+        return self._d_
+
+
+class Hit(AttributeDict):
+    """
+    The Hit class is used to manage and access elements in a document.
+    It inherits from the AttributeDict class and provides
+    attribute-like access to its elements, similar to a dictionary.
+    """
+
+    def __init__(self, document):
+        data = {}
+        if "_source" in document:
+            data = document["_source"]
+        if "fields" in document:
+            data.update(document["fields"])
+
+        super().__init__(data)
+        super().__setattr__("meta", HitMeta(document))
+
+
+class HitMeta(AttributeDict):
+    """
+    The HitMeta class is used to manage and access metadata of a document.
+
+    This class inherits from the AttributeDict class and provides
+    attribute-like access to its elements.
+    """
+
+    def __init__(self, document, exclude=("_source", "_fields")):
+        d = {k[1:] if k.startswith("_") else k: v for (k, v) in document.items() if k not in exclude}
+        if "type" in d:
+            # make sure we are consistent everywhere in python
+            d["doc_type"] = d.pop("type")
+        super().__init__(d)
+
+
+class ElasticSearchResponse(AttributeDict):
+    """
+    The ElasticSearchResponse class is used to manage and access the response from an Elasticsearch search.
+
+    This class can be iterated over directly to access hits in the response. Indexing the class instance
+    with an integer or slice will also access the hits. The class also evaluates to True
+    if there are any hits in the response.
+
+    The hits property returns an AttributeList of hits in the response, with each hit transformed into
+    an instance of the doc_class if provided.
+
+    The response parameter stores the dictionary returned by the Elasticsearch client search method.
+    """
+
+    def __init__(self, search, response, doc_class=None):
+        super().__setattr__("_search", search)
+        super().__setattr__("_doc_class", doc_class)
+        super().__init__(response)
+
+    def __iter__(self):
+        return iter(self.hits)
+
+    def __getitem__(self, key):
+        if isinstance(key, (slice, int)):
+            return self.hits[key]
+        return super().__getitem__(key)
+
+    def __bool__(self):
+        return bool(self.hits)
+
+    @property
+    def hits(self):
+        """
+        This property provides access to the hits (i.e., the results) of the Elasticsearch response.
+
+        The hits are represented as an `AttributeList` of `Hit` instances, which allow for easy,
+        attribute-like access to the hit data.
+
+        The hits are lazily loaded, meaning they're not processed until this property is accessed.
+        Upon first access, the hits data from the response is processed using the `_get_result` method
+        of the associated `Search` instance (i.e. an instance from ElasticsearchTaskHandler class),
+        and the results are stored for future accesses.
+
+        Each hit also includes all the additional data present in the "hits" field of the response,
+        accessible as attributes of the hit.
+        """
+        if not hasattr(self, "_hits"):
+            h = self._d_["hits"]
+
+            try:
+                hits = AttributeList(map(self._search._get_result, h["hits"]))
+            except AttributeError as e:
+                raise TypeError("Could not parse hits.", e)
+
+            super().__setattr__("_hits", hits)
+            for k in h:
+                setattr(self._hits, k, _wrap(h[k]))
+        return self._hits
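
A quick orientation to the new module: the sketch below shows the attribute-style access these wrappers provide. It is illustrative only, with a made-up sample document.

    from airflow.providers.elasticsearch.log.es_response import Hit

    document = {
        "_id": "abc123",
        "_index": "filebeat-000001",
        "_source": {"message": "Starting attempt 1 of 1", "log": {"offset": 988}},
    }

    hit = Hit(document)
    print(hit.message)     # "Starting attempt 1 of 1", read from _source
    print(hit.log.offset)  # 988, nested dicts come back wrapped as AttributeDict
    print(hit.meta.id)     # "abc123", HitMeta strips the leading underscore
    print(hit.meta.index)  # "filebeat-000001"
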
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index 1bc071d2c2..2a780b622e 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -24,7 +24,7 @@ from collections import defaultdict
 from datetime import datetime
 from operator import attrgetter
 from time import time
-from typing import TYPE_CHECKING, List, Tuple
+from typing import TYPE_CHECKING, Any, Callable, List, Tuple
 from urllib.parse import quote

 # Using `from elasticsearch import *` would break elasticsearch mocking used in unit test.
@@ -37,6 +37,7 @@ from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance
 from airflow.providers.elasticsearch.log.es_json_formatter import ElasticsearchJSONFormatter
+from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse, Hit
 from airflow.utils import timezone
 from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
@@ -52,34 +53,6 @@ EsLogMsgType = List[Tuple[str, str]]
 USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")


-class Log:
-    """wrapper class to mimic the attributes in Search class used in elasticsearch_dsl.Search."""
-
-    def __init__(self, offset):
-        self.offset = offset
-
-
-class ElasticSearchResponse:
-    """wrapper class to mimic the Search class used in elasticsearch_dsl.Search."""
-
-    def __init__(self, **kwargs):
-        # Store all provided keyword arguments as attributes of this object
-        for key, value in kwargs.items():
-            if key == "log":
-                setattr(self, key, Log(**value))
-            else:
-                setattr(self, key, value)
-
-    def to_dict(self):
-        result = {}
-        for key in self.__dict__.keys():
-            if key == "log":
-                result[key] = self.__dict__[key].__dict__
-            else:
-                result[key] = self.__dict__[key]
-        return result
-
-
 class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
     """
     ElasticsearchTaskHandler is a python log handler that reads logs from Elasticsearch.
@@ -150,6 +123,8 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix

         self.formatter: logging.Formatter
         self.handler: logging.FileHandler | logging.StreamHandler  # type: ignore[assignment]
+        self._doc_type_map: dict[Any, Any] = {}
+        self._doc_type: list[Any] = []

     def _render_log_id(self, ti: TaskInstance, try_number: int) -> str:
         with create_session() as session:
@@ -299,7 +274,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
             # Just a safe-guard to preserve backwards-compatibility
             return log_line.message

-    def es_read(self, log_id: str, offset: str, metadata: dict) -> list:
+    def es_read(self, log_id: str, offset: str, metadata: dict) -> list | ElasticSearchResponse:
         """
         Return the logs matching log_id in Elasticsearch and next offset or ''.

@@ -307,17 +282,13 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         :param offset: the offset start to read log from.
         :param metadata: log metadata, used for steaming log download.
         """
-        # Offset is the unique key for sorting logs given log_id.
-        query = {
+        query: dict[Any, Any] = {
             "query": {
                 "bool": {
-                    "must": [
-                        {"match_phrase": {"log_id": log_id}},
-                        {"range": {self.offset_field: {"gt": int(offset)}}},
-                    ]
+                    "filter": [{"range": {self.offset_field: {"gt": int(offset)}}}],
+                    "must": [{"match_phrase": {"log_id": log_id}}],
                 }
-            },
-            "sort": [{self.offset_field: {"order": "asc"}}],
+            }
         }

         try:
@@ -329,21 +300,17 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
             self.log.exception("Could not get current log size with log_id: %s", log_id)
             raise e

-        logs = []
+        logs: list[Any] | ElasticSearchResponse = []
         if max_log_line != 0:
             try:
+                query.update({"sort": [self.offset_field]})
                 res = self.client.search(
                     index=self.index_patterns,
                     body=query,
                     size=self.MAX_LINE_PER_PAGE,
                     from_=self.MAX_LINE_PER_PAGE * self.PAGE,
                 )
-                logs = [
-                    ElasticSearchResponse(
-                        **unwrap_response(response),
-                    )
-                    for response in res["hits"]["hits"]
-                ]
+                logs = ElasticSearchResponse(self, res)
             except elasticsearch.exceptions.ElasticsearchException:
                 self.log.exception("Could not read log with log_id: %s", log_id)

@@ -448,6 +415,99 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         """Whether we can support external links."""
         return bool(self.frontend)

+    def _resolve_nested(self, hit: dict[Any, Any], parent_class=None) -> type[Hit]:
+        """
+        Resolves nested hits from Elasticsearch by iteratively navigating the `_nested` field.
+        The result is used to fetch the appropriate document class to handle the hit.
+
+        This method can be used with nested Elasticsearch fields which are structured
+        as dictionaries with "field" and "_nested" keys.
+        """
+        doc_class = Hit
+
+        nested_path: list[str] = []
+        nesting = hit["_nested"]
+        while nesting and "field" in nesting:
+            nested_path.append(nesting["field"])
+            nesting = nesting.get("_nested")
+        nested_path_str = ".".join(nested_path)
+
+        if hasattr(parent_class, "_index"):
+            nested_field = parent_class._index.resolve_field(nested_path_str)
+
+            if nested_field is not None:
+                return nested_field._doc_class
+
+        return doc_class
+
+    def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit:
+        """
+        This method processes a hit (i.e., a result) from an Elasticsearch response and transforms it into an
+        appropriate class instance.
+
+        The transformation depends on the contents of the hit. If the document in hit contains a nested field,
+        the '_resolve_nested' method is used to determine the appropriate class (based on the nested path).
+        If the hit has a document type that is present in the '_doc_type_map', the corresponding class is
+        used. If not, the method iterates over the '_doc_type' classes and uses the first one whose '_matches'
+        method returns True for the hit.
+
+        If the hit contains any 'inner_hits', these are also processed into 'ElasticSearchResponse' instances
+        using the determined class.
+
+        Finally, the transformed hit is returned. If the determined class has a 'from_es' method, this is
+        used to transform the hit
+
+        An example of the hit argument:
+
+        {'_id': 'jdeZT4kBjAZqZnexVUxk',
+         '_index': '.ds-filebeat-8.8.2-2023.07.09-000001',
+         '_score': 2.482621,
+         '_source': {'@timestamp': '2023-07-13T14:13:15.140Z',
+                     'asctime': '2023-07-09T07:47:43.907+0000',
+                     'container': {'id': 'airflow'},
+                     'dag_id': 'example_bash_operator',
+                     'ecs': {'version': '8.0.0'},
+                     'execution_date': '2023_07_09T07_47_32_000000',
+                     'filename': 'taskinstance.py',
+                     'input': {'type': 'log'},
+                     'levelname': 'INFO',
+                     'lineno': 1144,
+                     'log': {'file': {'path': "/opt/airflow/Documents/GitHub/airflow/logs/
+                             dag_id=example_bash_operator'/run_id=owen_run_run/
+                             task_id=run_after_loop/attempt=1.log"},
+                             'offset': 0},
+                     'log.offset': 1688888863907337472,
+                     'log_id': 'example_bash_operator-run_after_loop-owen_run_run--1-1',
+                     'message': 'Dependencies all met for dep_context=non-requeueable '
+                                'deps ti=<TaskInstance: '
+                                'example_bash_operator.run_after_loop owen_run_run '
+                                '[queued]>',
+                     'task_id': 'run_after_loop',
+                     'try_number': '1'},
+         '_type': '_doc'}
+        """
+        doc_class = Hit
+        dt = hit.get("_type")
+
+        if "_nested" in hit:
+            doc_class = self._resolve_nested(hit, parent_class)
+
+        elif dt in self._doc_type_map:
+            doc_class = self._doc_type_map[dt]
+
+        else:
+            for doc_type in self._doc_type:
+                if hasattr(doc_type, "_matches") and doc_type._matches(hit):
+                    doc_class = doc_type
+                    break
+
+        for t in hit.get("inner_hits", ()):
+            hit["inner_hits"][t] = ElasticSearchResponse(self, hit["inner_hits"][t], doc_class=doc_class)
+
+        # callback should get the Hit class if "from_es" is not defined
+        callback: type[Hit] | Callable[..., Any] = getattr(doc_class, "from_es", doc_class)
+        return callback(hit)
+

 def getattr_nested(obj, item, default):
     """
@@ -462,33 +522,3 @@ def getattr_nested(obj, item, default):
         return attrgetter(item)(obj)
     except AttributeError:
         return default
-
-
-def unwrap_response(res):
-    source = res["_source"]
-    transformed = {
-        "log_id": source.get("log_id"),
-        "message": source.get("message"),
-        "meta": {
-            "id": res.get("_id"),
-            "index": res.get("_index"),
-            "version": res.get("_version"),
-            "headers": res.get("_headers"),
-        },
-    }
-    if "offset" in source:
-        transformed["offset"] = source["offset"]
-    if "asctime" in source:
-        transformed["asctime"] = source["asctime"]
-    if "filename" in source:
-        transformed["filename"] = source["filename"]
-    if "host" in source:
-        transformed["host"] = source["host"]
-    if "levelname" in source:
-        transformed["levelname"] = source["levelname"]
-    if "lineno" in source:
-        transformed["lineno"] = source["lineno"]
-    if "log" in source:
-        transformed["log"] = source["log"]
-
-    return transformed
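
The net effect on the search request: the offset range clause moves from the scoring "must" section into a non-scoring "filter", and the sort key is attached to the body only when a search is actually issued. A sketch of the body es_read now builds, with illustrative values (in real use, offset_field comes from the handler's configuration):

    log_id = "example_bash_operator-run_after_loop-owen_run_run--1-1"
    offset = "0"
    offset_field = "log.offset"

    query = {
        "query": {
            "bool": {
                "filter": [{"range": {offset_field: {"gt": int(offset)}}}],
                "must": [{"match_phrase": {"log_id": log_id}}],
            }
        }
    }
    # Appended just before client.search(), replacing the removed inline "sort" key:
    query.update({"sort": [offset_field]})
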
diff --git a/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
index c7887b864f..c4e25d290d 100644
--- a/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
+++ b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
@@ -76,6 +76,119 @@ class FakeElasticsearch(Elasticsearch):
             "tagline": "You Know, for Search",
         }

+    @query_params()
+    def sample_log_response(self, headers=None, params=None):
+        return {
+            "_shards": {"failed": 0, "skipped": 0, "successful": 7, "total": 7},
+            "hits": {
+                "hits": [
+                    {
+                        "_id": "jdeZT4kBjAZqZnexVUxk",
+                        "_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
+                        "_score": 2.482621,
+                        "_source": {
+                            "@timestamp": "2023-07-13T14:13:15.140Z",
+                            "asctime": "2023-07-09T07:47:43.907+0000",
+                            "container": {"id": "airflow"},
+                            "dag_id": "example_bash_operator",
+                            "ecs": {"version": "8.0.0"},
+                            "execution_date": "2023_07_09T07_47_32_000000",
+                            "filename": "taskinstance.py",
+                            "input": {"type": "log"},
+                            "levelname": "INFO",
+                            "lineno": 1144,
+                            "log": {
+                                "file": {
+                                    "path": "/opt/airflow/Documents/GitHub/airflow/logs/"
+                                    "dag_id=example_bash_operator'"
+                                    "/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log"
+                                },
+                                "offset": 0,
+                            },
+                            "log.offset": 1688888863907337472,
+                            "log_id": "example_bash_operator-run_after_loop-owen_run_run--1-1",
+                            "message": "Dependencies all met for "
+                            "dep_context=non-requeueable deps "
+                            "ti=<TaskInstance: "
+                            "example_bash_operator.run_after_loop "
+                            "owen_run_run [queued]>",
+                            "task_id": "run_after_loop",
+                            "try_number": "1",
+                        },
+                        "_type": "_doc",
+                    },
+                    {
+                        "_id": "qteZT4kBjAZqZnexVUxl",
+                        "_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
+                        "_score": 2.482621,
+                        "_source": {
+                            "@timestamp": "2023-07-13T14:13:15.141Z",
+                            "asctime": "2023-07-09T07:47:43.917+0000",
+                            "container": {"id": "airflow"},
+                            "dag_id": "example_bash_operator",
+                            "ecs": {"version": "8.0.0"},
+                            "execution_date": "2023_07_09T07_47_32_000000",
+                            "filename": "taskinstance.py",
+                            "input": {"type": "log"},
+                            "levelname": "INFO",
+                            "lineno": 1347,
+                            "log": {
+                                "file": {
+                                    "path": "/opt/airflow/Documents/GitHub/airflow/logs/"
+                                    "dag_id=example_bash_operator"
+                                    "/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log"
+                                },
+                                "offset": 988,
+                            },
+                            "log.offset": 1688888863917961216,
+                            "log_id": "example_bash_operator-run_after_loop-owen_run_run--1-1",
+                            "message": "Starting attempt 1 of 1",
+                            "task_id": "run_after_loop",
+                            "try_number": "1",
+                        },
+                        "_type": "_doc",
+                    },
+                    {
+                        "_id": "v9eZT4kBjAZqZnexVUx2",
+                        "_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
+                        "_score": 2.482621,
+                        "_source": {
+                            "@timestamp": "2023-07-13T14:13:15.143Z",
+                            "asctime": "2023-07-09T07:47:43.928+0000",
+                            "container": {"id": "airflow"},
+                            "dag_id": "example_bash_operator",
+                            "ecs": {"version": "8.0.0"},
+                            "execution_date": "2023_07_09T07_47_32_000000",
+                            "filename": "taskinstance.py",
+                            "input": {"type": "log"},
+                            "levelname": "INFO",
+                            "lineno": 1368,
+                            "log": {
+                                "file": {
+                                    "path": "/opt/airflow/Documents/GitHub/airflow/logs/"
+                                    "dag_id=example_bash_operator"
+                                    "/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log"
+                                },
+                                "offset": 1372,
+                            },
+                            "log.offset": 1688888863928218880,
+                            "log_id": "example_bash_operator-run_after_loop-owen_run_run--1-1",
+                            "message": "Executing <Task(BashOperator): "
+                            "run_after_loop> on 2023-07-09 "
+                            "07:47:32+00:00",
+                            "task_id": "run_after_loop",
+                            "try_number": "1",
+                        },
+                        "_type": "_doc",
+                    },
+                ],
+                "max_score": 2.482621,
+                "total": {"relation": "eq", "value": 36},
+            },
+            "timed_out": False,
+            "took": 7,
+        }
+
     @query_params(
         "consistency",
         "op_type",
@@ -291,7 +404,6 @@ class FakeElasticsearch(Elasticsearch):
         "consistency", "parent", "refresh", "replication", "routing", "timeout", "version", "version_type"
     )
     def delete(self, index, doc_type, id, params=None, headers=None):
-
         found = False
         if index in self.__documents_dict:
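
A payload shaped like sample_log_response() can be walked with the new wrapper directly. In this self-contained sketch, FakeSearch is a made-up stand-in for the task handler, which in real use supplies the _get_result hook, and the raw payload is trimmed to a single hit:

    from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse, Hit

    class FakeSearch:
        def _get_result(self, hit):
            return Hit(hit)

    raw = {
        "hits": {
            "hits": [{"_id": "1", "_source": {"levelname": "INFO", "message": "Starting attempt 1 of 1"}}],
            "total": {"relation": "eq", "value": 1},
        }
    }

    response = ElasticSearchResponse(FakeSearch(), raw)
    for hit in response:  # iterates raw["hits"]["hits"], converting each hit via _get_result
        print(hit.levelname, hit.message)
    print(bool(response), response.hits.total.value)  # True 1
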
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
index d402cec8aa..93137ff2b2 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -32,6 +32,7 @@ import pytest
 from elasticsearch.exceptions import ElasticsearchException

 from airflow.configuration import conf
+from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse
 from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler, getattr_nested
 from airflow.utils import timezone
 from airflow.utils.state import DagRunState, TaskInstanceState
@@ -103,6 +104,27 @@ class TestElasticsearchTaskHandler:
     def teardown_method(self):
         shutil.rmtree(self.local_log_location.split(os.path.sep)[0], ignore_errors=True)

+    def test_es_response(self):
+        sample_response = self.es.sample_log_response()
+        es_response = ElasticSearchResponse(self.es_task_handler, sample_response)
+        logs_by_host = self.es_task_handler._group_logs_by_host(es_response)
+
+        def concat_logs(lines):
+            log_range = (
+                (len(lines) - 1) if lines[-1].message == self.es_task_handler.end_of_log_mark else len(lines)
+            )
+            return "\n".join(self.es_task_handler._format_msg(lines[i]) for i in range(log_range))
+
+        for _, hosted_log in logs_by_host.items():
+            message = concat_logs(hosted_log)
+
+        assert (
+            message == "Dependencies all met for dep_context=non-requeueable"
+            " deps ti=<TaskInstance: example_bash_operator.run_after_loop owen_run_run [queued]>\n"
+            "Starting attempt 1 of 1\nExecuting <Task(BashOperator): run_after_loop> "
+            "on 2023-07-09 07:47:32+00:00"
+        )
+
     def test_client(self):
         assert isinstance(self.es_task_handler.client, elasticsearch.Elasticsearch)
         assert self.es_task_handler.index_patterns == "_all"
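
The concat helper in the new test can also be exercised in isolation. This sketch substitutes raw messages for the handler's _format_msg and uses a hypothetical end-of-log marker, so it only approximates the real assertion:

    class Line:
        def __init__(self, message):
            self.message = message

    END_OF_LOG_MARK = "end_of_log"  # hypothetical, the real handler supplies its own

    def concat_logs(lines):
        # Drop a trailing end-of-log marker before joining the messages.
        log_range = (len(lines) - 1) if lines[-1].message == END_OF_LOG_MARK else len(lines)
        return "\n".join(lines[i].message for i in range(log_range))

    assert concat_logs([Line("a"), Line("b"), Line("end_of_log")]) == "a\nb"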