This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-9-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1b26fb78829066ece68eab75577e85547de34a31
Author: Kalle Ahlström <[email protected]>
AuthorDate: Tue May 21 19:24:15 2024 +0300

    Fetch served logs also when task attempt is up for retry and no remote logs available (#39496)
    
    (cherry picked from commit ec2e245f0e894e8c9d4135480fe04f8109d678ee)
---
 airflow/utils/log/file_task_handler.py |  8 ++++++--
 tests/utils/test_log_handlers.py       | 24 +++++++++++++++++-------
 2 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 6d35b230b4..ce76251681 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -343,7 +343,11 @@ class FileTaskHandler(logging.Handler):
         executor_messages: list[str] = []
         executor_logs: list[str] = []
         served_logs: list[str] = []
-        is_in_running_or_deferred = ti.state in (TaskInstanceState.RUNNING, 
TaskInstanceState.DEFERRED)
+        is_in_running_or_deferred = ti.state in (
+            TaskInstanceState.RUNNING,
+            TaskInstanceState.DEFERRED,
+        )
+        is_up_for_retry = ti.state == TaskInstanceState.UP_FOR_RETRY
         with suppress(NotImplementedError):
             remote_messages, remote_logs = self._read_remote_logs(ti, 
try_number, metadata)
             messages_list.extend(remote_messages)
@@ -358,7 +362,7 @@ class FileTaskHandler(logging.Handler):
             worker_log_full_path = Path(self.local_base, worker_log_rel_path)
             local_messages, local_logs = 
self._read_from_local(worker_log_full_path)
             messages_list.extend(local_messages)
-        if is_in_running_or_deferred and not executor_messages and not 
remote_logs:
+        if (is_in_running_or_deferred or is_up_for_retry) and not 
executor_messages and not remote_logs:
             # While task instance is still running and we don't have either executor nor remote logs, look for served logs
             # This is for cases when users have not setup remote logging nor shared drive for logs
             served_messages, served_logs = self._read_from_logs_server(ti, 
worker_log_rel_path)
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index 13f1e4de0e..30647ada30 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -302,28 +302,35 @@ class TestFileTaskLogHandler:
         else:
             mock_k8s_get_task_log.assert_not_called()
 
-    def test__read_for_celery_executor_fallbacks_to_worker(self, 
create_task_instance):
+    @pytest.mark.parametrize(
+        "state", [TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED, 
TaskInstanceState.UP_FOR_RETRY]
+    )
+    def test__read_for_celery_executor_fallbacks_to_worker(self, state, 
create_task_instance):
         """Test for executors which do not have `get_task_log` method, it 
fallbacks to reading
         log from worker if and only if remote logs aren't found"""
         executor_name = "CeleryExecutor"
-
+        # Reading logs from worker should occur when the task is either running, deferred, or up for retry.
         ti = create_task_instance(
-            dag_id="dag_for_testing_celery_executor_log_read",
+            dag_id=f"dag_for_testing_celery_executor_log_read_{state}",
             task_id="task_for_testing_celery_executor_log_read",
             run_type=DagRunType.SCHEDULED,
             execution_date=DEFAULT_DATE,
         )
-        ti.state = TaskInstanceState.RUNNING
         ti.try_number = 2
+        ti.state = state
         with conf_vars({("core", "executor"): executor_name}):
             reload(executor_loader)
             fth = FileTaskHandler("")
-
             fth._read_from_logs_server = mock.Mock()
             fth._read_from_logs_server.return_value = ["this message"], 
["this\nlog\ncontent"]
             actual = fth._read(ti=ti, try_number=2)
             fth._read_from_logs_server.assert_called_once()
-            assert actual == ("*** this message\nthis\nlog\ncontent", 
{"end_of_log": False, "log_pos": 16})
+            # If we are in the up for retry state, the log has ended.
+            expected_end_of_log = state in (TaskInstanceState.UP_FOR_RETRY)
+            assert actual == (
+                "*** this message\nthis\nlog\ncontent",
+                {"end_of_log": expected_end_of_log, "log_pos": 16},
+            )
 
             # Previous try_number should return served logs when remote logs 
aren't implemented
             fth._read_from_logs_server = mock.Mock()
@@ -342,7 +349,10 @@ class TestFileTaskLogHandler:
             actual = fth._read(ti=ti, try_number=1)
             fth._read_remote_logs.assert_called_once()
             fth._read_from_logs_server.assert_not_called()
-            assert actual == ("*** remote logs\nremote\nlog\ncontent", 
{"end_of_log": True, "log_pos": 18})
+            assert actual == (
+                "*** remote logs\nremote\nlog\ncontent",
+                {"end_of_log": True, "log_pos": 18},
+            )
 
     @pytest.mark.parametrize(
         "remote_logs, local_logs, served_logs_checked",

Reply via email to