This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 95d1a382a13 tests: refactor unit test of elasticsearch (#64200)
95d1a382a13 is described below

commit 95d1a382a13c68caae5129da14556999ad69c3a2
Author: Owen Leung <[email protected]>
AuthorDate: Wed Mar 25 15:43:45 2026 +0800

    tests: refactor unit test of elasticsearch (#64200)
---
 .../unit/elasticsearch/log/elasticmock/__init__.py |  111 --
 .../log/elasticmock/fake_elasticsearch.py          |  630 ----------
 .../log/elasticmock/utilities/__init__.py          |  232 ----
 .../unit/elasticsearch/log/test_es_task_handler.py | 1225 ++++++--------------
 4 files changed, 367 insertions(+), 1831 deletions(-)

diff --git 
a/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/__init__.py 
b/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/__init__.py
deleted file mode 100644
index 40072c63721..00000000000
--- 
a/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/__init__.py
+++ /dev/null
@@ -1,111 +0,0 @@
-"""Elastic mock module used for testing"""
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-#
-# The MIT License (MIT)
-#
-# Copyright (c) 2016 Marcos Cardoso
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in 
all
-# copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-# SOFTWARE.
-from functools import wraps
-from unittest.mock import patch
-from urllib.parse import unquote, urlparse
-
-from unit.elasticsearch.log.elasticmock.fake_elasticsearch import 
FakeElasticsearch
-
-ELASTIC_INSTANCES: dict[str, FakeElasticsearch] = {}
-
-
-def _normalize_hosts(hosts):
-    """
-    Helper function to transform hosts argument to
-    :class:`~elasticsearch.Elasticsearch` to a list of dicts.
-    """
-    # if hosts are empty, just defer to defaults down the line
-    if hosts is None:
-        return [{}]
-
-    hosts = [hosts]
-
-    out = []
-
-    for host_raw in hosts:
-        host = f"//{host_raw}" if "://" not in host_raw else host_raw
-
-        parsed_url = urlparse(host)
-        h = {"host": parsed_url.hostname}
-
-        if parsed_url.port:
-            h["port"] = parsed_url.port
-
-        if parsed_url.scheme == "https":
-            h["port"] = parsed_url.port or 443
-            h["use_ssl"] = True
-
-        if parsed_url.username or parsed_url.password:
-            h["http_auth"] = 
f"{unquote(parsed_url.username)}:{unquote(parsed_url.password)}"
-
-        if parsed_url.path and parsed_url.path != "/":
-            h["url_prefix"] = parsed_url.path
-
-        out.append(h)
-    out.append(host)
-    return out
-
-
-def _get_elasticmock(hosts=None, *args, **kwargs):
-    host = _normalize_hosts(hosts)[0]
-    elastic_key = f"http://{host.get('host', 'localhost')}:{host.get('port', 
9200)}"
-
-    if elastic_key in ELASTIC_INSTANCES:
-        connection = ELASTIC_INSTANCES.get(elastic_key)
-    else:
-        connection = FakeElasticsearch()
-        ELASTIC_INSTANCES[elastic_key] = connection
-    return connection
-
-
-def elasticmock(function):
-    """Elasticmock decorator"""
-
-    @wraps(function)
-    def decorated(*args, **kwargs):
-        ELASTIC_INSTANCES.clear()
-        with patch("elasticsearch.Elasticsearch", _get_elasticmock):
-            result = function(*args, **kwargs)
-        return result
-
-    return decorated
diff --git 
a/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/fake_elasticsearch.py
 
b/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/fake_elasticsearch.py
deleted file mode 100644
index 4b9d81c0210..00000000000
--- 
a/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/fake_elasticsearch.py
+++ /dev/null
@@ -1,630 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import fnmatch
-import json
-
-from elasticsearch import Elasticsearch
-from elasticsearch.exceptions import NotFoundError
-
-from unit.elasticsearch.log.elasticmock.utilities import (
-    MissingIndexException,
-    get_random_id,
-    query_params,
-)
-
-#
-# The MIT License (MIT)
-#
-# Copyright (c) 2016 Marcos Cardoso
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in 
all
-# copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-# SOFTWARE.
-
-
-class FakeElasticsearch(Elasticsearch):
-    __documents_dict = None
-
-    def __init__(self):
-        super().__init__("http://localhost:9200")
-        self.__documents_dict = {}
-
-    @query_params()
-    def ping(self, params=None):
-        return True
-
-    @query_params()
-    def info(self, params=None):
-        return {
-            "status": 200,
-            "cluster_name": "elasticmock",
-            "version": {
-                "lucene_version": "4.10.4",
-                "build_hash": "00f95f4ffca6de89d68b7ccaf80d148f1f70e4d4",
-                "number": "1.7.5",
-                "build_timestamp": "2016-02-02T09:55:30Z",
-                "build_snapshot": False,
-            },
-            "name": "Nightwatch",
-            "tagline": "You Know, for Search",
-        }
-
-    @query_params()
-    def sample_airflow_2_log_response(self, headers=None, params=None):
-        return {
-            "_shards": {"failed": 0, "skipped": 0, "successful": 7, "total": 
7},
-            "hits": {
-                "hits": [
-                    {
-                        "_id": "jdeZT4kBjAZqZnexVUxk",
-                        "_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
-                        "_score": 2.482621,
-                        "_source": {
-                            "@timestamp": "2023-07-13T14:13:15.140Z",
-                            "asctime": "2023-07-09T07:47:43.907+0000",
-                            "container": {"id": "airflow"},
-                            "dag_id": "example_bash_operator",
-                            "ecs": {"version": "8.0.0"},
-                            "execution_date": "2023_07_09T07_47_32_000000",
-                            "filename": "taskinstance.py",
-                            "input": {"type": "log"},
-                            "levelname": "INFO",
-                            "lineno": 1144,
-                            "log": {
-                                "file": {
-                                    "path": 
"/opt/airflow/Documents/GitHub/airflow/logs/"
-                                    "dag_id=example_bash_operator'"
-                                    
"/run_id=run_run/task_id=run_after_loop/attempt=1.log"
-                                },
-                                "offset": 0,
-                            },
-                            "log.offset": 1688888863907337472,
-                            "log_id": 
"example_bash_operator-run_after_loop-run_run--1-1",
-                            "message": "Dependencies all met for "
-                            "dep_context=non-requeueable deps "
-                            "ti=<TaskInstance: "
-                            "example_bash_operator.run_after_loop ",
-                            "task_id": "run_after_loop",
-                            "try_number": "1",
-                        },
-                        "_type": "_doc",
-                    },
-                    {
-                        "_id": "qteZT4kBjAZqZnexVUxl",
-                        "_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
-                        "_score": 2.482621,
-                        "_source": {
-                            "@timestamp": "2023-07-13T14:13:15.141Z",
-                            "asctime": "2023-07-09T07:47:43.917+0000",
-                            "container": {"id": "airflow"},
-                            "dag_id": "example_bash_operator",
-                            "ecs": {"version": "8.0.0"},
-                            "execution_date": "2023_07_09T07_47_32_000000",
-                            "filename": "taskinstance.py",
-                            "input": {"type": "log"},
-                            "levelname": "INFO",
-                            "lineno": 1347,
-                            "log": {
-                                "file": {
-                                    "path": 
"/opt/airflow/Documents/GitHub/airflow/logs/"
-                                    "dag_id=example_bash_operator"
-                                    
"/run_id=run_run/task_id=run_after_loop/attempt=1.log"
-                                },
-                                "offset": 988,
-                            },
-                            "log.offset": 1688888863917961216,
-                            "log_id": 
"example_bash_operator-run_after_loop-run_run--1-1",
-                            "message": "Starting attempt 1 of 1",
-                            "task_id": "run_after_loop",
-                            "try_number": "1",
-                        },
-                        "_type": "_doc",
-                    },
-                    {
-                        "_id": "v9eZT4kBjAZqZnexVUx2",
-                        "_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
-                        "_score": 2.482621,
-                        "_source": {
-                            "@timestamp": "2023-07-13T14:13:15.143Z",
-                            "asctime": "2023-07-09T07:47:43.928+0000",
-                            "container": {"id": "airflow"},
-                            "dag_id": "example_bash_operator",
-                            "ecs": {"version": "8.0.0"},
-                            "execution_date": "2023_07_09T07_47_32_000000",
-                            "filename": "taskinstance.py",
-                            "input": {"type": "log"},
-                            "levelname": "INFO",
-                            "lineno": 1368,
-                            "log": {
-                                "file": {
-                                    "path": 
"/opt/airflow/Documents/GitHub/airflow/logs/"
-                                    "dag_id=example_bash_operator"
-                                    
"/run_id=run_run/task_id=run_after_loop/attempt=1.log"
-                                },
-                                "offset": 1372,
-                            },
-                            "log.offset": 1688888863928218880,
-                            "log_id": 
"example_bash_operator-run_after_loop-run_run--1-1",
-                            "message": "Executing <Task(BashOperator): "
-                            "run_after_loop> on 2023-07-09 "
-                            "07:47:32+00:00",
-                            "task_id": "run_after_loop",
-                            "try_number": "1",
-                        },
-                        "_type": "_doc",
-                    },
-                ],
-                "max_score": 2.482621,
-                "total": {"relation": "eq", "value": 36},
-            },
-            "timed_out": False,
-            "took": 7,
-        }
-
-    @query_params()
-    def sample_airflow_3_log_response(self, headers=None, params=None):
-        return {
-            "_shards": {"failed": 0, "skipped": 0, "successful": 7, "total": 
7},
-            "hits": {
-                "hits": [
-                    {
-                        "_id": "jdeZT4kBjAZqZnexVUxk",
-                        "_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
-                        "_score": 2.482621,
-                        "_source": {
-                            "@timestamp": "2023-07-13T14:13:15.140Z",
-                            "asctime": "2023-07-09T07:47:43.907+0000",
-                            "container": {"id": "airflow"},
-                            "dag_id": "example_bash_operator",
-                            "ecs": {"version": "8.0.0"},
-                            "execution_date": "2023_07_09T07_47_32_000000",
-                            "filename": "taskinstance.py",
-                            "input": {"type": "log"},
-                            "levelname": "INFO",
-                            "lineno": 1144,
-                            "log": {
-                                "file": {
-                                    "path": 
"/opt/airflow/Documents/GitHub/airflow/logs/"
-                                    "dag_id=example_bash_operator'"
-                                    
"/run_id=run_run/task_id=run_after_loop/attempt=1.log"
-                                },
-                                "offset": 0,
-                            },
-                            "log.offset": 1688888863907337472,
-                            "log_id": 
"example_bash_operator-run_after_loop-run_run--1-1",
-                            "task_id": "run_after_loop",
-                            "try_number": "1",
-                            "event": "Dependencies all met for "
-                            "dep_context=non-requeueable deps "
-                            "ti=<TaskInstance: "
-                            "example_bash_operator.run_after_loop ",
-                        },
-                        "_type": "_doc",
-                    },
-                    {
-                        "_id": "qteZT4kBjAZqZnexVUxl",
-                        "_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
-                        "_score": 2.482621,
-                        "_source": {
-                            "@timestamp": "2023-07-13T14:13:15.141Z",
-                            "asctime": "2023-07-09T07:47:43.917+0000",
-                            "container": {"id": "airflow"},
-                            "dag_id": "example_bash_operator",
-                            "ecs": {"version": "8.0.0"},
-                            "execution_date": "2023_07_09T07_47_32_000000",
-                            "filename": "taskinstance.py",
-                            "input": {"type": "log"},
-                            "levelname": "INFO",
-                            "lineno": 1347,
-                            "log": {
-                                "file": {
-                                    "path": 
"/opt/airflow/Documents/GitHub/airflow/logs/"
-                                    "dag_id=example_bash_operator"
-                                    
"/run_id=run_run/task_id=run_after_loop/attempt=1.log"
-                                },
-                                "offset": 988,
-                            },
-                            "log.offset": 1688888863917961216,
-                            "log_id": 
"example_bash_operator-run_after_loop-run_run--1-1",
-                            "event": "Starting attempt 1 of 1",
-                            "task_id": "run_after_loop",
-                            "try_number": "1",
-                        },
-                        "_type": "_doc",
-                    },
-                    {
-                        "_id": "v9eZT4kBjAZqZnexVUx2",
-                        "_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
-                        "_score": 2.482621,
-                        "_source": {
-                            "@timestamp": "2023-07-13T14:13:15.143Z",
-                            "asctime": "2023-07-09T07:47:43.928+0000",
-                            "container": {"id": "airflow"},
-                            "dag_id": "example_bash_operator",
-                            "ecs": {"version": "8.0.0"},
-                            "execution_date": "2023_07_09T07_47_32_000000",
-                            "filename": "taskinstance.py",
-                            "input": {"type": "log"},
-                            "levelname": "INFO",
-                            "lineno": 1368,
-                            "log": {
-                                "file": {
-                                    "path": 
"/opt/airflow/Documents/GitHub/airflow/logs/"
-                                    "dag_id=example_bash_operator"
-                                    
"/run_id=run_run/task_id=run_after_loop/attempt=1.log"
-                                },
-                                "offset": 1372,
-                            },
-                            "log.offset": 1688888863928218880,
-                            "log_id": 
"example_bash_operator-run_after_loop-run_run--1-1",
-                            "task_id": "run_after_loop",
-                            "try_number": "1",
-                            "event": "Executing <Task(BashOperator): "
-                            "run_after_loop> on 2023-07-09 "
-                            "07:47:32+00:00",
-                        },
-                        "_type": "_doc",
-                    },
-                ],
-                "max_score": 2.482621,
-                "total": {"relation": "eq", "value": 36},
-            },
-            "timed_out": False,
-            "took": 7,
-        }
-
-    @query_params(
-        "consistency",
-        "op_type",
-        "parent",
-        "refresh",
-        "replication",
-        "routing",
-        "timeout",
-        "timestamp",
-        "ttl",
-        "version",
-        "version_type",
-    )
-    def index(self, index, document=None, doc_type=None, body=None, id=None, 
params=None, headers=None):
-        if document is None:
-            document = body
-        if index not in self.__documents_dict:
-            self.__documents_dict[index] = []
-
-        if id is None:
-            id = get_random_id()
-
-        version = 1
-
-        self.__documents_dict[index].append(
-            {
-                "_type": doc_type,
-                "_id": id,
-                "_source": document,
-                "_index": index,
-                "_version": version,
-                "_headers": headers,
-            }
-        )
-
-        return {
-            "_type": doc_type,
-            "_id": id,
-            "created": True,
-            "_version": version,
-            "_index": index,
-            "_headers": headers,
-        }
-
-    @query_params("parent", "preference", "realtime", "refresh", "routing")
-    def exists(self, index, doc_type, id, params=None):
-        result = False
-        if index in self.__documents_dict:
-            for document in self.__documents_dict[index]:
-                if document.get("_id") == id and document.get("_type") == 
doc_type:
-                    result = True
-                    break
-        return result
-
-    @query_params(
-        "_source",
-        "_source_exclude",
-        "_source_include",
-        "fields",
-        "parent",
-        "preference",
-        "realtime",
-        "refresh",
-        "routing",
-        "version",
-        "version_type",
-    )
-    def get(self, index, id, doc_type="_all", params=None):
-        result = None
-        if index in self.__documents_dict:
-            result = self.find_document(doc_type, id, index, result)
-
-        if result:
-            result["found"] = True
-        else:
-            error_data = {"_index": index, "_type": doc_type, "_id": id, 
"found": False}
-            raise NotFoundError(404, json.dumps(error_data))
-
-        return result
-
-    def find_document(self, doc_type, id, index, result):
-        for document in self.__documents_dict[index]:
-            if document.get("_id") == id:
-                if doc_type == "_all" or document.get("_type") == doc_type:
-                    result = document
-                    break
-        return result
-
-    @query_params(
-        "_source",
-        "_source_exclude",
-        "_source_include",
-        "parent",
-        "preference",
-        "realtime",
-        "refresh",
-        "routing",
-        "version",
-        "version_type",
-    )
-    def get_source(self, index, doc_type, id, params=None):
-        document = self.get(index=index, doc_type=doc_type, id=id, 
params=params)
-        return document.get("_source")
-
-    @query_params(
-        "_source",
-        "_source_exclude",
-        "_source_include",
-        "allow_no_indices",
-        "analyze_wildcard",
-        "analyzer",
-        "default_operator",
-        "df",
-        "expand_wildcards",
-        "explain",
-        "fielddata_fields",
-        "fields",
-        "from_",
-        "ignore_unavailable",
-        "lenient",
-        "lowercase_expanded_terms",
-        "preference",
-        "q",
-        "request_cache",
-        "routing",
-        "scroll",
-        "search_type",
-        "size",
-        "sort",
-        "stats",
-        "suggest_field",
-        "suggest_mode",
-        "suggest_size",
-        "suggest_text",
-        "terminate_after",
-        "timeout",
-        "track_scores",
-        "version",
-    )
-    def count(self, index=None, doc_type=None, query=None, params=None, 
headers=None):
-        searchable_indexes = self._normalize_index_to_list(index, query=query)
-        searchable_doc_types = self._normalize_doc_type_to_list(doc_type)
-        i = 0
-        for searchable_index in searchable_indexes:
-            for document in self.__documents_dict[searchable_index]:
-                if not searchable_doc_types or document.get("_type") in 
searchable_doc_types:
-                    i += 1
-        result = {"count": i, "_shards": {"successful": 1, "failed": 0, 
"total": 1}}
-
-        return result
-
-    @query_params(
-        "_source",
-        "_source_exclude",
-        "_source_include",
-        "allow_no_indices",
-        "analyze_wildcard",
-        "analyzer",
-        "default_operator",
-        "df",
-        "expand_wildcards",
-        "explain",
-        "fielddata_fields",
-        "fields",
-        "from_",
-        "ignore_unavailable",
-        "lenient",
-        "lowercase_expanded_terms",
-        "preference",
-        "q",
-        "request_cache",
-        "routing",
-        "scroll",
-        "search_type",
-        "size",
-        "sort",
-        "stats",
-        "suggest_field",
-        "suggest_mode",
-        "suggest_size",
-        "suggest_text",
-        "terminate_after",
-        "timeout",
-        "track_scores",
-        "version",
-    )
-    def search(self, index=None, doc_type=None, query=None, params=None, 
headers=None):
-        searchable_indexes = self._normalize_index_to_list(index, query=query)
-
-        matches = self._find_match(index, doc_type, query=query)
-
-        result = {
-            "hits": {"total": len(matches), "max_score": 1.0},
-            "_shards": {
-                # Simulate indexes with 1 shard each
-                "successful": len(searchable_indexes),
-                "failed": 0,
-                "total": len(searchable_indexes),
-            },
-            "took": 1,
-            "timed_out": False,
-        }
-
-        hits = []
-        for match in matches:
-            match["_score"] = 1.0
-            hits.append(match)
-        result["hits"]["hits"] = hits
-
-        return result
-
-    @query_params(
-        "consistency", "parent", "refresh", "replication", "routing", 
"timeout", "version", "version_type"
-    )
-    def delete(self, index, doc_type, id, params=None, headers=None):
-        found = False
-
-        if index in self.__documents_dict:
-            for document in self.__documents_dict[index]:
-                if document.get("_type") == doc_type and document.get("_id") 
== id:
-                    found = True
-                    self.__documents_dict[index].remove(document)
-                    break
-
-        result_dict = {
-            "found": found,
-            "_index": index,
-            "_type": doc_type,
-            "_id": id,
-            "_version": 1,
-        }
-
-        if found:
-            return result_dict
-        raise NotFoundError(404, json.dumps(result_dict))
-
-    @query_params("allow_no_indices", "expand_wildcards", 
"ignore_unavailable", "preference", "routing")
-    def suggest(self, body, index=None):
-        if index is not None and index not in self.__documents_dict:
-            raise NotFoundError(404, f"IndexMissingException[[{index}] 
missing]")
-
-        result_dict = {}
-        for key, value in body.items():
-            text = value.get("text")
-            suggestion = int(text) + 1 if isinstance(text, int) else 
f"{text}_suggestion"
-            result_dict[key] = [
-                {
-                    "text": text,
-                    "length": 1,
-                    "options": [{"text": suggestion, "freq": 1, "score": 1.0}],
-                    "offset": 0,
-                }
-            ]
-        return result_dict
-
-    def _find_match(self, index, doc_type, query):
-        searchable_indexes = self._normalize_index_to_list(index, query=query)
-        searchable_doc_types = self._normalize_doc_type_to_list(doc_type)
-
-        must = query["bool"]["must"][0]  # only support one must
-
-        matches = []
-        for searchable_index in searchable_indexes:
-            self.find_document_in_searchable_index(matches, must, 
searchable_doc_types, searchable_index)
-
-        return matches
-
-    def find_document_in_searchable_index(self, matches, must, 
searchable_doc_types, searchable_index):
-        for document in self.__documents_dict[searchable_index]:
-            if not searchable_doc_types or document.get("_type") in 
searchable_doc_types:
-                if "match_phrase" in must:
-                    self.match_must_phrase(document, matches, must)
-                else:
-                    matches.append(document)
-
-    @staticmethod
-    def match_must_phrase(document, matches, must):
-        for query_id in must["match_phrase"]:
-            query_val = must["match_phrase"][query_id]
-            if query_id in document["_source"]:
-                if query_val in document["_source"][query_id]:
-                    # use in as a proxy for match_phrase
-                    matches.append(document)
-
-    # Check index(es) exists.
-    def _validate_search_targets(self, targets, query):
-        # TODO: support allow_no_indices query parameter
-        matches = set()
-        for target in targets:
-            if target in ("_all", ""):
-                matches.update(self.__documents_dict)
-            elif "*" in target:
-                matches.update(fnmatch.filter(self.__documents_dict, target))
-            elif target not in self.__documents_dict:
-                raise 
MissingIndexException(msg=f"IndexMissingException[[{target}] missing]", 
query=query)
-        return matches
-
-    def _normalize_index_to_list(self, index, query):
-        # Ensure to have a list of index
-        if index is None:
-            searchable_indexes = self.__documents_dict.keys()
-        elif isinstance(index, str):
-            searchable_indexes = [index]
-        elif isinstance(index, list):
-            searchable_indexes = index
-        else:
-            # Is it the correct exception to use ?
-            raise ValueError("Invalid param 'index'")
-        generator = (target for index in searchable_indexes for target in 
index.split(","))
-        return list(self._validate_search_targets(generator, query=query))
-
-    @staticmethod
-    def _normalize_doc_type_to_list(doc_type):
-        # Ensure to have a list of index
-        if doc_type is None:
-            searchable_doc_types = []
-        elif isinstance(doc_type, str):
-            searchable_doc_types = [doc_type]
-        elif isinstance(doc_type, list):
-            searchable_doc_types = doc_type
-        else:
-            # Is it the correct exception to use ?
-            raise ValueError("Invalid param 'index'")
-
-        return searchable_doc_types
diff --git 
a/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/utilities/__init__.py
 
b/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/utilities/__init__.py
deleted file mode 100644
index 50b883e0f02..00000000000
--- 
a/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/utilities/__init__.py
+++ /dev/null
@@ -1,232 +0,0 @@
-"""Utilities for Elastic mock"""
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-#
-# The MIT License (MIT)
-#
-# Copyright (c) 2016 Marcos Cardoso
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in 
all
-# copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-# SOFTWARE.
-import base64
-import random
-import string
-from datetime import date, datetime
-from functools import wraps
-
-from elasticsearch.exceptions import NotFoundError
-
-DEFAULT_ELASTICSEARCH_ID_SIZE = 20
-CHARSET_FOR_ELASTICSEARCH_ID = string.ascii_letters + string.digits
-GLOBAL_PARAMS = ("pretty", "human", "error_trace", "format", "filter_path")
-
-
-def get_random_id(size=DEFAULT_ELASTICSEARCH_ID_SIZE):
-    """Returns random if for elasticsearch"""
-    return "".join(random.choices(CHARSET_FOR_ELASTICSEARCH_ID, k=size))
-
-
-def query_params(*es_query_params, **kwargs):
-    """
-    Decorator that pops all accepted parameters from method's kwargs and puts
-    them in the params argument.
-    """
-    body_params = kwargs.pop("body_params", None)
-    body_only_params = set(body_params or ()) - set(es_query_params)
-    body_name = kwargs.pop("body_name", None)
-    body_required = kwargs.pop("body_required", False)
-    type_possible_in_params = "type" in es_query_params
-
-    assert not (body_name and body_params)
-
-    assert not (body_name and body_required)
-    assert not body_required or body_params
-
-    def _wrapper(func):
-        @wraps(func)
-        def _wrapped(*args, **kwargs):
-            params = (kwargs.pop("params", None) or {}).copy()
-            headers = {k.lower(): v for k, v in (kwargs.pop("headers", None) 
or {}).copy().items()}
-
-            if "opaque_id" in kwargs:
-                headers["x-opaque-id"] = kwargs.pop("opaque_id")
-
-            http_auth = kwargs.pop("http_auth", None)
-            api_key = kwargs.pop("api_key", None)
-
-            using_body_kwarg = kwargs.get("body", None) is not None
-            using_positional_args = args and len(args) > 1
-
-            if type_possible_in_params:
-                doc_type_in_params = params and "doc_type" in params
-                doc_type_in_kwargs = "doc_type" in kwargs
-
-                if doc_type_in_params:
-                    params["type"] = params.pop("doc_type")
-                if doc_type_in_kwargs:
-                    kwargs["type"] = kwargs.pop("doc_type")
-
-            if using_body_kwarg or using_positional_args:
-                body_only_params_in_use = body_only_params.intersection(kwargs)
-                if body_only_params_in_use:
-                    params_prose = "', '".join(sorted(body_only_params_in_use))
-                    plural_params = len(body_only_params_in_use) > 1
-
-                    raise TypeError(
-                        f"The '{params_prose}' parameter{'s' if plural_params 
else ''} "
-                        f"{'are' if plural_params else 'is'} only serialized 
in the "
-                        f"request body and can't be combined with the 'body' 
parameter. "
-                        f"Either stop using the 'body' parameter and use 
keyword-arguments "
-                        f"only or move the specified parameters into the 
'body'. "
-                        f"See 
https://github.com/elastic/elasticsearch-py/issues/1698 "
-                        f"for more information"
-                    )
-
-            elif set(body_params or ()).intersection(kwargs):
-                body = {}
-                for param in body_params:
-                    value = kwargs.pop(param, None)
-                    if value is not None:
-                        body[param.rstrip("_")] = value
-                kwargs["body"] = body
-
-            elif body_required:
-                kwargs["body"] = {}
-
-            if body_name:
-                if body_name in kwargs:
-                    if using_body_kwarg:
-                        raise TypeError(
-                            f"Can't use '{body_name}' and 'body' parameters 
together"
-                            f" because '{body_name}' is an alias for 'body'. "
-                            f"Instead you should only use the '{body_name}' "
-                            f"parameter. See 
https://github.com/elastic/elasticsearch-py/issues/1698 "
-                            f"for more information"
-                        )
-                    kwargs["body"] = kwargs.pop(body_name)
-
-            if http_auth is not None and api_key is not None:
-                raise ValueError("Only one of 'http_auth' and 'api_key' may be 
passed at a time")
-            if http_auth is not None:
-                headers["authorization"] = f"Basic 
{_base64_auth_header(http_auth)}"
-            elif api_key is not None:
-                headers["authorization"] = f"ApiKey 
{_base64_auth_header(api_key)}"
-
-            for p in es_query_params + GLOBAL_PARAMS:
-                if p in kwargs:
-                    v = kwargs.pop(p)
-                    if v is not None:
-                        params[p] = _escape(v)
-
-            for p in ("ignore", "request_timeout"):
-                if p in kwargs:
-                    params[p] = kwargs.pop(p)
-            return func(*args, params=params, headers=headers, **kwargs)
-
-        return _wrapped
-
-    return _wrapper
-
-
-def to_str(x, encoding="ascii"):
-    if not isinstance(x, str):
-        return x.decode(encoding)
-    return x
-
-
-def to_bytes(x, encoding="ascii"):
-    if not isinstance(x, bytes):
-        return x.encode(encoding)
-    return x
-
-
-def _base64_auth_header(auth_value):
-    """Takes either a 2-tuple or a base64-encoded string
-    and returns a base64-encoded string to be used
-    as an HTTP authorization header.
-    """
-    if isinstance(auth_value, (list, tuple)):
-        auth_value = base64.b64encode(to_bytes(":".join(auth_value)))
-    return to_str(auth_value)
-
-
-def _escape(value):
-    """
-    Escape a single value of a URL string or a query parameter. If it is a list
-    or tuple, turn it into a comma-separated string first.
-    """
-
-    # make sequences into comma-separated strings
-    if isinstance(value, (list, tuple)):
-        value = ",".join(value)
-
-    # dates and datetimes into isoformat
-    elif isinstance(value, (date, datetime)):
-        value = value.isoformat()
-
-    # make bools into true/false strings
-    elif isinstance(value, bool):
-        value = str(value).lower()
-
-    # don't decode bytestrings
-    elif isinstance(value, bytes):
-        return value
-
-    # encode strings to utf-8
-    if isinstance(value, str):
-        return value.encode("utf-8")
-
-    return str(value)
-
-
-class MissingIndexException(NotFoundError):
-    """Exception representing a missing index."""
-
-    def __init__(self, msg, query):
-        self.msg = msg
-        self.query = query
-
-    def __str__(self):
-        return f"IndexMissingException[[{self.msg}] missing] with query 
{self.query}"
-
-
-class SearchFailedException(NotFoundError):
-    """Exception representing a search failure."""
-
-    def __init__(self, msg):
-        self.msg = msg
-
-    def __str__(self):
-        return f"SearchFailedException: {self.msg}"
diff --git 
a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py 
b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
index c679eedf38e..6b11c6068a5 100644
--- 
a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
+++ 
b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
@@ -17,13 +17,10 @@
 # under the License.
 from __future__ import annotations
 
+import dataclasses
 import json
 import logging
-import os
 import re
-import shutil
-import tempfile
-import uuid
 from io import StringIO
 from pathlib import Path
 from unittest import mock
@@ -35,12 +32,15 @@ import pendulum
 import pytest
 
 from airflow.providers.common.compat.sdk import conf
+from airflow.providers.elasticsearch.log.es_json_formatter import 
ElasticsearchJSONFormatter
 from airflow.providers.elasticsearch.log.es_response import 
ElasticSearchResponse
 from airflow.providers.elasticsearch.log.es_task_handler import (
     VALID_ES_CONFIG_KEYS,
     ElasticsearchRemoteLogIO,
     ElasticsearchTaskHandler,
+    _build_log_fields,
     _clean_date,
+    _format_error_detail,
     _render_log_id,
     get_es_kwargs_from_config,
     getattr_nested,
@@ -51,12 +51,16 @@ from airflow.utils.timezone import datetime
 
 from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.db import clear_db_dags, clear_db_runs
-from tests_common.test_utils.paths import AIRFLOW_PROVIDERS_ROOT_PATH
 from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-from unit.elasticsearch.log.elasticmock import elasticmock
-from unit.elasticsearch.log.elasticmock.utilities import SearchFailedException
 
-ES_PROVIDER_YAML_FILE = AIRFLOW_PROVIDERS_ROOT_PATH / "elasticsearch" / 
"provider.yaml"
+
[email protected]
+class _MockTI:
+    dag_id: str = "dag_for_testing_es_log_handler"
+    task_id: str = "task_for_testing_es_log_handler"
+    run_id: str = "run_for_testing_es_log_handler"
+    try_number: int = 1
+    map_index: int = -1
 
 
 def get_ti(dag_id, task_id, run_id, logical_date, create_task_instance):
@@ -73,6 +77,78 @@ def get_ti(dag_id, task_id, run_id, logical_date, 
create_task_instance):
     return ti
 
 
+def _build_es_search_response(*sources: dict, index: str = "test_index", 
doc_type: str = "_doc") -> dict:
+    hits = [
+        {
+            "_id": str(i),
+            "_index": index,
+            "_score": 1.0,
+            "_source": source,
+            "_type": doc_type,
+        }
+        for i, source in enumerate(sources, start=1)
+    ]
+    return {
+        "_shards": {"failed": 0, "skipped": 0, "successful": 1, "total": 1},
+        "hits": {
+            "hits": hits,
+            "max_score": 1.0,
+            "total": {"relation": "eq", "value": len(hits)},
+        },
+        "timed_out": False,
+        "took": 1,
+    }
+
+
+def _make_es_response(search, *sources: dict) -> ElasticSearchResponse:
+    return ElasticSearchResponse(search, _build_es_search_response(*sources))
+
+
+def _metadata_from_result(metadatas):
+    return metadatas if AIRFLOW_V_3_0_PLUS else metadatas[0]
+
+
+def _assert_log_events(logs, metadatas, *, expected_events: list[str], 
expected_sources: list[str]):
+    metadata = _metadata_from_result(metadatas)
+    if AIRFLOW_V_3_0_PLUS:
+        logs = list(logs)
+        assert logs[0].event == "::group::Log message source details"
+        assert logs[0].sources == expected_sources
+        assert logs[1].event == "::endgroup::"
+        assert [log.event for log in logs[2:]] == expected_events
+    else:
+        assert len(logs) == 1
+        assert len(logs[0]) == 1
+        assert logs[0][0][0] == expected_sources[0]
+        assert logs[0][0][1] == "\n".join(expected_events)
+    return metadata
+
+
+def _assert_no_logs(logs, metadatas):
+    metadata = _metadata_from_result(metadatas)
+    if AIRFLOW_V_3_0_PLUS:
+        assert logs == []
+    else:
+        assert logs == [[]]
+    return metadata
+
+
+def _assert_missing_log_message(logs):
+    expected_pattern = r"^\*\*\* Log .* not found in Elasticsearch.*"
+    if AIRFLOW_V_3_0_PLUS:
+        logs = list(logs)
+        assert len(logs) == 1
+        assert logs[0].event is not None
+        assert logs[0].event.startswith("*** Log ")
+        assert logs[0].event.endswith("may have been removed.")
+        assert logs[0].event
+        assert re.match(expected_pattern, logs[0].event) is not None
+    else:
+        assert len(logs) == 1
+        assert len(logs[0]) == 1
+        assert re.match(expected_pattern, logs[0][0][1]) is not None
+
+
 class TestElasticsearchTaskHandler:
     DAG_ID = "dag_for_testing_es_task_handler"
     TASK_ID = "task_for_testing_es_log_handler"
@@ -81,15 +157,39 @@ class TestElasticsearchTaskHandler:
     TRY_NUM = 1
     LOGICAL_DATE = datetime(2016, 1, 1)
     LOG_ID = f"{DAG_ID}-{TASK_ID}-{RUN_ID}-{MAP_INDEX}-{TRY_NUM}"
-    JSON_LOG_ID = f"{DAG_ID}-{TASK_ID}-{_clean_date(LOGICAL_DATE)}-1"
     FILENAME_TEMPLATE = "{try_number}.log"
 
-    # TODO: Remove when we stop testing for 2.11 compatibility
     @pytest.fixture(autouse=True)
     def _use_historical_filename_templates(self):
         with conf_vars({("core", "use_historical_filename_templates"): 
"True"}):
             yield
 
+    @pytest.fixture(autouse=True)
+    def _setup_handler(self, tmp_path):
+        self.local_log_location = str(tmp_path / "logs")
+        self.end_of_log_mark = "end_of_log\n"
+        self.write_stdout = False
+        self.json_format = False
+        self.json_fields = "asctime,filename,lineno,levelname,message,exc_text"
+        self.host_field = "host"
+        self.offset_field = "offset"
+        self.test_message = "some random stuff"
+        self.base_log_source = {
+            "message": self.test_message,
+            "event": self.test_message,
+            "log_id": self.LOG_ID,
+            "offset": 1,
+        }
+        self.es_task_handler = ElasticsearchTaskHandler(
+            base_log_folder=self.local_log_location,
+            end_of_log_mark=self.end_of_log_mark,
+            write_stdout=self.write_stdout,
+            json_format=self.json_format,
+            json_fields=self.json_fields,
+            host_field=self.host_field,
+            offset_field=self.offset_field,
+        )
+
     @pytest.fixture
     def ti(self, create_task_instance, create_log_template):
         create_log_template(
@@ -110,62 +210,6 @@ class TestElasticsearchTaskHandler:
         clear_db_runs()
         clear_db_dags()
 
-    @elasticmock
-    def setup_method(self, method):
-        self.local_log_location = "local/log/location"
-        self.end_of_log_mark = "end_of_log\n"
-        self.write_stdout = False
-        self.json_format = False
-        self.json_fields = "asctime,filename,lineno,levelname,message,exc_text"
-        self.host_field = "host"
-        self.offset_field = "offset"
-        self.es_task_handler = ElasticsearchTaskHandler(
-            base_log_folder=self.local_log_location,
-            end_of_log_mark=self.end_of_log_mark,
-            write_stdout=self.write_stdout,
-            json_format=self.json_format,
-            json_fields=self.json_fields,
-            host_field=self.host_field,
-            offset_field=self.offset_field,
-        )
-
-        self.es = elasticsearch.Elasticsearch("http://localhost:9200";)
-        self.index_name = "test_index"
-        self.doc_type = "log"
-        self.test_message = "some random stuff"
-        self.body = {
-            "message": self.test_message,
-            "log_id": self.LOG_ID,
-            "offset": 1,
-            "event": self.test_message,
-        }
-        self.es.index(index=self.index_name, doc_type=self.doc_type, 
document=self.body, id=1)
-
-    def teardown_method(self):
-        shutil.rmtree(self.local_log_location.split(os.path.sep)[0], 
ignore_errors=True)
-
-    @pytest.mark.parametrize(
-        "sample_response",
-        [
-            pytest.param(lambda self: self.es.sample_airflow_2_log_response(), 
id="airflow_2"),
-            pytest.param(lambda self: self.es.sample_airflow_3_log_response(), 
id="airflow_3"),
-        ],
-    )
-    def test_es_response(self, sample_response):
-        response = sample_response(self)
-        es_response = ElasticSearchResponse(self.es_task_handler, response)
-        logs_by_host = self.es_task_handler.io._group_logs_by_host(es_response)
-
-        for hosted_log in logs_by_host.values():
-            message = self.es_task_handler.concat_logs(hosted_log)
-
-        assert (
-            message == "Dependencies all met for dep_context=non-requeueable"
-            " deps ti=<TaskInstance: example_bash_operator.run_after_loop \n"
-            "Starting attempt 1 of 1\nExecuting <Task(BashOperator): 
run_after_loop> "
-            "on 2023-07-09 07:47:32+00:00"
-        )
-
     @pytest.mark.parametrize(
         ("host", "expected"),
         [
@@ -177,12 +221,9 @@ class TestElasticsearchTaskHandler:
         ],
     )
     def test_format_url(self, host, expected):
-        """
-        Test the format_url method of the ElasticsearchTaskHandler class.
-        """
         if expected == "ValueError":
             with pytest.raises(ValueError, match="'https://' is not a valid 
URL."):
-                assert ElasticsearchTaskHandler.format_url(host) == expected
+                ElasticsearchTaskHandler.format_url(host)
         else:
             assert ElasticsearchTaskHandler.format_url(host) == expected
 
@@ -197,7 +238,6 @@ class TestElasticsearchTaskHandler:
             "verify_certs": True,
         }
         assert es_conf == expected_dict
-        # ensure creating with configs does not fail
         ElasticsearchTaskHandler(
             base_log_folder=self.local_log_location,
             end_of_log_mark=self.end_of_log_mark,
@@ -209,387 +249,118 @@ class TestElasticsearchTaskHandler:
             es_kwargs=es_conf,
         )
 
-    def test_client_with_patterns(self):
-        # ensure creating with index patterns does not fail
-        patterns = "test_*,other_*"
-        handler = ElasticsearchTaskHandler(
-            base_log_folder=self.local_log_location,
-            end_of_log_mark=self.end_of_log_mark,
-            write_stdout=self.write_stdout,
-            json_format=self.json_format,
-            json_fields=self.json_fields,
-            host_field=self.host_field,
-            offset_field=self.offset_field,
-            index_patterns=patterns,
-        )
-        assert handler.index_patterns == patterns
-
-    @pytest.mark.db_test
-    def test_read(self, ti):
-        ts = pendulum.now()
-        logs, metadatas = self.es_task_handler.read(
-            ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": 
False}
-        )
-
-        if AIRFLOW_V_3_0_PLUS:
-            logs = list(logs)
-            assert logs[0].event == "::group::Log message source details"
-            assert logs[0].sources == ["http://localhost:9200";]
-            assert logs[1].event == "::endgroup::"
-            assert logs[2].event == "some random stuff"
-
-            metadata = metadatas
-        else:
-            assert len(logs) == 1
-            assert len(logs) == len(metadatas)
-            assert len(logs[0]) == 1
-            assert self.test_message == logs[0][0][-1]
-
-            metadata = metadatas[0]
-
-        assert not metadata["end_of_log"]
-        assert metadata["offset"] == "1"
-        assert timezone.parse(metadata["last_log_timestamp"]) > ts
-
-    @pytest.mark.db_test
-    def test_read_with_patterns(self, ti):
-        ts = pendulum.now()
-        with mock.patch.object(self.es_task_handler, "index_patterns", 
new="test_*,other_*"):
-            logs, metadatas = self.es_task_handler.read(
-                ti, 1, {"offset": 0, "last_log_timestamp": str(ts), 
"end_of_log": False}
-            )
-
-        if AIRFLOW_V_3_0_PLUS:
-            logs = list(logs)
-            assert logs[0].event == "::group::Log message source details"
-            assert logs[0].sources == ["http://localhost:9200";]
-            assert logs[1].event == "::endgroup::"
-            assert logs[2].event == "some random stuff"
-
-            metadata = metadatas
-        else:
-            assert len(logs) == 1
-            assert len(logs) == len(metadatas)
-            assert len(logs[0]) == 1
-            assert self.test_message == logs[0][0][-1]
-
-            metadata = metadatas[0]
-
-        assert not metadata["end_of_log"]
-        assert metadata["offset"] == "1"
-        assert timezone.parse(metadata["last_log_timestamp"]) > ts
-
     @pytest.mark.db_test
-    def test_read_with_patterns_no_match(self, ti):
-        ts = pendulum.now()
-        with mock.patch.object(self.es_task_handler.io, "index_patterns", 
new="test_other_*,test_another_*"):
-            logs, metadatas = self.es_task_handler.read(
-                ti, 1, {"offset": 0, "last_log_timestamp": str(ts), 
"end_of_log": False}
-            )
-
-        if AIRFLOW_V_3_0_PLUS:
-            assert logs == []
-
-            metadata = metadatas
-        else:
-            assert len(logs) == 1
-            assert len(logs) == len(metadatas)
-            assert logs == [[]]
-
-            metadata = metadatas[0]
-
-        assert metadata["offset"] == "0"
-        assert metadata["end_of_log"]
-        # last_log_timestamp won't change if no log lines read.
-        assert timezone.parse(metadata["last_log_timestamp"]) == ts
-
-    @pytest.mark.db_test
-    def test_read_with_missing_index(self, ti):
-        ts = pendulum.now()
-        with mock.patch.object(self.es_task_handler.io, "index_patterns", 
new="nonexistent,test_*"):
-            with pytest.raises(elasticsearch.exceptions.NotFoundError, 
match=r"IndexMissingException.*"):
-                self.es_task_handler.read(
+    @pytest.mark.parametrize("metadata_mode", ["provided", "none", "empty"])
+    def test_read(self, ti, metadata_mode):
+        start_time = pendulum.now()
+        response = _make_es_response(self.es_task_handler.io, 
self.base_log_source)
+
+        with patch.object(self.es_task_handler.io, "_es_read", 
return_value=response):
+            if metadata_mode == "provided":
+                logs, metadatas = self.es_task_handler.read(
                     ti,
                     1,
-                    {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": 
False},
+                    {"offset": 0, "last_log_timestamp": str(start_time), 
"end_of_log": False},
                 )
-
-    @pytest.mark.db_test
-    @pytest.mark.parametrize("seconds", [3, 6])
-    def test_read_missing_logs(self, seconds, create_task_instance):
-        """
-        When the log actually isn't there to be found, we only want to wait 
for 5 seconds.
-        In this case we expect to receive a message of the form 'Log {log_id} 
not found in elasticsearch ...'
-        """
-        run_id = "wrong_run_id"
-        ti = get_ti(
-            self.DAG_ID,
-            self.TASK_ID,
-            run_id,
-            pendulum.instance(self.LOGICAL_DATE).add(days=1),  # so logs are 
not found
-            create_task_instance=create_task_instance,
-        )
-        ts = pendulum.now().add(seconds=-seconds)
-        logs, metadatas = self.es_task_handler.read(ti, 1, {"offset": 0, 
"last_log_timestamp": str(ts)})
-        if AIRFLOW_V_3_0_PLUS:
-            logs = list(logs)
-            if seconds > 5:
-                # we expect a log not found message when checking began more 
than 5 seconds ago
-                expected_pattern = r"^\*\*\* Log .* not found in 
Elasticsearch.*"
-                assert re.match(expected_pattern, logs[0].event) is not None
-                assert metadatas["end_of_log"] is True
+            elif metadata_mode == "empty":
+                logs, metadatas = self.es_task_handler.read(ti, 1, {})
             else:
-                # we've "waited" less than 5 seconds so it should not be "end 
of log" and should be no log message
-                assert logs == []
-                assert metadatas["end_of_log"] is True
-            assert metadatas["offset"] == "0"
-            assert timezone.parse(metadatas["last_log_timestamp"]) == ts
-        else:
-            assert len(logs) == 1
-            if seconds > 5:
-                # we expect a log not found message when checking began more 
than 5 seconds ago
-                assert len(logs[0]) == 1
-                actual_message = logs[0][0][1]
-                expected_pattern = r"^\*\*\* Log .* not found in 
Elasticsearch.*"
-                assert re.match(expected_pattern, actual_message) is not None
-                assert metadatas[0]["end_of_log"] is True
-            else:
-                # we've "waited" less than 5 seconds so it should not be "end 
of log" and should be no log message
-                assert len(logs[0]) == 0
-                assert logs == [[]]
-                assert metadatas[0]["end_of_log"] is True
-            assert len(logs) == len(metadatas)
-            assert metadatas[0]["offset"] == "0"
-            assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts
+                logs, metadatas = self.es_task_handler.read(ti, 1)
 
-    @pytest.mark.db_test
-    def test_read_with_match_phrase_query(self, ti):
-        similar_log_id = (
-            f"{TestElasticsearchTaskHandler.TASK_ID}-"
-            
f"{TestElasticsearchTaskHandler.DAG_ID}-2016-01-01T00:00:00+00:00-1"
+        metadata = _assert_log_events(
+            logs,
+            metadatas,
+            expected_events=[self.test_message],
+            expected_sources=["http://localhost:9200";],
         )
-        another_test_message = "another message"
-
-        another_body = {
-            "message": another_test_message,
-            "log_id": similar_log_id,
-            "offset": 1,
-        }
-        self.es.index(index=self.index_name, doc_type=self.doc_type, 
document=another_body, id=1)
-
-        ts = pendulum.now()
-        logs, metadatas = self.es_task_handler.read(
-            ti,
-            1,
-            {
-                "offset": "0",
-                "last_log_timestamp": str(ts),
-                "end_of_log": False,
-                "max_offset": 2,
-            },
-        )
-        if AIRFLOW_V_3_0_PLUS:
-            logs = list(logs)
-            assert logs[0].event == "::group::Log message source details"
-            assert logs[0].sources == ["http://localhost:9200";]
-            assert logs[1].event == "::endgroup::"
-            assert logs[2].event == "some random stuff"
-
-            metadata = metadatas
-        else:
-            assert len(logs) == 1
-            assert len(logs) == len(metadatas)
-            assert len(logs[0]) == 1
-            assert self.test_message == logs[0][0][-1]
-
-            metadata = metadatas[0]
-
-        assert not metadata["end_of_log"]
-        assert metadata["offset"] == "1"
-        assert timezone.parse(metadata["last_log_timestamp"]) > ts
-
-    @pytest.mark.db_test
-    def test_read_with_none_metadata(self, ti):
-        logs, metadatas = self.es_task_handler.read(ti, 1)
-        if AIRFLOW_V_3_0_PLUS:
-            logs = list(logs)
-            assert logs[0].event == "::group::Log message source details"
-            assert logs[0].sources == ["http://localhost:9200";]
-            assert logs[1].event == "::endgroup::"
-            assert logs[2].event == "some random stuff"
-
-            metadata = metadatas
-        else:
-            assert len(logs) == 1
-            assert len(logs) == len(metadatas)
-            assert len(logs[0]) == 1
-            assert self.test_message == logs[0][0][-1]
-
-            metadata = metadatas[0]
 
         assert not metadata["end_of_log"]
         assert metadata["offset"] == "1"
-        assert timezone.parse(metadata["last_log_timestamp"]) < pendulum.now()
+        assert timezone.parse(metadata["last_log_timestamp"]) >= start_time
 
     @pytest.mark.db_test
-    def test_read_nonexistent_log(self, ti):
-        ts = pendulum.now()
-        # In ElasticMock, search is going to return all documents with 
matching index
-        # and doc_type regardless of match filters, so we delete the log entry 
instead
-        # of making a new TaskInstance to query.
-        self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1)
-        logs, metadatas = self.es_task_handler.read(
-            ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": 
False}
-        )
-        if AIRFLOW_V_3_0_PLUS:
-            assert logs == []
-
-            metadata = metadatas
-        else:
-            assert len(logs) == 1
-            assert len(logs) == len(metadatas)
-            assert logs == [[]]
-
-            metadata = metadatas[0]
+    def test_read_defaults_offset_when_missing_from_metadata(self, ti):
+        start_time = pendulum.now()
+        with patch.object(self.es_task_handler.io, "_es_read", 
return_value=None):
+            logs, metadatas = self.es_task_handler.read(ti, 1, {"end_of_log": 
False})
 
-        assert metadata["offset"] == "0"
+        metadata = _assert_no_logs(logs, metadatas)
         assert metadata["end_of_log"]
-        # last_log_timestamp won't change if no log lines read.
-        assert timezone.parse(metadata["last_log_timestamp"]) == ts
+        assert metadata["offset"] == "0"
+        assert timezone.parse(metadata["last_log_timestamp"]) >= start_time
 
     @pytest.mark.db_test
-    def test_read_with_empty_metadata(self, ti):
-        ts = pendulum.now()
-        logs, metadatas = self.es_task_handler.read(ti, 1, {})
-        print(f"metadatas: {metadatas}")
-        if AIRFLOW_V_3_0_PLUS:
-            logs = list(logs)
-            assert logs[0].event == "::group::Log message source details"
-            assert logs[0].sources == ["http://localhost:9200";]
-            assert logs[1].event == "::endgroup::"
-            assert logs[2].event == "some random stuff"
-
-            metadata = metadatas
-        else:
-            assert len(logs) == 1
-            assert len(logs) == len(metadatas)
-            assert len(logs[0]) == 1
-            assert self.test_message == logs[0][0][-1]
-
-            metadata = metadatas[0]
-        print(f"metadatas: {metadatas}")
-        assert not metadata["end_of_log"]
-        # offset should be initialized to 0 if not provided.
-        assert metadata["offset"] == "1"
-        # last_log_timestamp will be initialized using log reading time
-        # if not last_log_timestamp is provided.
-        assert timezone.parse(metadata["last_log_timestamp"]) > ts
-
-        # case where offset is missing but metadata not empty.
-        self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1)
-        logs, metadatas = self.es_task_handler.read(ti, 1, {"end_of_log": 
False})
-        if AIRFLOW_V_3_0_PLUS:
-            assert logs == []
+    @pytest.mark.parametrize("seconds", [3, 6])
+    def test_read_missing_logs(self, ti, seconds):
+        start_time = pendulum.now().add(seconds=-seconds)
+        with patch.object(self.es_task_handler.io, "_es_read", 
return_value=None):
+            logs, metadatas = self.es_task_handler.read(
+                ti,
+                1,
+                {"offset": 0, "last_log_timestamp": str(start_time), 
"end_of_log": False},
+            )
 
-            metadata = metadatas
+        metadata = _metadata_from_result(metadatas)
+        if seconds > 5:
+            _assert_missing_log_message(logs)
         else:
-            assert len(logs) == 1
-            assert len(logs) == len(metadatas)
-            assert logs == [[]]
-
-            metadata = metadatas[0]
+            _assert_no_logs(logs, metadatas)
 
         assert metadata["end_of_log"]
-        # offset should be initialized to 0 if not provided.
         assert metadata["offset"] == "0"
-        # last_log_timestamp will be initialized using log reading time
-        # if not last_log_timestamp is provided.
-        assert timezone.parse(metadata["last_log_timestamp"]) > ts
+        assert timezone.parse(metadata["last_log_timestamp"]) == start_time
 
     @pytest.mark.db_test
     def test_read_timeout(self, ti):
-        ts = pendulum.now().subtract(minutes=5)
-
-        self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1)
-        # in the below call, offset=1 implies that we have already retrieved 
something
-        # if we had never retrieved any logs at all (offset=0), then we would 
have gotten
-        # a "logs not found" message after 5 seconds of trying
-        offset = 1
-        logs, metadatas = self.es_task_handler.read(
-            task_instance=ti,
-            try_number=1,
-            metadata={
-                "offset": offset,
-                "last_log_timestamp": str(ts),
-                "end_of_log": False,
-            },
-        )
-        if AIRFLOW_V_3_0_PLUS:
-            assert logs == []
-
-            metadata = metadatas
-        else:
-            assert len(logs) == 1
-            assert len(logs) == len(metadatas)
-            assert logs == [[]]
-
-            metadata = metadatas[0]
+        start_time = pendulum.now().subtract(minutes=5)
+        with patch.object(self.es_task_handler.io, "_es_read", 
return_value=None):
+            logs, metadatas = self.es_task_handler.read(
+                task_instance=ti,
+                try_number=1,
+                metadata={
+                    "offset": 1,
+                    "last_log_timestamp": str(start_time),
+                    "end_of_log": False,
+                },
+            )
 
+        metadata = _assert_no_logs(logs, metadatas)
         assert metadata["end_of_log"]
-        assert str(offset) == metadata["offset"]
-        assert timezone.parse(metadata["last_log_timestamp"]) == ts
+        assert metadata["offset"] == "1"
+        assert timezone.parse(metadata["last_log_timestamp"]) == start_time
 
     @pytest.mark.db_test
-    def test_read_as_download_logs(self, ti):
-        ts = pendulum.now()
-        logs, metadatas = self.es_task_handler.read(
-            ti,
-            1,
+    def test_read_with_custom_offset_and_host_fields(self, ti):
+        self.es_task_handler.host_field = "host.name"
+        self.es_task_handler.offset_field = "log.offset"
+        self.es_task_handler.io.host_field = "host.name"
+        self.es_task_handler.io.offset_field = "log.offset"
+        response = _make_es_response(
+            self.es_task_handler.io,
             {
-                "offset": 0,
-                "last_log_timestamp": str(ts),
-                "download_logs": True,
-                "end_of_log": False,
+                "message": self.test_message,
+                "event": self.test_message,
+                "log_id": self.LOG_ID,
+                "log": {"offset": 1},
+                "host": {"name": "somehostname"},
             },
         )
-        if AIRFLOW_V_3_0_PLUS:
-            logs = list(logs)
-            assert logs[0].event == "::group::Log message source details"
-            assert logs[0].sources == ["http://localhost:9200";]
-            assert logs[1].event == "::endgroup::"
-            assert logs[2].event == "some random stuff"
-
-            metadata = metadatas
-        else:
-            assert len(logs) == 1
-            assert len(logs) == len(metadatas)
-            assert len(logs[0]) == 1
-            assert self.test_message == logs[0][0][-1]
 
-            metadata = metadatas[0]
+        with patch.object(self.es_task_handler.io, "_es_read", 
return_value=response):
+            logs, metadatas = self.es_task_handler.read(
+                ti,
+                1,
+                {"offset": 0, "last_log_timestamp": str(pendulum.now()), 
"end_of_log": False},
+            )
 
-        assert not metadata["end_of_log"]
+        metadata = _assert_log_events(
+            logs,
+            metadatas,
+            expected_events=[self.test_message],
+            expected_sources=["somehostname"],
+        )
         assert metadata["offset"] == "1"
-        assert timezone.parse(metadata["last_log_timestamp"]) > ts
-
-    @pytest.mark.db_test
-    def test_read_raises(self, ti):
-        with mock.patch.object(self.es_task_handler.io.log, "exception") as 
mock_exception:
-            with mock.patch.object(self.es_task_handler.io.client, "search") 
as mock_execute:
-                mock_execute.side_effect = SearchFailedException("Failed to 
read")
-                log_sources, log_msgs = self.es_task_handler.io.read("", ti)
-            assert mock_exception.call_count == 1
-            args, kwargs = mock_exception.call_args
-            assert "Could not read log with log_id:" in args[0]
-
-        if AIRFLOW_V_3_0_PLUS:
-            assert log_sources == []
-        else:
-            assert len(log_sources) == 0
-            assert len(log_msgs) == 1
-            assert log_sources == []
-
-        assert "not found in Elasticsearch" in log_msgs[0]
+        assert not metadata["end_of_log"]
 
     @pytest.mark.db_test
     def test_set_context(self, ti):
@@ -598,118 +369,29 @@ class TestElasticsearchTaskHandler:
 
     @pytest.mark.db_test
     def test_set_context_w_json_format_and_write_stdout(self, ti):
-        formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s 
- %(message)s")
-        self.es_task_handler.formatter = formatter
-        self.es_task_handler.write_stdout = True
-        self.es_task_handler.json_format = True
-        self.es_task_handler.set_context(ti)
-
-    @pytest.mark.db_test
-    def test_read_with_json_format(self, ti):
-        ts = pendulum.now()
-        formatter = logging.Formatter(
-            "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - 
%(message)s - %(exc_text)s"
-        )
-        self.es_task_handler.formatter = formatter
-        self.es_task_handler.json_format = True
-
-        self.body = {
-            "message": self.test_message,
-            "event": self.test_message,
-            "log_id": 
f"{self.DAG_ID}-{self.TASK_ID}-2016_01_01T00_00_00_000000-1",
-            "offset": 1,
-            "asctime": "2020-12-24 19:25:00,962",
-            "filename": "taskinstance.py",
-            "lineno": 851,
-            "levelname": "INFO",
-        }
-        self.es_task_handler.set_context(ti)
-        self.es.index(index=self.index_name, doc_type=self.doc_type, 
document=self.body, id=id)
-
-        logs, _ = self.es_task_handler.read(
-            ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": 
False}
+        self.es_task_handler.formatter = logging.Formatter(
+            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
         )
-        if AIRFLOW_V_3_0_PLUS:
-            logs = list(logs)
-            assert logs[2].event == self.test_message
-        else:
-            assert logs[0][0][1] == self.test_message
-
-    @pytest.mark.db_test
-    def test_read_with_json_format_with_custom_offset_and_host_fields(self, 
ti):
-        ts = pendulum.now()
-        formatter = logging.Formatter(
-            "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - 
%(message)s - %(exc_text)s"
-        )
-        self.es_task_handler.formatter = formatter
+        self.es_task_handler.write_stdout = True
         self.es_task_handler.json_format = True
-        self.es_task_handler.host_field = "host.name"
-        self.es_task_handler.offset_field = "log.offset"
 
-        self.body = {
-            "message": self.test_message,
-            "event": self.test_message,
-            "log_id": self.LOG_ID,
-            "log": {"offset": 1},
-            "host": {"name": "somehostname"},
-            "asctime": "2020-12-24 19:25:00,962",
-            "filename": "taskinstance.py",
-            "lineno": 851,
-            "levelname": "INFO",
-        }
         self.es_task_handler.set_context(ti)
-        self.es.index(index=self.index_name, doc_type=self.doc_type, 
document=self.body, id=id)
-
-        logs, _ = self.es_task_handler.read(
-            ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": 
False}
-        )
-        if AIRFLOW_V_3_0_PLUS:
-            logs = list(logs)
-            assert logs[2].event == self.test_message
-        else:
-            assert logs[0][0][1] == self.test_message
-
-    @pytest.mark.db_test
-    def test_read_with_custom_offset_and_host_fields(self, ti):
-        ts = pendulum.now()
-        # Delete the existing log entry as it doesn't have the new offset and 
host fields
-        self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1)
 
-        self.es_task_handler.host_field = "host.name"
-        self.es_task_handler.offset_field = "log.offset"
-
-        self.body = {
-            "message": self.test_message,
-            "event": self.test_message,
-            "log_id": self.LOG_ID,
-            "log": {"offset": 1},
-            "host": {"name": "somehostname"},
-        }
-        self.es.index(index=self.index_name, doc_type=self.doc_type, 
document=self.body, id=id)
-
-        logs, _ = self.es_task_handler.read(
-            ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": 
False}
-        )
-        if AIRFLOW_V_3_0_PLUS:
-            pass
-        else:
-            assert logs[0][0][1] == "some random stuff"
+        assert isinstance(self.es_task_handler.formatter, 
ElasticsearchJSONFormatter)
+        assert isinstance(self.es_task_handler.handler, logging.StreamHandler)
+        assert self.es_task_handler.context_set
 
     @pytest.mark.db_test
     def test_close(self, ti):
-        formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s 
- %(message)s")
-        self.es_task_handler.formatter = formatter
+        self.es_task_handler.formatter = logging.Formatter(
+            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+        )
 
         self.es_task_handler.set_context(ti)
         self.es_task_handler.close()
-        with open(
-            os.path.join(self.local_log_location, 
self.FILENAME_TEMPLATE.format(try_number=1))
-        ) as log_file:
-            # end_of_log_mark may contain characters like '\n' which is needed 
to
-            # have the log uploaded but will not be stored in elasticsearch.
-            # so apply the strip() to log_file.read()
-            log_line = log_file.read().strip()
-            assert log_line.endswith(self.end_of_log_mark.strip())
+
+        log_file = Path(self.local_log_location) / 
self.FILENAME_TEMPLATE.format(try_number=1)
+        assert 
log_file.read_text().strip().endswith(self.end_of_log_mark.strip())
         assert self.es_task_handler.closed
 
     @pytest.mark.db_test
@@ -717,60 +399,40 @@ class TestElasticsearchTaskHandler:
         ti.raw = True
         self.es_task_handler.set_context(ti)
         self.es_task_handler.close()
-        with open(
-            os.path.join(self.local_log_location, 
self.FILENAME_TEMPLATE.format(try_number=1))
-        ) as log_file:
-            assert self.end_of_log_mark not in log_file.read()
-        assert self.es_task_handler.closed
 
-    @pytest.mark.db_test
-    def test_close_closed(self, ti):
-        self.es_task_handler.closed = True
-        self.es_task_handler.set_context(ti)
-        self.es_task_handler.close()
-        with open(
-            os.path.join(self.local_log_location, 
self.FILENAME_TEMPLATE.format(try_number=1))
-        ) as log_file:
-            assert len(log_file.read()) == 0
+        log_file = Path(self.local_log_location) / 
self.FILENAME_TEMPLATE.format(try_number=1)
+        assert self.end_of_log_mark not in log_file.read_text()
+        assert self.es_task_handler.closed
 
     @pytest.mark.db_test
     def test_close_with_no_handler(self, ti):
         self.es_task_handler.set_context(ti)
         self.es_task_handler.handler = None
         self.es_task_handler.close()
-        with open(
-            os.path.join(self.local_log_location, 
self.FILENAME_TEMPLATE.format(try_number=1))
-        ) as log_file:
-            assert len(log_file.read()) == 0
+
+        log_file = Path(self.local_log_location) / 
self.FILENAME_TEMPLATE.format(try_number=1)
+        assert log_file.read_text() == ""
         assert self.es_task_handler.closed
 
     @pytest.mark.db_test
-    def test_close_with_no_stream(self, ti):
+    @pytest.mark.parametrize("stream_state", ["none", "closed"])
+    def test_close_reopens_stream(self, ti, stream_state):
         self.es_task_handler.set_context(ti)
-        self.es_task_handler.handler.stream = None
-        self.es_task_handler.close()
-        with open(
-            os.path.join(self.local_log_location, 
self.FILENAME_TEMPLATE.format(try_number=1))
-        ) as log_file:
-            assert self.end_of_log_mark in log_file.read()
-        assert self.es_task_handler.closed
+        if stream_state == "none":
+            self.es_task_handler.handler.stream = None
+        else:
+            self.es_task_handler.handler.stream.close()
 
-        self.es_task_handler.set_context(ti)
-        self.es_task_handler.handler.stream.close()
         self.es_task_handler.close()
-        with open(
-            os.path.join(self.local_log_location, 
self.FILENAME_TEMPLATE.format(try_number=1))
-        ) as log_file:
-            assert self.end_of_log_mark in log_file.read()
+
+        log_file = Path(self.local_log_location) / 
self.FILENAME_TEMPLATE.format(try_number=1)
+        assert self.end_of_log_mark in log_file.read_text()
         assert self.es_task_handler.closed
 
     @pytest.mark.db_test
     def test_render_log_id(self, ti):
         assert _render_log_id(self.es_task_handler.log_id_template, ti, 1) == 
self.LOG_ID
 
-        self.es_task_handler.json_format = True
-        assert _render_log_id(self.es_task_handler.log_id_template, ti, 1) == 
self.LOG_ID
-
     def test_clean_date(self):
         clean_logical_date = _clean_date(datetime(2016, 7, 8, 9, 10, 11, 12))
         assert clean_logical_date == "2016_07_08T09_10_11_000012"
@@ -779,35 +441,12 @@ class TestElasticsearchTaskHandler:
     @pytest.mark.parametrize(
         ("json_format", "es_frontend", "expected_url"),
         [
-            # Common cases
-            (
-                True,
-                "localhost:5601/{log_id}",
-                "https://localhost:5601/"; + quote(LOG_ID),
-            ),
-            (
-                False,
-                "localhost:5601/{log_id}",
-                "https://localhost:5601/"; + quote(LOG_ID),
-            ),
-            # Ignore template if "{log_id}"" is missing in the URL
+            (True, "localhost:5601/{log_id}", 
"https://localhost:5601/{log_id}";),
+            (False, "localhost:5601/{log_id}", 
"https://localhost:5601/{log_id}";),
             (False, "localhost:5601", "https://localhost:5601";),
-            # scheme handling
-            (
-                False,
-                "https://localhost:5601/path/{log_id}";,
-                "https://localhost:5601/path/"; + quote(LOG_ID),
-            ),
-            (
-                False,
-                "http://localhost:5601/path/{log_id}";,
-                "http://localhost:5601/path/"; + quote(LOG_ID),
-            ),
-            (
-                False,
-                "other://localhost:5601/path/{log_id}",
-                "other://localhost:5601/path/" + quote(LOG_ID),
-            ),
+            (False, "https://localhost:5601/path/{log_id}";, 
"https://localhost:5601/path/{log_id}";),
+            (False, "http://localhost:5601/path/{log_id}";, 
"http://localhost:5601/path/{log_id}";),
+            (False, "other://localhost:5601/path/{log_id}", 
"other://localhost:5601/path/{log_id}"),
         ],
     )
     def test_get_external_log_url(self, ti, json_format, es_frontend, 
expected_url):
@@ -821,8 +460,9 @@ class TestElasticsearchTaskHandler:
             offset_field=self.offset_field,
             frontend=es_frontend,
         )
-        url = es_task_handler.get_external_log_url(ti, ti.try_number)
-        assert expected_url == url
+        assert es_task_handler.get_external_log_url(ti, ti.try_number) == 
expected_url.format(
+            log_id=quote(self.LOG_ID)
+        )
 
     @pytest.mark.parametrize(
         ("frontend", "expected"),
@@ -838,7 +478,6 @@ class TestElasticsearchTaskHandler:
     @pytest.mark.db_test
     @mock.patch("sys.__stdout__", new_callable=StringIO)
     def test_dynamic_offset(self, stdout_mock, ti, time_machine):
-        # arrange
         handler = ElasticsearchTaskHandler(
             base_log_folder=self.local_log_location,
             end_of_log_mark=self.end_of_log_mark,
@@ -850,7 +489,7 @@ class TestElasticsearchTaskHandler:
         )
         handler.formatter = logging.Formatter()
 
-        logger = logging.getLogger(__name__)
+        logger = logging.getLogger("tests.elasticsearch.dynamic_offset")
         logger.handlers = [handler]
         logger.propagate = False
 
@@ -858,17 +497,19 @@ class TestElasticsearchTaskHandler:
         handler.set_context(ti)
 
         t1 = pendulum.local(year=2017, month=1, day=1, hour=1, minute=1, 
second=15)
-        t2, t3 = t1 + pendulum.duration(seconds=5), t1 + 
pendulum.duration(seconds=10)
-
-        # act
-        time_machine.move_to(t1, tick=False)
-        ti.log.info("Test")
-        time_machine.move_to(t2, tick=False)
-        ti.log.info("Test2")
-        time_machine.move_to(t3, tick=False)
-        ti.log.info("Test3")
+        t2 = t1 + pendulum.duration(seconds=5)
+        t3 = t1 + pendulum.duration(seconds=10)
+
+        try:
+            time_machine.move_to(t1, tick=False)
+            ti.log.info("Test")
+            time_machine.move_to(t2, tick=False)
+            ti.log.info("Test2")
+            time_machine.move_to(t3, tick=False)
+            ti.log.info("Test3")
+        finally:
+            logger.handlers = []
 
-        # assert
         first_log, second_log, third_log = map(json.loads, 
stdout_mock.getvalue().strip().splitlines())
         assert first_log["offset"] < second_log["offset"] < third_log["offset"]
         assert first_log["asctime"] == t1.format("YYYY-MM-DDTHH:mm:ss.SSSZZ")
@@ -883,12 +524,11 @@ class TestElasticsearchTaskHandler:
             self.es_task_handler.io.index_patterns_callable = 
"path.to.index_pattern_callable"
             result = self.es_task_handler.io._get_index_patterns({})
 
-            
mock_import_string.assert_called_once_with("path.to.index_pattern_callable")
-            mock_callable.assert_called_once_with({})
-            assert result == "callable_index_pattern"
+        
mock_import_string.assert_called_once_with("path.to.index_pattern_callable")
+        mock_callable.assert_called_once_with({})
+        assert result == "callable_index_pattern"
 
     def test_filename_template_for_backward_compatibility(self):
-        # filename_template arg support for running the latest provider on 
airflow 2
         ElasticsearchTaskHandler(
             base_log_folder="local/log/location",
             end_of_log_mark="end_of_log\n",
@@ -899,129 +539,72 @@ class TestElasticsearchTaskHandler:
         )
 
 
-def test_safe_attrgetter():
-    class A: ...
-
-    a = A()
-    a.b = "b"
-    a.c = None
-    a.x = a
-    a.x.d = "blah"
-    assert getattr_nested(a, "b", None) == "b"  # regular getattr
-    assert getattr_nested(a, "x.d", None) == "blah"  # nested val
-    assert getattr_nested(a, "aa", "heya") == "heya"  # respects non-none 
default
-    assert getattr_nested(a, "c", "heya") is None  # respects none value
-    assert getattr_nested(a, "aa", None) is None  # respects none default
-
-
-def test_retrieve_config_keys():
-    """
-    Tests that the ElasticsearchTaskHandler retrieves the correct 
configuration keys from the config file.
-    * old_parameters are removed
-    * parameters from config are automatically added
-    * constructor parameters missing from config are also added
-    :return:
-    """
-    with conf_vars(
-        {
-            ("elasticsearch_configs", "http_compress"): "False",
-            ("elasticsearch_configs", "request_timeout"): "10",
-        }
-    ):
-        args_from_config = get_es_kwargs_from_config().keys()
-        # verify_certs comes from default config value
-        assert "verify_certs" in args_from_config
-        # request_timeout comes from config provided value
-        assert "request_timeout" in args_from_config
-        # http_compress comes from config value
-        assert "http_compress" in args_from_config
-        assert "self" not in args_from_config
-
-
-def test_retrieve_retry_on_timeout():
-    """
-    Test if retrieve timeout is converted to retry_on_timeout.
-    """
-    with conf_vars(
-        {
-            ("elasticsearch_configs", "retry_on_timeout"): "True",
-        }
-    ):
-        args_from_config = get_es_kwargs_from_config().keys()
-        # verify_certs comes from default config value
-        assert "retry_on_timeout" in args_from_config
+class TestTaskHandlerHelpers:
+    def test_safe_attrgetter(self):
+        class A: ...
 
+        a = A()
+        a.b = "b"
+        a.c = None
+        a.x = a
+        a.x.d = "blah"
+        assert getattr_nested(a, "b", None) == "b"
+        assert getattr_nested(a, "x.d", None) == "blah"
+        assert getattr_nested(a, "aa", "heya") == "heya"
+        assert getattr_nested(a, "c", "heya") is None
+        assert getattr_nested(a, "aa", None) is None
 
-def test_self_not_valid_arg():
-    """
-    Test if self is not a valid argument.
-    """
-    assert "self" not in VALID_ES_CONFIG_KEYS
+    def test_retrieve_config_keys(self):
+        with conf_vars(
+            {
+                ("elasticsearch_configs", "http_compress"): "False",
+                ("elasticsearch_configs", "request_timeout"): "10",
+            }
+        ):
+            args_from_config = get_es_kwargs_from_config().keys()
+            assert "verify_certs" in args_from_config
+            assert "request_timeout" in args_from_config
+            assert "http_compress" in args_from_config
+            assert "self" not in args_from_config
+
+    def test_retrieve_retry_on_timeout(self):
+        with conf_vars(
+            {
+                ("elasticsearch_configs", "retry_on_timeout"): "True",
+            }
+        ):
+            args_from_config = get_es_kwargs_from_config().keys()
+            assert "retry_on_timeout" in args_from_config
 
+    def test_self_not_valid_arg(self):
+        assert "self" not in VALID_ES_CONFIG_KEYS
 
[email protected]_test
-class TestElasticsearchRemoteLogIO:
-    DAG_ID = "dag_for_testing_es_log_handler"
-    TASK_ID = "task_for_testing_es_log_handler"
-    RUN_ID = "run_for_testing_es_log_handler"
-    LOGICAL_DATE = datetime(2016, 1, 1)
-    FILENAME_TEMPLATE = "{try_number}.log"
 
+class TestElasticsearchRemoteLogIO:
     @pytest.fixture(autouse=True)
-    def setup_tests(self, ti):
+    def _setup_tests(self, tmp_path):
         self.elasticsearch_io = ElasticsearchRemoteLogIO(
             write_to_es=True,
             write_stdout=True,
             delete_local_copy=True,
             host="http://localhost:9200";,
-            base_log_folder=Path(""),
+            base_log_folder=tmp_path,
         )
 
     @pytest.fixture
-    def tmp_json_file(self):
-        with tempfile.TemporaryDirectory() as tmpdir:
-            os.makedirs(tmpdir, exist_ok=True)
-
-            file_path = os.path.join(tmpdir, "1.log")
-            self.tmp_file = file_path
-
-            sample_logs = [
-                {"message": "start"},
-                {"message": "processing"},
-                {"message": "end"},
-            ]
-            with open(file_path, "w") as f:
-                for log in sample_logs:
-                    f.write(json.dumps(log) + "\n")
-
-            yield file_path
-
-            del self.tmp_file
-
-    @pytest.fixture
-    def ti(self, create_task_instance, create_log_template):
-        create_log_template(
-            self.FILENAME_TEMPLATE,
-            (
-                "{dag_id}-{task_id}-{logical_date}-{try_number}"
-                if AIRFLOW_V_3_0_PLUS
-                else "{dag_id}-{task_id}-{execution_date}-{try_number}"
-            ),
-        )
-        yield get_ti(
-            dag_id=self.DAG_ID,
-            task_id=self.TASK_ID,
-            run_id=self.RUN_ID,
-            logical_date=self.LOGICAL_DATE,
-            create_task_instance=create_task_instance,
-        )
-        clear_db_runs()
-        clear_db_dags()
+    def ti(self):
+        return _MockTI()
 
     @pytest.fixture
-    def unique_index(self):
-        """Generate a unique index name for each test."""
-        return f"airflow-logs-{uuid.uuid4()}"
+    def tmp_json_file(self, tmp_path):
+        file_path = tmp_path / "1.log"
+        sample_logs = [
+            {"message": "start"},
+            {"message": "processing"},
+            {"message": "end"},
+        ]
+        file_path.write_text("\n".join(json.dumps(log) for log in sample_logs) 
+ "\n")
+        return file_path
 
     def test_write_to_stdout(self, tmp_json_file, ti, capsys):
         self.elasticsearch_io.write_to_es = False
@@ -1030,9 +613,7 @@ class TestElasticsearchRemoteLogIO:
         captured = capsys.readouterr()
         stdout_lines = captured.out.strip().splitlines()
         log_entries = [json.loads(line) for line in stdout_lines]
-        assert log_entries[0]["message"] == "start"
-        assert log_entries[1]["message"] == "processing"
-        assert log_entries[2]["message"] == "end"
+        assert [entry["message"] for entry in log_entries] == ["start", 
"processing", "end"]
 
     def test_invalid_task_log_file_path(self, ti):
         with (
@@ -1041,93 +622,106 @@ class TestElasticsearchRemoteLogIO:
         ):
             self.elasticsearch_io.upload(Path("/invalid/path"), ti)
 
-            mock_parse.assert_not_called()
-            mock_write.assert_not_called()
+        mock_parse.assert_not_called()
+        mock_write.assert_not_called()
 
-    def test_raw_log_should_contain_log_id_and_offset(self, tmp_json_file, ti):
-        with open(self.tmp_file) as f:
-            raw_log = f.read()
-        json_log_lines = self.elasticsearch_io._parse_raw_log(raw_log, ti)
-        assert len(json_log_lines) == 3
-        for json_log_line in json_log_lines:
-            assert "log_id" in json_log_line
-            assert "offset" in json_log_line
-
-    @patch("elasticsearch.Elasticsearch.count", return_value={"count": 0})
-    def test_read_with_missing_log(self, mocked_count, ti):
-        log_source_info, log_messages = self.elasticsearch_io.read("", ti)
+    def test_raw_log_contains_log_id_and_offset(self, tmp_json_file, ti):
+        raw_log = tmp_json_file.read_text()
         log_id = _render_log_id(self.elasticsearch_io.log_id_template, ti, 
ti.try_number)
-        assert log_source_info == []
-        assert f"*** Log {log_id} not found in Elasticsearch" in 
log_messages[0]
-        mocked_count.assert_called_once()
+        json_log_lines = self.elasticsearch_io._parse_raw_log(raw_log, log_id)
 
-    def test_read_error_detail(self, ti):
-        """Verify that error_detail is correctly retrieved and formatted."""
-        error_detail = [
+        assert len(json_log_lines) == 3
+        assert [line["offset"] for line in json_log_lines] == [1, 2, 3]
+        assert all(line["log_id"] == log_id for line in json_log_lines)
+
+    def test_es_read_builds_expected_query(self, ti):
+        self.elasticsearch_io.client = Mock()
+        self.elasticsearch_io.client.count.return_value = {"count": 1}
+        self.elasticsearch_io.client.search.return_value = 
_build_es_search_response(
             {
-                "is_cause": False,
-                "frames": [{"filename": "/opt/airflow/dags/fail.py", "lineno": 
13, "name": "log_and_raise"}],
-                "exc_type": "RuntimeError",
-                "exc_value": "Woopsie. Something went wrong.",
+                "event": "hello",
+                "log_id": 
_render_log_id(self.elasticsearch_io.log_id_template, ti, ti.try_number),
+                "offset": 3,
+            }
+        )
+        self.elasticsearch_io.index_patterns = "airflow-logs-*"
+        log_id = _render_log_id(self.elasticsearch_io.log_id_template, ti, 
ti.try_number)
+        query = {
+            "bool": {
+                "filter": [{"range": {self.elasticsearch_io.offset_field: 
{"gt": 2}}}],
+                "must": [{"match_phrase": {"log_id": log_id}}],
             }
-        ]
-        body = {
-            "event": "Task failed with exception",
-            "log_id": _render_log_id(self.elasticsearch_io.log_id_template, 
ti, ti.try_number),
-            "offset": 1,
-            "error_detail": error_detail,
         }
 
-        from airflow.providers.elasticsearch.log.es_response import Hit
+        response = self.elasticsearch_io._es_read(log_id, 2, ti)
 
-        mock_hit = Hit({"_source": body})
-        with (
-            patch.object(self.elasticsearch_io, "_es_read") as mock_es_read,
-            patch.object(
-                self.elasticsearch_io,
-                "_group_logs_by_host",
-                return_value={"http://localhost:9200": [mock_hit]},
-            ),
-        ):
-            mock_es_read.return_value = mock.MagicMock()
-            mock_es_read.return_value.hits = [mock_hit]
+        
self.elasticsearch_io.client.count.assert_called_once_with(index="airflow-logs-*",
 query=query)
+        self.elasticsearch_io.client.search.assert_called_once_with(
+            index="airflow-logs-*",
+            query=query,
+            sort=[self.elasticsearch_io.offset_field],
+            size=self.elasticsearch_io.MAX_LINE_PER_PAGE,
+            from_=0,
+        )
+        assert response is not None
+        assert response.hits[0].event == "hello"
 
-            log_source_info, log_messages = self.elasticsearch_io.read("", ti)
+    def test_es_read_returns_none_when_count_is_zero(self, ti):
+        self.elasticsearch_io.client = Mock()
+        self.elasticsearch_io.client.count.return_value = {"count": 0}
 
-            assert len(log_messages) == 1
-            log_entry = json.loads(log_messages[0])
-            assert "error_detail" in log_entry
-            assert log_entry["error_detail"] == error_detail
+        log_id = _render_log_id(self.elasticsearch_io.log_id_template, ti, 
ti.try_number)
+        response = self.elasticsearch_io._es_read(log_id, 0, ti)
 
+        assert response is None
+        self.elasticsearch_io.client.search.assert_not_called()
 
-# ---------------------------------------------------------------------------
-# Tests for the error_detail helpers (issue #63736)
-# ---------------------------------------------------------------------------
+    def test_es_read_propagates_missing_index(self, ti):
+        self.elasticsearch_io.client = Mock()
+        self.elasticsearch_io.client.count.side_effect = 
elasticsearch.exceptions.NotFoundError(
+            404,
+            "IndexMissingException[[missing] missing]",
+            {},
+        )
 
+        log_id = _render_log_id(self.elasticsearch_io.log_id_template, ti, 
ti.try_number)
+        with pytest.raises(elasticsearch.exceptions.NotFoundError):
+            self.elasticsearch_io._es_read(log_id, 0, ti)
 
-class TestFormatErrorDetail:
-    """Unit tests for _format_error_detail."""
+    def test_es_read_logs_and_returns_none_on_search_error(self, ti):
+        self.elasticsearch_io.client = Mock()
+        self.elasticsearch_io.client.count.return_value = {"count": 1}
+        self.elasticsearch_io.client.search.side_effect = RuntimeError("boom")
 
-    def test_returns_none_for_empty(self):
-        from airflow.providers.elasticsearch.log.es_task_handler import 
_format_error_detail
+        log_id = _render_log_id(self.elasticsearch_io.log_id_template, ti, 
ti.try_number)
+        with patch.object(self.elasticsearch_io.log, "exception") as 
mock_exception:
+            response = self.elasticsearch_io._es_read(log_id, 0, ti)
+
+        assert response is None
+        mock_exception.assert_called_once()
+
+    def test_read_returns_missing_log_message_when_es_read_returns_none(self, 
ti):
+        with patch.object(self.elasticsearch_io, "_es_read", 
return_value=None):
+            log_source_info, log_messages = self.elasticsearch_io.read("", ti)
 
+        log_id = _render_log_id(self.elasticsearch_io.log_id_template, ti, 
ti.try_number)
+        assert log_source_info == []
+        assert f"*** Log {log_id} not found in Elasticsearch" in 
log_messages[0]
+
+
+class TestFormatErrorDetail:
+    def test_returns_none_for_empty(self):
         assert _format_error_detail(None) is None
         assert _format_error_detail([]) is None
 
     def test_returns_string_for_non_list(self):
-        from airflow.providers.elasticsearch.log.es_task_handler import 
_format_error_detail
-
         assert _format_error_detail("raw string") == "raw string"
 
     def test_formats_single_exception(self):
-        from airflow.providers.elasticsearch.log.es_task_handler import 
_format_error_detail
-
         error_detail = [
             {
                 "is_cause": False,
-                "frames": [
-                    {"filename": "/app/task.py", "lineno": 13, "name": 
"log_and_raise"},
-                ],
+                "frames": [{"filename": "/app/task.py", "lineno": 13, "name": 
"log_and_raise"}],
                 "exc_type": "RuntimeError",
                 "exc_value": "Something went wrong.",
                 "exceptions": [],
@@ -1141,8 +735,6 @@ class TestFormatErrorDetail:
         assert "RuntimeError: Something went wrong." in result
 
     def test_formats_chained_exceptions(self):
-        from airflow.providers.elasticsearch.log.es_task_handler import 
_format_error_detail
-
         error_detail = [
             {
                 "is_cause": True,
@@ -1166,8 +758,6 @@ class TestFormatErrorDetail:
         assert "RuntimeError: wrapped" in result
 
     def test_exc_type_without_value(self):
-        from airflow.providers.elasticsearch.log.es_task_handler import 
_format_error_detail
-
         error_detail = [
             {
                 "is_cause": False,
@@ -1181,19 +771,13 @@ class TestFormatErrorDetail:
         assert result.endswith("StopIteration")
 
     def test_non_dict_items_are_stringified(self):
-        from airflow.providers.elasticsearch.log.es_task_handler import 
_format_error_detail
-
         result = _format_error_detail(["unexpected string item"])
         assert result is not None
         assert "unexpected string item" in result
 
 
 class TestBuildStructuredLogFields:
-    """Unit tests for _build_log_fields."""
-
     def test_filters_to_allowed_fields(self):
-        from airflow.providers.elasticsearch.log.es_task_handler import 
_build_log_fields
-
         hit = {"event": "hello", "level": "info", "unknown_field": "should be 
dropped"}
         result = _build_log_fields(hit)
         assert "event" in result
@@ -1201,41 +785,30 @@ class TestBuildStructuredLogFields:
         assert "unknown_field" not in result
 
     def test_message_mapped_to_event(self):
-        from airflow.providers.elasticsearch.log.es_task_handler import 
_build_log_fields
-
         hit = {"message": "plain message", "timestamp": "2024-01-01T00:00:00Z"}
         fields = _build_log_fields(hit)
         assert fields["event"] == "plain message"
-        assert "message" not in fields  # Ensure it is popped if used as event
+        assert "message" not in fields
 
     def test_message_preserved_if_event_exists(self):
-        from airflow.providers.elasticsearch.log.es_task_handler import 
_build_log_fields
-
         hit = {"event": "structured event", "message": "plain message"}
         fields = _build_log_fields(hit)
         assert fields["event"] == "structured event"
-        # message is preserved if it's in TASK_LOG_FIELDS and doesn't collide 
with event
         assert fields["message"] == "plain message"
 
     def test_levelname_mapped_to_level(self):
-        from airflow.providers.elasticsearch.log.es_task_handler import 
_build_log_fields
-
         hit = {"event": "msg", "levelname": "ERROR"}
         result = _build_log_fields(hit)
         assert result["level"] == "ERROR"
         assert "levelname" not in result
 
     def test_at_timestamp_mapped_to_timestamp(self):
-        from airflow.providers.elasticsearch.log.es_task_handler import 
_build_log_fields
-
         hit = {"event": "msg", "@timestamp": "2024-01-01T00:00:00Z"}
         result = _build_log_fields(hit)
         assert result["timestamp"] == "2024-01-01T00:00:00Z"
         assert "@timestamp" not in result
 
     def test_error_detail_is_kept_as_list(self):
-        from airflow.providers.elasticsearch.log.es_task_handler import 
_build_log_fields
-
         error_detail = [
             {
                 "is_cause": False,
@@ -1244,75 +817,11 @@ class TestBuildStructuredLogFields:
                 "exc_value": "Woopsie.",
             }
         ]
-        hit = {
-            "event": "Task failed with exception",
-            "error_detail": error_detail,
-        }
+        hit = {"event": "Task failed with exception", "error_detail": 
error_detail}
         result = _build_log_fields(hit)
         assert result["error_detail"] == error_detail
 
     def test_error_detail_dropped_when_empty(self):
-        from airflow.providers.elasticsearch.log.es_task_handler import 
_build_log_fields
-
         hit = {"event": "msg", "error_detail": []}
         result = _build_log_fields(hit)
         assert "error_detail" not in result
-
-    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="StructuredLogMessage 
only exists in Airflow 3+")
-    @elasticmock
-    def test_read_includes_error_detail_in_structured_message(self):
-        """End-to-end: a hit with error_detail should surface it in the 
returned StructuredLogMessage."""
-        from airflow.providers.elasticsearch.log.es_task_handler import 
ElasticsearchTaskHandler
-
-        local_log_location = "local/log/location"
-        handler = ElasticsearchTaskHandler(
-            base_log_folder=local_log_location,
-            end_of_log_mark="end_of_log\n",
-            write_stdout=False,
-            json_format=False,
-            json_fields="asctime,filename,lineno,levelname,message,exc_text",
-        )
-
-        es = elasticsearch.Elasticsearch("http://localhost:9200";)
-        log_id = "test_dag-test_task-test_run--1-1"
-        body = {
-            "event": "Task failed with exception",
-            "log_id": log_id,
-            "offset": 1,
-            "error_detail": [
-                {
-                    "is_cause": False,
-                    "frames": [
-                        {"filename": "/opt/airflow/dags/fail.py", "lineno": 
13, "name": "log_and_raise"}
-                    ],
-                    "exc_type": "RuntimeError",
-                    "exc_value": "Woopsie. Something went wrong.",
-                }
-            ],
-        }
-        es.index(index="test_index", doc_type="log", document=body, id=1)
-
-        # Patch the IO layer to return our fake document
-        mock_hit_dict = body.copy()
-
-        from airflow.providers.elasticsearch.log.es_response import 
ElasticSearchResponse, Hit
-
-        mock_hit = Hit({"_source": mock_hit_dict})
-        mock_response = mock.MagicMock(spec=ElasticSearchResponse)
-        mock_response.hits = [mock_hit]
-        mock_response.__iter__ = mock.Mock(return_value=iter([mock_hit]))
-        mock_response.__bool__ = mock.Mock(return_value=True)
-        mock_response.__getitem__ = mock.Mock(return_value=mock_hit)
-
-        with mock.patch.object(handler.io, "_es_read", 
return_value=mock_response):
-            with mock.patch.object(handler.io, "_group_logs_by_host", 
return_value={"localhost": [mock_hit]}):
-                # Build StructuredLogMessages
-                from airflow.providers.elasticsearch.log.es_task_handler 
import _build_log_fields
-                from airflow.utils.log.file_task_handler import 
StructuredLogMessage
-
-                fields = _build_log_fields(mock_hit.to_dict())
-                msg = StructuredLogMessage(**fields)
-
-                assert msg.event == "Task failed with exception"
-                assert hasattr(msg, "error_detail")
-                assert msg.error_detail == body["error_detail"]

Reply via email to