This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c47f51655c Add unittest for ElasticSearchJSONFormatter and ElasticSearchResponse (#35697)
c47f51655c is described below
commit c47f51655c369e2045283e0a9c1d65c32f231f38
Author: Owen Leung <[email protected]>
AuthorDate: Fri Nov 17 22:49:56 2023 +0800
Add unittest for ElasticSearchJSONFormatter and ElasticSearchResponse (#35697)
* Add unittest for ElasticSearchJSONFormatter and ElasticSearchResponse
* Remove es from OVERLOOKED_TESTS
---
tests/always/test_project_structure.py | 2 -
.../elasticsearch/log/test_es_json_formatter.py | 90 +++++++++
.../elasticsearch/log/test_es_response.py | 211 +++++++++++++++++++++
3 files changed, 301 insertions(+), 2 deletions(-)
diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py
index 74b9ada848..a58f33d755 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -109,8 +109,6 @@ class TestProjectStructure:
"tests/providers/daskexecutor/executors/test_dask_executor.py",
"tests/providers/databricks/hooks/test_databricks_base.py",
"tests/providers/docker/test_exceptions.py",
- "tests/providers/elasticsearch/log/test_es_json_formatter.py",
- "tests/providers/elasticsearch/log/test_es_response.py",
"tests/providers/google/cloud/fs/test_gcs.py",
"tests/providers/google/cloud/links/test_automl.py",
"tests/providers/google/cloud/links/test_base.py",
diff --git a/tests/providers/elasticsearch/log/test_es_json_formatter.py b/tests/providers/elasticsearch/log/test_es_json_formatter.py
new file mode 100644
index 0000000000..4e43c683a1
--- /dev/null
+++ b/tests/providers/elasticsearch/log/test_es_json_formatter.py
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import json
+import logging
+
+import pendulum
+import pytest
+
+from airflow.providers.elasticsearch.log.es_task_handler import (
+ ElasticsearchJSONFormatter,
+)
+
+
+class TestElasticsearchJSONFormatter:
+ JSON_FIELDS = ["asctime", "filename", "lineno", "levelname", "message", "exc_text"]
+ EXTRA_FIELDS = {
+ "dag_id": "dag1",
+ "task_id": "task1",
+ "execution_date": "2023-11-17",
+ "try_number": "1",
+ "log_id": "Some_log_id",
+ }
+
+ @pytest.fixture()
+ def es_json_formatter(self):
+ return ElasticsearchJSONFormatter()
+
+ @pytest.fixture()
+ def log_record(self):
+ return logging.LogRecord(
+ name="test",
+ level=logging.INFO,
+ pathname="test_file.txt",
+ lineno=1,
+ msg="Test message",
+ args=(),
+ exc_info=None,
+ )
+
+ def test_format_log_record(self, es_json_formatter, log_record):
+ """Test the log record formatting."""
+ es_json_formatter.json_fields = self.JSON_FIELDS
+ formatted = es_json_formatter.format(log_record)
+ data = json.loads(formatted)
+ assert all(key in self.JSON_FIELDS for key in data.keys())
+ assert data["filename"] == "test_file.txt"
+ assert data["lineno"] == 1
+ assert data["levelname"] == "INFO"
+ assert data["message"] == "Test message"
+
+ def test_formattime_in_iso8601_format(self, es_json_formatter, log_record):
+ es_json_formatter.json_fields = ["asctime"]
+ iso8601_format = es_json_formatter.formatTime(log_record)
+ try:
+ pendulum.parse(iso8601_format, strict=True)
+ except ValueError:
+ raise Exception("Time is not in ISO8601 format")
+
+ def test_extra_fields(self, es_json_formatter, log_record):
+ es_json_formatter.json_fields = self.JSON_FIELDS
+ es_json_formatter.extras = self.EXTRA_FIELDS
+ formatted = es_json_formatter.format(log_record)
+ data = json.loads(formatted)
+ assert all((key in self.JSON_FIELDS or key in self.EXTRA_FIELDS) for key in data.keys())
+ assert data["filename"] == "test_file.txt"
+ assert data["lineno"] == 1
+ assert data["levelname"] == "INFO"
+ assert data["dag_id"] == "dag1"
+ assert data["task_id"] == "task1"
+ assert data["execution_date"] == "2023-11-17"
+ assert data["try_number"] == "1"
+ assert data["log_id"] == "Some_log_id"
diff --git a/tests/providers/elasticsearch/log/test_es_response.py b/tests/providers/elasticsearch/log/test_es_response.py
new file mode 100644
index 0000000000..30c41f8d92
--- /dev/null
+++ b/tests/providers/elasticsearch/log/test_es_response.py
@@ -0,0 +1,211 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import Any
+
+import pytest
+
+from airflow.providers.elasticsearch.log.es_response import (
+ AttributeDict,
+ AttributeList,
+ ElasticSearchResponse,
+ Hit,
+ HitMeta,
+ _wrap,
+)
+from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler
+
+
+class TestWrap:
+ def test_wrap_with_dict(self):
+ test_dict = {"key1": "value1"}
+ result = _wrap(test_dict)
+ assert isinstance(result, AttributeDict)
+ assert result.key1 == "value1"
+
+ def test_wrap_with_non_dict(self):
+ test_values = [1, [2, 3], "string", 4.5]
+ for value in test_values:
+ assert _wrap(value) == value
+
+
+class TestAttributeList:
+ def test_initialization(self):
+ test_list = [1, 2, 3]
+ attr_list = AttributeList(test_list)
+ assert attr_list._l_ == test_list
+
+ test_tuple = (1, 2, 3)
+ attr_list = AttributeList(test_tuple)
+ assert attr_list._l_ == list(test_tuple)
+
+ def test_index_access(self):
+ test_list = [1, {"key1": "value1"}, 3]
+ attr_list = AttributeList(test_list)
+
+ assert attr_list[0] == 1
+ assert isinstance(attr_list[1], AttributeDict)
+ assert attr_list[1].key1 == "value1"
+ assert attr_list[2] == 3
+
+ def test_iteration(self):
+ test_list = [1, {"key": "value"}, 3]
+ attr_list = AttributeList(test_list)
+
+ for i, item in enumerate(attr_list):
+ if isinstance(test_list[i], dict):
+ assert isinstance(item, AttributeDict)
+ else:
+ assert item == test_list[i]
+
+ def test_boolean_representation(self):
+ assert AttributeList([1, 2, 3])
+ assert not (AttributeList([]))
+
+
+class TestAttributeDict:
+ def test_initialization(self):
+ test_dict = {"key1": "value1", "key2": "value2"}
+ attr_dict = AttributeDict(test_dict)
+ assert attr_dict._d_ == test_dict
+
+ def test_attribute_access(self):
+ test_dict = {"key1": "value1", "key2": {"subkey1": "subvalue1"}}
+ attr_dict = AttributeDict(test_dict)
+
+ assert attr_dict.key1 == "value1"
+ assert isinstance(attr_dict.key2, AttributeDict)
+ assert attr_dict.key2.subkey1 == "subvalue1"
+
+ def test_item_access(self):
+ test_dict = {"key1": "value1", "key2": "value2"}
+ attr_dict = AttributeDict(test_dict)
+
+ assert attr_dict["key1"] == "value1"
+ assert attr_dict["key2"] == "value2"
+
+ def test_nonexistent_key(self):
+ test_dict = {"key1": "value1"}
+ attr_dict = AttributeDict(test_dict)
+
+ with pytest.raises(AttributeError):
+ _ = attr_dict.nonexistent_key
+
+ def test_to_dict(self):
+ test_dict = {"key1": "value1", "key2": "value2"}
+ attr_dict = AttributeDict(test_dict)
+ assert attr_dict.to_dict() == test_dict
+
+
+class TestHitAndHitMetaAndElasticSearchResponse:
+ ES_DOCUMENT: dict[str, Any] = {
+ "_shards": {"failed": 0, "skipped": 0, "successful": 7, "total": 7},
+ "hits": {
+ "hits": [
+ {
+ "_id": "jdeZT4kBjAZqZnexVUxk",
+ "_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
+ "_score": 2.482621,
+ "_source": {
+ "@timestamp": "2023-07-13T14:13:15.140Z",
+ "asctime": "2023-07-09T07:47:43.907+0000",
+ "container": {"id": "airflow"},
+ "dag_id": "example_bash_operator",
+ "ecs": {"version": "8.0.0"},
+ "execution_date": "2023_07_09T07_47_32_000000",
+ "filename": "taskinstance.py",
+ "input": {"type": "log"},
+ "levelname": "INFO",
+ "lineno": 1144,
+ "log": {
+ "file": {
+ "path": "/opt/airflow/Documents/GitHub/airflow/logs/"
+ "dag_id=example_bash_operator'"
+ "/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log"
+ },
+ "offset": 0,
+ },
+ "log.offset": 1688888863907337472,
+ "log_id": "example_bash_operator-run_after_loop-owen_run_run--1-1",
+ "message": "Dependencies all met for "
+ "dep_context=non-requeueable deps "
+ "ti=<TaskInstance: "
+ "example_bash_operator.run_after_loop "
+ "owen_run_run [queued]>",
+ "task_id": "run_after_loop",
+ "try_number": "1",
+ },
+ "_type": "_doc",
+ }
+ ]
+ },
+ }
+ HIT_DOCUMENT = ES_DOCUMENT["hits"]["hits"][0]
+
+ def test_hit_initialization_and_to_dict(self):
+ hit = Hit(self.HIT_DOCUMENT)
+
+ assert hit.asctime == "2023-07-09T07:47:43.907+0000"
+ assert hit.dag_id == "example_bash_operator"
+ assert hit.lineno == 1144
+ assert (
+ hit.log.file.path
+ == "/opt/airflow/Documents/GitHub/airflow/logs/dag_id=example_bash_operator'/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log"
+ )
+
+ # Test meta attribute
+ assert isinstance(hit.meta, HitMeta)
+ assert hit.to_dict() == self.HIT_DOCUMENT["_source"]
+
+ def test_hitmeta_initialization_and_to_dict(self):
+ hitmeta = HitMeta(self.HIT_DOCUMENT)
+
+ assert hitmeta.id == "jdeZT4kBjAZqZnexVUxk"
+ assert hitmeta.index == ".ds-filebeat-8.8.2-2023.07.09-000001"
+ assert hitmeta.score == 2.482621
+ assert hitmeta.doc_type == "_doc"
+
+ expected_dict = {
+ k[1:] if k.startswith("_") else k: v for (k, v) in self.HIT_DOCUMENT.items() if k != "_source"
+ }
+ expected_dict["doc_type"] = expected_dict.pop("type")
+ assert hitmeta.to_dict() == expected_dict
+
+ def test_elasticsearchresponse_initialization_and_hits_and_bool(self):
+ task_handler = ElasticsearchTaskHandler(
+ base_log_folder="local/log/location",
+ end_of_log_mark="end_of_log\n",
+ write_stdout=False,
+ json_format=False,
+ json_fields="asctime,filename,lineno,levelname,message,exc_text",
+ )
+ response = ElasticSearchResponse(task_handler, self.ES_DOCUMENT)
+
+ assert response._d_ == self.ES_DOCUMENT
+ assert isinstance(response.hits, AttributeList)
+
+ for hit in response.hits:
+ assert isinstance(hit, Hit)
+ assert isinstance(hit.meta, HitMeta)
+
+ assert response.hits[0].asctime == "2023-07-09T07:47:43.907+0000"
+ assert response.hits[0].levelname == "INFO"
+
+ assert bool(response) is True