This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new eca077b3b9 Get served logs when remote or executor logs not available for non-running task try (#39177)
eca077b3b9 is described below
commit eca077b3b994813a09942497704ec61e35efd7d5
Author: Kalle Ahlström <[email protected]>
AuthorDate: Thu Apr 25 16:19:37 2024 +0300
Get served logs when remote or executor logs not available for non-running task try (#39177)
---
airflow/utils/log/file_task_handler.py | 12 ++++++------
tests/utils/test_log_handlers.py | 14 ++++++++++++--
2 files changed, 18 insertions(+), 8 deletions(-)
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 72b8deedbb..2a1dfd25f6 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -366,10 +366,7 @@ class FileTaskHandler(logging.Handler):
executor_messages: list[str] = []
executor_logs: list[str] = []
served_logs: list[str] = []
- is_running = ti.try_number == try_number and ti.state in (
- TaskInstanceState.RUNNING,
- TaskInstanceState.DEFERRED,
- )
+ is_in_running_or_deferred = ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED)
with suppress(NotImplementedError):
remote_messages, remote_logs = self._read_remote_logs(ti, try_number, metadata)
messages_list.extend(remote_messages)
@@ -384,7 +381,9 @@ class FileTaskHandler(logging.Handler):
worker_log_full_path = Path(self.local_base, worker_log_rel_path)
local_messages, local_logs = self._read_from_local(worker_log_full_path)
messages_list.extend(local_messages)
- if is_running and not executor_messages:
+ if is_in_running_or_deferred and not executor_messages and not remote_logs:
+ # While task instance is still running and we don't have either executor nor remote logs, look for served logs
+ # This is for cases when users have not setup remote logging nor shared drive for logs
served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path)
messages_list.extend(served_messages)
elif ti.state not in State.unfinished and not (local_logs or remote_logs):
@@ -404,11 +403,12 @@ class FileTaskHandler(logging.Handler):
)
log_pos = len(logs)
messages = "".join([f"*** {x}\n" for x in messages_list])
+ end_of_log = ti.try_number != try_number or not is_in_running_or_deferred
if metadata and "log_pos" in metadata:
previous_chars = metadata["log_pos"]
logs = logs[previous_chars:] # Cut off previously passed log test as new tail
out_message = logs if "log_pos" in (metadata or {}) else messages + logs
- return out_message, {"end_of_log": not is_running, "log_pos": log_pos}
+ return out_message, {"end_of_log": end_of_log, "log_pos": log_pos}
@staticmethod
def _get_pod_namespace(ti: TaskInstance):
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index ba849825d5..2bfc574b64 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -315,7 +315,7 @@ class TestFileTaskLogHandler:
def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instance):
"""Test for executors which do not have `get_task_log` method, it
fallbacks to reading
- log from worker. But it happens only for the latest try_number."""
+ log from worker if and only if remote logs aren't found"""
executor_name = "CeleryExecutor"
ti = create_task_instance(
@@ -336,7 +336,17 @@ class TestFileTaskLogHandler:
fth._read_from_logs_server.assert_called_once()
assert actual == ("*** this message\nthis\nlog\ncontent", {"end_of_log": False, "log_pos": 16})
- # Previous try_number is from remote logs without reaching worker server
+ # Previous try_number should return served logs when remote logs aren't implemented
+ fth._read_from_logs_server = mock.Mock()
+ fth._read_from_logs_server.return_value = ["served logs try_number=1"], ["this\nlog\ncontent"]
+ actual = fth._read(ti=ti, try_number=1)
+ fth._read_from_logs_server.assert_called_once()
+ assert actual == (
+ "*** served logs try_number=1\nthis\nlog\ncontent",
+ {"end_of_log": True, "log_pos": 16},
+ )
+
+ # When remote_logs is implemented, previous try_number is from remote logs without reaching worker server
fth._read_from_logs_server.reset_mock()
fth._read_remote_logs = mock.Mock()
fth._read_remote_logs.return_value = ["remote logs"], ["remote\nlog\ncontent"]