This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-9-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 1b26fb78829066ece68eab75577e85547de34a31 Author: Kalle Ahlström <[email protected]> AuthorDate: Tue May 21 19:24:15 2024 +0300 Fetch served logs also when task attempt is up for retry and no remote logs available (#39496) (cherry picked from commit ec2e245f0e894e8c9d4135480fe04f8109d678ee) --- airflow/utils/log/file_task_handler.py | 8 ++++++-- tests/utils/test_log_handlers.py | 24 +++++++++++++++++------- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 6d35b230b4..ce76251681 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -343,7 +343,11 @@ class FileTaskHandler(logging.Handler): executor_messages: list[str] = [] executor_logs: list[str] = [] served_logs: list[str] = [] - is_in_running_or_deferred = ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED) + is_in_running_or_deferred = ti.state in ( + TaskInstanceState.RUNNING, + TaskInstanceState.DEFERRED, + ) + is_up_for_retry = ti.state == TaskInstanceState.UP_FOR_RETRY with suppress(NotImplementedError): remote_messages, remote_logs = self._read_remote_logs(ti, try_number, metadata) messages_list.extend(remote_messages) @@ -358,7 +362,7 @@ class FileTaskHandler(logging.Handler): worker_log_full_path = Path(self.local_base, worker_log_rel_path) local_messages, local_logs = self._read_from_local(worker_log_full_path) messages_list.extend(local_messages) - if is_in_running_or_deferred and not executor_messages and not remote_logs: + if (is_in_running_or_deferred or is_up_for_retry) and not executor_messages and not remote_logs: # While task instance is still running and we don't have either executor nor remote logs, look for served logs # This is for cases when users have not setup remote logging nor shared drive for logs served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path) diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index 13f1e4de0e..30647ada30 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -302,28 +302,35 @@ class TestFileTaskLogHandler: else: mock_k8s_get_task_log.assert_not_called() - def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instance): + @pytest.mark.parametrize( + "state", [TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED, TaskInstanceState.UP_FOR_RETRY] + ) + def test__read_for_celery_executor_fallbacks_to_worker(self, state, create_task_instance): """Test for executors which do not have `get_task_log` method, it fallbacks to reading log from worker if and only if remote logs aren't found""" executor_name = "CeleryExecutor" - + # Reading logs from worker should occur when the task is either running, deferred, or up for retry. ti = create_task_instance( - dag_id="dag_for_testing_celery_executor_log_read", + dag_id=f"dag_for_testing_celery_executor_log_read_{state}", task_id="task_for_testing_celery_executor_log_read", run_type=DagRunType.SCHEDULED, execution_date=DEFAULT_DATE, ) - ti.state = TaskInstanceState.RUNNING ti.try_number = 2 + ti.state = state with conf_vars({("core", "executor"): executor_name}): reload(executor_loader) fth = FileTaskHandler("") - fth._read_from_logs_server = mock.Mock() fth._read_from_logs_server.return_value = ["this message"], ["this\nlog\ncontent"] actual = fth._read(ti=ti, try_number=2) fth._read_from_logs_server.assert_called_once() - assert actual == ("*** this message\nthis\nlog\ncontent", {"end_of_log": False, "log_pos": 16}) + # If we are in the up for retry state, the log has ended. + expected_end_of_log = state in (TaskInstanceState.UP_FOR_RETRY) + assert actual == ( + "*** this message\nthis\nlog\ncontent", + {"end_of_log": expected_end_of_log, "log_pos": 16}, + ) # Previous try_number should return served logs when remote logs aren't implemented fth._read_from_logs_server = mock.Mock() @@ -342,7 +349,10 @@ class TestFileTaskLogHandler: actual = fth._read(ti=ti, try_number=1) fth._read_remote_logs.assert_called_once() fth._read_from_logs_server.assert_not_called() - assert actual == ("*** remote logs\nremote\nlog\ncontent", {"end_of_log": True, "log_pos": 18}) + assert actual == ( + "*** remote logs\nremote\nlog\ncontent", + {"end_of_log": True, "log_pos": 18}, + ) @pytest.mark.parametrize( "remote_logs, local_logs, served_logs_checked",
