This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 41e611929e Fix Failing ES Remote Logging (#32438)
41e611929e is described below

commit 41e611929eadcbbc746d956647967c82f40fae4a
Author: Owen Leung <[email protected]>
AuthorDate: Sat Jul 15 14:30:04 2023 +0800

    Fix Failing ES Remote Logging (#32438)
    
    * Update es_task_handler.py
---
 airflow/providers/elasticsearch/log/es_response.py | 157 ++++++++++++++++++
 .../providers/elasticsearch/log/es_task_handler.py | 180 ++++++++++++---------
 .../log/elasticmock/fake_elasticsearch.py          | 114 ++++++++++++-
 .../elasticsearch/log/test_es_task_handler.py      |  22 +++
 4 files changed, 397 insertions(+), 76 deletions(-)

diff --git a/airflow/providers/elasticsearch/log/es_response.py 
b/airflow/providers/elasticsearch/log/es_response.py
new file mode 100644
index 0000000000..9a13847a82
--- /dev/null
+++ b/airflow/providers/elasticsearch/log/es_response.py
@@ -0,0 +1,157 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+
+def _wrap(val):
+    if isinstance(val, dict):
+        return AttributeDict(val)
+    return val
+
+
+class AttributeList:
+    """Helper class to provide attribute-like access to List objects."""
+
+    def __init__(self, _list):
+        if not isinstance(_list, list):
+            _list = list(_list)
+        self._l_ = _list
+
+    def __getitem__(self, k):
+        val = self._l_[k]
+        if isinstance(val, slice):
+            return AttributeList(val)
+        return _wrap(val)
+
+    def __iter__(self):
+        return map(lambda i: _wrap(i), self._l_)
+
+    def __bool__(self):
+        return bool(self._l_)
+
+
+class AttributeDict:
+    """Helper class to provide attribute-like access to Dictionary objects."""
+
+    def __init__(self, d):
+        super().__setattr__("_d_", d)
+
+    def __getattr__(self, attr_name):
+        try:
+            return self.__getitem__(attr_name)
+        except KeyError:
+            raise AttributeError(f"{self.__class__.__name__!r} object has no 
attribute {attr_name!r}")
+
+    def __getitem__(self, key):
+        return _wrap(self._d_[key])
+
+    def to_dict(self):
+        return self._d_
+
+
+class Hit(AttributeDict):
+    """
+    The Hit class is used to manage and access elements in a document.
+    It inherits from the AttributeDict class and provides
+    attribute-like access to its elements, similar to a dictionary.
+    """
+
+    def __init__(self, document):
+        data = {}
+        if "_source" in document:
+            data = document["_source"]
+        if "fields" in document:
+            data.update(document["fields"])
+
+        super().__init__(data)
+        super().__setattr__("meta", HitMeta(document))
+
+
+class HitMeta(AttributeDict):
+    """
+    The HitMeta class is used to manage and access metadata of a document.
+
+    This class inherits from the AttributeDict class and provides
+    attribute-like access to its elements.
+    """
+
+    def __init__(self, document, exclude=("_source", "_fields")):
+        d = {k[1:] if k.startswith("_") else k: v for (k, v) in 
document.items() if k not in exclude}
+        if "type" in d:
+            # make sure we are consistent everywhere in python
+            d["doc_type"] = d.pop("type")
+        super().__init__(d)
+
+
+class ElasticSearchResponse(AttributeDict):
+    """
+    The ElasticSearchResponse class is used to manage and access the response 
from an Elasticsearch search.
+
+    This class can be iterated over directly to access hits in the response. 
Indexing the class instance
+    with an integer or slice will also access the hits. The class also 
evaluates to True
+    if there are any hits in the response.
+
+    The hits property returns an AttributeList of hits in the response, with 
each hit transformed into
+    an instance of the doc_class if provided.
+
+    The response parameter stores the dictionary returned by the Elasticsearch 
client search method.
+    """
+
+    def __init__(self, search, response, doc_class=None):
+        super().__setattr__("_search", search)
+        super().__setattr__("_doc_class", doc_class)
+        super().__init__(response)
+
+    def __iter__(self):
+        return iter(self.hits)
+
+    def __getitem__(self, key):
+        if isinstance(key, (slice, int)):
+            return self.hits[key]
+        return super().__getitem__(key)
+
+    def __bool__(self):
+        return bool(self.hits)
+
+    @property
+    def hits(self):
+        """
+        This property provides access to the hits (i.e., the results) of the 
Elasticsearch response.
+
+        The hits are represented as an `AttributeList` of `Hit` instances, 
which allow for easy,
+        attribute-like access to the hit data.
+
+        The hits are lazily loaded, meaning they're not processed until this 
property is accessed.
+        Upon first access, the hits data from the response is processed using 
the `_get_result` method
+        of the associated `Search` instance (i.e. an instance from 
ElasticsearchTaskHandler class),
+        and the results are stored for future accesses.
+
+        Each hit also includes all the additional data present in the "hits" 
field of the response,
+        accessible as attributes of the hit.
+        """
+        if not hasattr(self, "_hits"):
+            h = self._d_["hits"]
+
+            try:
+                hits = AttributeList(map(self._search._get_result, h["hits"]))
+            except AttributeError as e:
+                raise TypeError("Could not parse hits.", e)
+
+            super().__setattr__("_hits", hits)
+            for k in h:
+                setattr(self._hits, k, _wrap(h[k]))
+        return self._hits
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py 
b/airflow/providers/elasticsearch/log/es_task_handler.py
index 1bc071d2c2..2a780b622e 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -24,7 +24,7 @@ from collections import defaultdict
 from datetime import datetime
 from operator import attrgetter
 from time import time
-from typing import TYPE_CHECKING, List, Tuple
+from typing import TYPE_CHECKING, Any, Callable, List, Tuple
 from urllib.parse import quote
 
 # Using `from elasticsearch import *` would break elasticsearch mocking used 
in unit test.
@@ -37,6 +37,7 @@ from airflow.exceptions import 
AirflowProviderDeprecationWarning
 from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance
 from airflow.providers.elasticsearch.log.es_json_formatter import 
ElasticsearchJSONFormatter
+from airflow.providers.elasticsearch.log.es_response import 
ElasticSearchResponse, Hit
 from airflow.utils import timezone
 from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
@@ -52,34 +53,6 @@ EsLogMsgType = List[Tuple[str, str]]
 USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")
 
 
-class Log:
-    """wrapper class to mimic the attributes in Search class used in 
elasticsearch_dsl.Search."""
-
-    def __init__(self, offset):
-        self.offset = offset
-
-
-class ElasticSearchResponse:
-    """wrapper class to mimic the Search class used in 
elasticsearch_dsl.Search."""
-
-    def __init__(self, **kwargs):
-        # Store all provided keyword arguments as attributes of this object
-        for key, value in kwargs.items():
-            if key == "log":
-                setattr(self, key, Log(**value))
-            else:
-                setattr(self, key, value)
-
-    def to_dict(self):
-        result = {}
-        for key in self.__dict__.keys():
-            if key == "log":
-                result[key] = self.__dict__[key].__dict__
-            else:
-                result[key] = self.__dict__[key]
-        return result
-
-
 class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, 
LoggingMixin):
     """
     ElasticsearchTaskHandler is a python log handler that reads logs from 
Elasticsearch.
@@ -150,6 +123,8 @@ class ElasticsearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMix
 
         self.formatter: logging.Formatter
         self.handler: logging.FileHandler | logging.StreamHandler  # type: 
ignore[assignment]
+        self._doc_type_map: dict[Any, Any] = {}
+        self._doc_type: list[Any] = []
 
     def _render_log_id(self, ti: TaskInstance, try_number: int) -> str:
         with create_session() as session:
@@ -299,7 +274,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMix
         # Just a safe-guard to preserve backwards-compatibility
         return log_line.message
 
-    def es_read(self, log_id: str, offset: str, metadata: dict) -> list:
+    def es_read(self, log_id: str, offset: str, metadata: dict) -> list | 
ElasticSearchResponse:
         """
         Return the logs matching log_id in Elasticsearch and next offset or ''.
 
@@ -307,17 +282,13 @@ class ElasticsearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMix
         :param offset: the offset to start reading the log from.
         :param metadata: log metadata, used for streaming log download.
         """
-        # Offset is the unique key for sorting logs given log_id.
-        query = {
+        query: dict[Any, Any] = {
             "query": {
                 "bool": {
-                    "must": [
-                        {"match_phrase": {"log_id": log_id}},
-                        {"range": {self.offset_field: {"gt": int(offset)}}},
-                    ]
+                    "filter": [{"range": {self.offset_field: {"gt": 
int(offset)}}}],
+                    "must": [{"match_phrase": {"log_id": log_id}}],
                 }
-            },
-            "sort": [{self.offset_field: {"order": "asc"}}],
+            }
         }
 
         try:
@@ -329,21 +300,17 @@ class ElasticsearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMix
             self.log.exception("Could not get current log size with log_id: 
%s", log_id)
             raise e
 
-        logs = []
+        logs: list[Any] | ElasticSearchResponse = []
         if max_log_line != 0:
             try:
+                query.update({"sort": [self.offset_field]})
                 res = self.client.search(
                     index=self.index_patterns,
                     body=query,
                     size=self.MAX_LINE_PER_PAGE,
                     from_=self.MAX_LINE_PER_PAGE * self.PAGE,
                 )
-                logs = [
-                    ElasticSearchResponse(
-                        **unwrap_response(response),
-                    )
-                    for response in res["hits"]["hits"]
-                ]
+                logs = ElasticSearchResponse(self, res)
             except elasticsearch.exceptions.ElasticsearchException:
                 self.log.exception("Could not read log with log_id: %s", 
log_id)
 
@@ -448,6 +415,99 @@ class ElasticsearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMix
         """Whether we can support external links."""
         return bool(self.frontend)
 
+    def _resolve_nested(self, hit: dict[Any, Any], parent_class=None) -> 
type[Hit]:
+        """
+        Resolves nested hits from Elasticsearch by iteratively navigating the 
`_nested` field.
+        The result is used to fetch the appropriate document class to handle 
the hit.
+
+        This method can be used with nested Elasticsearch fields which are 
structured
+        as dictionaries with "field" and "_nested" keys.
+        """
+        doc_class = Hit
+
+        nested_path: list[str] = []
+        nesting = hit["_nested"]
+        while nesting and "field" in nesting:
+            nested_path.append(nesting["field"])
+            nesting = nesting.get("_nested")
+        nested_path_str = ".".join(nested_path)
+
+        if hasattr(parent_class, "_index"):
+            nested_field = parent_class._index.resolve_field(nested_path_str)
+
+        if nested_field is not None:
+            return nested_field._doc_class
+
+        return doc_class
+
+    def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit:
+        """
+        This method processes a hit (i.e., a result) from an Elasticsearch 
response and transforms it into an
+        appropriate class instance.
+
+        The transformation depends on the contents of the hit. If the document 
in hit contains a nested field,
+        the '_resolve_nested' method is used to determine the appropriate 
class (based on the nested path).
+        If the hit has a document type that is present in the '_doc_type_map', 
the corresponding class is
+        used. If not, the method iterates over the '_doc_type' classes and 
uses the first one whose '_matches'
+        method returns True for the hit.
+
+        If the hit contains any 'inner_hits', these are also processed into 
'ElasticSearchResponse' instances
+        using the determined class.
+
+        Finally, the transformed hit is returned. If the determined class has 
a 'from_es' method, this is
+        used to transform the hit
+
+        An example of the hit argument:
+
+        {'_id': 'jdeZT4kBjAZqZnexVUxk',
+         '_index': '.ds-filebeat-8.8.2-2023.07.09-000001',
+         '_score': 2.482621,
+         '_source': {'@timestamp': '2023-07-13T14:13:15.140Z',
+                     'asctime': '2023-07-09T07:47:43.907+0000',
+                     'container': {'id': 'airflow'},
+                     'dag_id': 'example_bash_operator',
+                     'ecs': {'version': '8.0.0'},
+                     'execution_date': '2023_07_09T07_47_32_000000',
+                     'filename': 'taskinstance.py',
+                     'input': {'type': 'log'},
+                     'levelname': 'INFO',
+                     'lineno': 1144,
+                     'log': {'file': {'path': 
"/opt/airflow/Documents/GitHub/airflow/logs/
+                     dag_id=example_bash_operator'/run_id=owen_run_run/
+                     task_id=run_after_loop/attempt=1.log"},
+                             'offset': 0},
+                     'log.offset': 1688888863907337472,
+                     'log_id': 
'example_bash_operator-run_after_loop-owen_run_run--1-1',
+                     'message': 'Dependencies all met for 
dep_context=non-requeueable '
+                                'deps ti=<TaskInstance: '
+                                'example_bash_operator.run_after_loop 
owen_run_run '
+                                '[queued]>',
+                     'task_id': 'run_after_loop',
+                     'try_number': '1'},
+         '_type': '_doc'}
+        """
+        doc_class = Hit
+        dt = hit.get("_type")
+
+        if "_nested" in hit:
+            doc_class = self._resolve_nested(hit, parent_class)
+
+        elif dt in self._doc_type_map:
+            doc_class = self._doc_type_map[dt]
+
+        else:
+            for doc_type in self._doc_type:
+                if hasattr(doc_type, "_matches") and doc_type._matches(hit):
+                    doc_class = doc_type
+                    break
+
+        for t in hit.get("inner_hits", ()):
+            hit["inner_hits"][t] = ElasticSearchResponse(self, 
hit["inner_hits"][t], doc_class=doc_class)
+
+        # callback should get the Hit class if "from_es" is not defined
+        callback: type[Hit] | Callable[..., Any] = getattr(doc_class, 
"from_es", doc_class)
+        return callback(hit)
+
 
 def getattr_nested(obj, item, default):
     """
@@ -462,33 +522,3 @@ def getattr_nested(obj, item, default):
         return attrgetter(item)(obj)
     except AttributeError:
         return default
-
-
-def unwrap_response(res):
-    source = res["_source"]
-    transformed = {
-        "log_id": source.get("log_id"),
-        "message": source.get("message"),
-        "meta": {
-            "id": res.get("_id"),
-            "index": res.get("_index"),
-            "version": res.get("_version"),
-            "headers": res.get("_headers"),
-        },
-    }
-    if "offset" in source:
-        transformed["offset"] = source["offset"]
-    if "asctime" in source:
-        transformed["asctime"] = source["asctime"]
-    if "filename" in source:
-        transformed["filename"] = source["filename"]
-    if "host" in source:
-        transformed["host"] = source["host"]
-    if "levelname" in source:
-        transformed["levelname"] = source["levelname"]
-    if "lineno" in source:
-        transformed["lineno"] = source["lineno"]
-    if "log" in source:
-        transformed["log"] = source["log"]
-
-    return transformed
diff --git 
a/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py 
b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
index c7887b864f..c4e25d290d 100644
--- a/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
+++ b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
@@ -76,6 +76,119 @@ class FakeElasticsearch(Elasticsearch):
             "tagline": "You Know, for Search",
         }
 
+    @query_params()
+    def sample_log_response(self, headers=None, params=None):
+        return {
+            "_shards": {"failed": 0, "skipped": 0, "successful": 7, "total": 
7},
+            "hits": {
+                "hits": [
+                    {
+                        "_id": "jdeZT4kBjAZqZnexVUxk",
+                        "_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
+                        "_score": 2.482621,
+                        "_source": {
+                            "@timestamp": "2023-07-13T14:13:15.140Z",
+                            "asctime": "2023-07-09T07:47:43.907+0000",
+                            "container": {"id": "airflow"},
+                            "dag_id": "example_bash_operator",
+                            "ecs": {"version": "8.0.0"},
+                            "execution_date": "2023_07_09T07_47_32_000000",
+                            "filename": "taskinstance.py",
+                            "input": {"type": "log"},
+                            "levelname": "INFO",
+                            "lineno": 1144,
+                            "log": {
+                                "file": {
+                                    "path": 
"/opt/airflow/Documents/GitHub/airflow/logs/"
+                                    "dag_id=example_bash_operator'"
+                                    
"/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log"
+                                },
+                                "offset": 0,
+                            },
+                            "log.offset": 1688888863907337472,
+                            "log_id": 
"example_bash_operator-run_after_loop-owen_run_run--1-1",
+                            "message": "Dependencies all met for "
+                            "dep_context=non-requeueable deps "
+                            "ti=<TaskInstance: "
+                            "example_bash_operator.run_after_loop "
+                            "owen_run_run [queued]>",
+                            "task_id": "run_after_loop",
+                            "try_number": "1",
+                        },
+                        "_type": "_doc",
+                    },
+                    {
+                        "_id": "qteZT4kBjAZqZnexVUxl",
+                        "_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
+                        "_score": 2.482621,
+                        "_source": {
+                            "@timestamp": "2023-07-13T14:13:15.141Z",
+                            "asctime": "2023-07-09T07:47:43.917+0000",
+                            "container": {"id": "airflow"},
+                            "dag_id": "example_bash_operator",
+                            "ecs": {"version": "8.0.0"},
+                            "execution_date": "2023_07_09T07_47_32_000000",
+                            "filename": "taskinstance.py",
+                            "input": {"type": "log"},
+                            "levelname": "INFO",
+                            "lineno": 1347,
+                            "log": {
+                                "file": {
+                                    "path": 
"/opt/airflow/Documents/GitHub/airflow/logs/"
+                                    "dag_id=example_bash_operator"
+                                    
"/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log"
+                                },
+                                "offset": 988,
+                            },
+                            "log.offset": 1688888863917961216,
+                            "log_id": 
"example_bash_operator-run_after_loop-owen_run_run--1-1",
+                            "message": "Starting attempt 1 of 1",
+                            "task_id": "run_after_loop",
+                            "try_number": "1",
+                        },
+                        "_type": "_doc",
+                    },
+                    {
+                        "_id": "v9eZT4kBjAZqZnexVUx2",
+                        "_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
+                        "_score": 2.482621,
+                        "_source": {
+                            "@timestamp": "2023-07-13T14:13:15.143Z",
+                            "asctime": "2023-07-09T07:47:43.928+0000",
+                            "container": {"id": "airflow"},
+                            "dag_id": "example_bash_operator",
+                            "ecs": {"version": "8.0.0"},
+                            "execution_date": "2023_07_09T07_47_32_000000",
+                            "filename": "taskinstance.py",
+                            "input": {"type": "log"},
+                            "levelname": "INFO",
+                            "lineno": 1368,
+                            "log": {
+                                "file": {
+                                    "path": 
"/opt/airflow/Documents/GitHub/airflow/logs/"
+                                    "dag_id=example_bash_operator"
+                                    
"/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log"
+                                },
+                                "offset": 1372,
+                            },
+                            "log.offset": 1688888863928218880,
+                            "log_id": 
"example_bash_operator-run_after_loop-owen_run_run--1-1",
+                            "message": "Executing <Task(BashOperator): "
+                            "run_after_loop> on 2023-07-09 "
+                            "07:47:32+00:00",
+                            "task_id": "run_after_loop",
+                            "try_number": "1",
+                        },
+                        "_type": "_doc",
+                    },
+                ],
+                "max_score": 2.482621,
+                "total": {"relation": "eq", "value": 36},
+            },
+            "timed_out": False,
+            "took": 7,
+        }
+
     @query_params(
         "consistency",
         "op_type",
@@ -291,7 +404,6 @@ class FakeElasticsearch(Elasticsearch):
         "consistency", "parent", "refresh", "replication", "routing", 
"timeout", "version", "version_type"
     )
     def delete(self, index, doc_type, id, params=None, headers=None):
-
         found = False
 
         if index in self.__documents_dict:
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py 
b/tests/providers/elasticsearch/log/test_es_task_handler.py
index d402cec8aa..93137ff2b2 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -32,6 +32,7 @@ import pytest
 from elasticsearch.exceptions import ElasticsearchException
 
 from airflow.configuration import conf
+from airflow.providers.elasticsearch.log.es_response import 
ElasticSearchResponse
 from airflow.providers.elasticsearch.log.es_task_handler import 
ElasticsearchTaskHandler, getattr_nested
 from airflow.utils import timezone
 from airflow.utils.state import DagRunState, TaskInstanceState
@@ -103,6 +104,27 @@ class TestElasticsearchTaskHandler:
     def teardown_method(self):
         shutil.rmtree(self.local_log_location.split(os.path.sep)[0], 
ignore_errors=True)
 
+    def test_es_response(self):
+        sample_response = self.es.sample_log_response()
+        es_response = ElasticSearchResponse(self.es_task_handler, 
sample_response)
+        logs_by_host = self.es_task_handler._group_logs_by_host(es_response)
+
+        def concat_logs(lines):
+            log_range = (
+                (len(lines) - 1) if lines[-1].message == 
self.es_task_handler.end_of_log_mark else len(lines)
+            )
+            return "\n".join(self.es_task_handler._format_msg(lines[i]) for i 
in range(log_range))
+
+        for _, hosted_log in logs_by_host.items():
+            message = concat_logs(hosted_log)
+
+        assert (
+            message == "Dependencies all met for dep_context=non-requeueable"
+            " deps ti=<TaskInstance: example_bash_operator.run_after_loop 
owen_run_run [queued]>\n"
+            "Starting attempt 1 of 1\nExecuting <Task(BashOperator): 
run_after_loop> "
+            "on 2023-07-09 07:47:32+00:00"
+        )
+
     def test_client(self):
         assert isinstance(self.es_task_handler.client, 
elasticsearch.Elasticsearch)
         assert self.es_task_handler.index_patterns == "_all"

Reply via email to