This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new b2164de8ceb Log message source details are grouped (#43681) (#44070)
b2164de8ceb is described below
commit b2164de8ceb26f3588280d9de6ea394ec3ecb2d1
Author: Jens Scheffler <[email protected]>
AuthorDate: Sun Nov 17 15:00:05 2024 +0100
Log message source details are grouped (#43681) (#44070)
* Log message source details are grouped (#43681)
* Log message source details are grouped
* fix static checks
* fix pytests
* Another pytest fix
---------
Co-authored-by: Majoros Donat (XC-DX/EET2-Bp) <[email protected]>
(cherry picked from commit 9d1877261228a721111eba9945db3b870c9d87fe)
* Fix pytest
---------
Co-authored-by: majorosdonat <[email protected]>
---
airflow/utils/log/file_task_handler.py | 6 ++-
tests/api_connexion/endpoints/test_log_endpoint.py | 23 +++++-----
.../amazon/aws/log/test_s3_task_handler.py | 6 +--
.../google/cloud/log/test_gcs_task_handler.py | 7 +--
.../microsoft/azure/log/test_wasb_task_handler.py | 17 +++-----
tests/utils/log/test_log_reader.py | 51 ++++++++--------------
tests/utils/test_log_handlers.py | 14 ++++--
7 files changed, 55 insertions(+), 69 deletions(-)
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index e99ffae0c94..9eb55c707f1 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -416,7 +416,11 @@ class FileTaskHandler(logging.Handler):
)
)
log_pos = len(logs)
- messages = "".join([f"*** {x}\n" for x in messages_list])
+ # Log message source details are grouped: they are not relevant for
most users and can
+ # distract them from finding the root cause of their errors
+ messages = " INFO - ::group::Log message source details\n"
+ messages += "".join([f"*** {x}\n" for x in messages_list])
+ messages += " INFO - ::endgroup::\n"
end_of_log = ti.try_number != try_number or ti.state not in (
TaskInstanceState.RUNNING,
TaskInstanceState.DEFERRED,
diff --git a/tests/api_connexion/endpoints/test_log_endpoint.py b/tests/api_connexion/endpoints/test_log_endpoint.py
index 93ad2cec4b0..b0f265ec858 100644
--- a/tests/api_connexion/endpoints/test_log_endpoint.py
+++ b/tests/api_connexion/endpoints/test_log_endpoint.py
@@ -188,10 +188,10 @@ class TestGetLog:
)
expected_filename =
f"{self.log_dir}/dag_id={self.DAG_ID}/run_id={self.RUN_ID}/task_id={self.TASK_ID}/attempt={try_number}.log"
log_content = "Log for testing." if try_number == 1 else "Log for
testing 2."
- assert (
- response.json["content"]
- == f"[('localhost', '*** Found local files:\\n*** *
{expected_filename}\\n{log_content}')]"
- )
+ assert "[('localhost'," in response.json["content"]
+ assert f"*** Found local files:\\n*** * {expected_filename}\\n" in
response.json["content"]
+ assert f"{log_content}')]" in response.json["content"]
+
info = serializer.loads(response.json["continuation_token"])
assert info == {"end_of_log": True, "log_pos": 16 if try_number == 1
else 18}
assert 200 == response.status_code
@@ -244,11 +244,9 @@ class TestGetLog:
assert 200 == response.status_code
log_content = "Log for testing." if try_number == 1 else "Log for
testing 2."
-
- assert (
- response.data.decode("utf-8")
- == f"localhost\n*** Found local files:\n*** *
{expected_filename}\n{log_content}\n"
- )
+ assert "localhost\n" in response.data.decode("utf-8")
+ assert f"*** Found local files:\n*** * {expected_filename}\n" in
response.data.decode("utf-8")
+ assert f"{log_content}\n" in response.data.decode("utf-8")
@pytest.mark.parametrize(
"request_url, expected_filename, extra_query_string, try_number",
@@ -302,10 +300,9 @@ class TestGetLog:
assert 200 == response.status_code
log_content = "Log for testing." if try_number == 1 else "Log for
testing 2."
- assert (
- response.data.decode("utf-8")
- == f"localhost\n*** Found local files:\n*** *
{expected_filename}\n{log_content}\n"
- )
+ assert "localhost\n" in response.data.decode("utf-8")
+ assert f"*** Found local files:\n*** * {expected_filename}\n" in
response.data.decode("utf-8")
+ assert f"{log_content}\n" in response.data.decode("utf-8")
@pytest.mark.parametrize("try_number", [1, 2])
def test_get_logs_response_with_ti_equal_to_none(self, try_number):
diff --git a/tests/providers/amazon/aws/log/test_s3_task_handler.py b/tests/providers/amazon/aws/log/test_s3_task_handler.py
index 3412011eb4e..7b799a56289 100644
--- a/tests/providers/amazon/aws/log/test_s3_task_handler.py
+++ b/tests/providers/amazon/aws/log/test_s3_task_handler.py
@@ -127,8 +127,8 @@ class TestS3TaskHandler:
ti.state = TaskInstanceState.SUCCESS
log, metadata = self.s3_task_handler.read(ti)
actual = log[0][0][-1]
- expected = "*** Found logs in s3:\n*** *
s3://bucket/remote/log/location/1.log\nLog line"
- assert actual == expected
+ assert "*** Found logs in s3:\n*** *
s3://bucket/remote/log/location/1.log\n" in actual
+ assert actual.endswith("Log line")
assert metadata == [{"end_of_log": True, "log_pos": 8}]
def test_read_when_s3_log_missing(self):
@@ -140,7 +140,7 @@ class TestS3TaskHandler:
assert len(log) == len(metadata)
actual = log[0][0][-1]
expected = "*** No logs found on s3 for ti=<TaskInstance:
dag_for_testing_s3_task_handler.task_for_testing_s3_log_handler test
[success]>\n"
- assert actual == expected
+ assert expected in actual
assert {"end_of_log": True, "log_pos": 0} == metadata[0]
def test_s3_read_when_log_missing(self):
diff --git a/tests/providers/google/cloud/log/test_gcs_task_handler.py b/tests/providers/google/cloud/log/test_gcs_task_handler.py
index a860e52e152..2c961fac4c3 100644
--- a/tests/providers/google/cloud/log/test_gcs_task_handler.py
+++ b/tests/providers/google/cloud/log/test_gcs_task_handler.py
@@ -106,7 +106,8 @@ class TestGCSTaskHandler:
mock_blob.from_string.assert_called_once_with(
"gs://bucket/remote/log/location/1.log", mock_client.return_value
)
- assert logs == "*** Found remote logs:\n*** *
gs://bucket/remote/log/location/1.log\nCONTENT"
+ assert "*** Found remote logs:\n*** *
gs://bucket/remote/log/location/1.log\n" in logs
+ assert logs.endswith("CONTENT")
assert {"end_of_log": True, "log_pos": 7} == metadata
@mock.patch(
@@ -126,13 +127,13 @@ class TestGCSTaskHandler:
ti.state = TaskInstanceState.SUCCESS
log, metadata = self.gcs_task_handler._read(ti, self.ti.try_number)
- assert log == (
+ assert (
"*** Found remote logs:\n"
"*** * gs://bucket/remote/log/location/1.log\n"
"*** Unable to read remote log Failed to connect\n"
"*** Found local files:\n"
f"*** * {self.gcs_task_handler.local_base}/1.log\n"
- )
+ ) in log
assert metadata == {"end_of_log": True, "log_pos": 0}
mock_blob.from_string.assert_called_once_with(
"gs://bucket/remote/log/location/1.log", mock_client.return_value
diff --git a/tests/providers/microsoft/azure/log/test_wasb_task_handler.py b/tests/providers/microsoft/azure/log/test_wasb_task_handler.py
index e74efe89e91..6ef1b99fd51 100644
--- a/tests/providers/microsoft/azure/log/test_wasb_task_handler.py
+++ b/tests/providers/microsoft/azure/log/test_wasb_task_handler.py
@@ -111,18 +111,13 @@ class TestWasbTaskHandler:
assert self.wasb_task_handler.wasb_read(self.remote_log_location) ==
"Log line"
ti = copy.copy(ti)
ti.state = TaskInstanceState.SUCCESS
- assert self.wasb_task_handler.read(ti) == (
- [
- [
- (
- "localhost",
- "*** Found remote logs:\n"
- "*** *
https://wasb-container.blob.core.windows.net/abc/hello.log\nLog line",
- )
- ]
- ],
- [{"end_of_log": True, "log_pos": 8}],
+ assert self.wasb_task_handler.read(ti)[0][0][0][0] == "localhost"
+ assert (
+ "*** Found remote logs:\n*** *
https://wasb-container.blob.core.windows.net/abc/hello.log\n"
+ in self.wasb_task_handler.read(ti)[0][0][0][1]
)
+ assert "Log line" in self.wasb_task_handler.read(ti)[0][0][0][1]
+ assert self.wasb_task_handler.read(ti)[1][0] == {"end_of_log": True,
"log_pos": 8}
@mock.patch(
"airflow.providers.microsoft.azure.hooks.wasb.WasbHook",
diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py
index 3216222909b..d4417bfba82 100644
--- a/tests/utils/log/test_log_reader.py
+++ b/tests/utils/log/test_log_reader.py
@@ -128,8 +128,10 @@ class TestLogView:
assert logs[0] == [
(
"localhost",
+ " INFO - ::group::Log message source details\n"
"*** Found local files:\n"
f"*** *
{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
+ " INFO - ::endgroup::\n"
"try_number=1.",
)
]
@@ -141,32 +143,13 @@ class TestLogView:
ti.state = TaskInstanceState.SUCCESS
logs, metadatas = task_log_reader.read_log_chunks(ti=ti,
try_number=None, metadata={})
- assert logs == [
- [
- (
- "localhost",
- "*** Found local files:\n"
- f"*** *
{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
- "try_number=1.",
- )
- ],
- [
- (
- "localhost",
- "*** Found local files:\n"
- f"*** *
{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/2.log\n"
- f"try_number=2.",
- )
- ],
- [
- (
- "localhost",
- "*** Found local files:\n"
- f"*** *
{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log\n"
- f"try_number=3.",
- )
- ],
- ]
+ for i in range(0, 3):
+ assert logs[i][0][0] == "localhost"
+ assert (
+ "*** Found local files:\n"
+ f"*** *
{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/{i +
1}.log\n"
+ ) in logs[i][0][1]
+ assert f"try_number={i + 1}." in logs[i][0][1]
assert metadatas == {"end_of_log": True, "log_pos": 13}
def test_test_test_read_log_stream_should_read_one_try(self):
@@ -175,9 +158,9 @@ class TestLogView:
ti.state = TaskInstanceState.SUCCESS
stream = task_log_reader.read_log_stream(ti=ti, try_number=1,
metadata={})
assert list(stream) == [
- "localhost\n*** Found local files:\n"
+ "localhost\n INFO - ::group::Log message source details\n*** Found
local files:\n"
f"*** *
{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
- "try_number=1.\n"
+ " INFO - ::endgroup::\ntry_number=1.\n"
]
def test_test_test_read_log_stream_should_read_all_logs(self):
@@ -185,17 +168,17 @@ class TestLogView:
self.ti.state = TaskInstanceState.SUCCESS # Ensure mocked instance is
completed to return stream
stream = task_log_reader.read_log_stream(ti=self.ti, try_number=None,
metadata={})
assert list(stream) == [
- "localhost\n*** Found local files:\n"
+ "localhost\n INFO - ::group::Log message source details\n*** Found
local files:\n"
f"*** *
{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
- "try_number=1."
+ " INFO - ::endgroup::\ntry_number=1."
"\n",
- "localhost\n*** Found local files:\n"
+ "localhost\n INFO - ::group::Log message source details\n*** Found
local files:\n"
f"*** *
{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/2.log\n"
- "try_number=2."
+ " INFO - ::endgroup::\ntry_number=2."
"\n",
- "localhost\n*** Found local files:\n"
+ "localhost\n INFO - ::group::Log message source details\n*** Found
local files:\n"
f"*** *
{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log\n"
- "try_number=3."
+ " INFO - ::endgroup::\ntry_number=3."
"\n",
]
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index da10034cb93..d3651370d65 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -272,7 +272,9 @@ class TestFileTaskLogHandler:
fth = FileTaskHandler("")
actual = fth._read(ti=local_log_file_read, try_number=1)
mock_read_local.assert_called_with(path)
- assert actual == ("*** the messages\nthe log", {"end_of_log": True,
"log_pos": 7})
+ assert "*** the messages\n" in actual[0]
+ assert actual[0].endswith("the log")
+ assert actual[1] == {"end_of_log": True, "log_pos": 7}
def test__read_from_local(self, tmp_path):
"""Tests the behavior of method _read_from_local"""
@@ -333,9 +335,11 @@ class TestFileTaskLogHandler:
fth._read_from_logs_server = mock.Mock()
fth._read_from_logs_server.return_value = ["this message"],
["this\nlog\ncontent"]
- actual = fth._read(ti=ti, try_number=1)
+ actual_text, actual_meta = fth._read(ti=ti, try_number=1)
fth._read_from_logs_server.assert_called_once()
- assert actual == ("*** this message\nthis\nlog\ncontent",
{"end_of_log": True, "log_pos": 16})
+ assert "*** this message" in actual_text
+ assert "this\nlog\ncontent" in actual_text
+ assert actual_meta == {"end_of_log": True, "log_pos": 16}
@pytest.mark.parametrize(
"remote_logs, local_logs, served_logs_checked",
@@ -379,7 +383,9 @@ class TestFileTaskLogHandler:
actual = fth._read(ti=ti, try_number=1)
if served_logs_checked:
fth._read_from_logs_server.assert_called_once()
- assert actual == ("*** this message\nthis\nlog\ncontent",
{"end_of_log": True, "log_pos": 16})
+ assert "*** this message\n" in actual[0]
+ assert actual[0].endswith("this\nlog\ncontent")
+ assert actual[1] == {"end_of_log": True, "log_pos": 16}
else:
fth._read_from_logs_server.assert_not_called()
assert actual[0]