This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

The following commit(s) were added to refs/heads/v3-0-test by this push:
     new 00f40787cb0 [v3-0-test] fix: Make ``task_success_overtime`` configurable (#53342) (#53351)
00f40787cb0 is described below

commit 00f40787cb064c644fc19cfae858daa21eed6491
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jul 18 19:45:32 2025 +0530

    [v3-0-test] fix: Make ``task_success_overtime`` configurable (#53342) (#53351)

    (cherry picked from commit afb2e8ab119a7eecb1a01d4f419deb7681948d81)

    Co-authored-by: Kacper Muda <mudakac...@gmail.com>
---
 task-sdk/src/airflow/sdk/execution_time/supervisor.py | 17 +++++++++++------
 .../tests/task_sdk/execution_time/test_supervisor.py | 8 ++++++--
 2 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 70b384a8c96..efc74506b62 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -142,6 +142,10 @@ MAX_FAILED_HEARTBEATS: int = conf.getint("workers", "max_failed_heartbeats")
 
 SOCKET_CLEANUP_TIMEOUT: float = conf.getfloat("workers", "socket_cleanup_timeout")
 
+# Maximum possible time (in seconds) that task will have for execution of auxiliary processes
+# like listeners after task is complete.
+TASK_OVERTIME_THRESHOLD: float = conf.getfloat("core", "task_success_overtime")
+
 SERVER_TERMINATED = "SERVER_TERMINATED"
 
 # These are the task instance states that require some additional information to transition into.
@@ -822,10 +826,6 @@ class ActivitySubprocess(WatchedSubprocess):
     # does not hang around forever.
     failed_heartbeats: int = attrs.field(default=0, init=False)
 
-    # Maximum possible time (in seconds) that task will have for execution of auxiliary processes
-    # like listeners after task is complete.
-    # TODO: This should come from airflow.cfg: [core] task_success_overtime
-    TASK_OVERTIME_THRESHOLD: ClassVar[float] = 20.0
     _task_end_time_monotonic: float | None = attrs.field(default=None, init=False)
     _rendered_map_index: str | None = attrs.field(default=None, init=False)
 
@@ -975,9 +975,14 @@ class ActivitySubprocess(WatchedSubprocess):
             return
         if (
             self._task_end_time_monotonic
-            and (time.monotonic() - self._task_end_time_monotonic) > self.TASK_OVERTIME_THRESHOLD
+            and (time.monotonic() - self._task_end_time_monotonic) > TASK_OVERTIME_THRESHOLD
         ):
-            log.warning("Workload success overtime reached; terminating process", ti_id=self.id)
+            log.warning(
+                "Task success overtime reached; terminating process. "
+                "Modify `task_success_overtime` setting in [core] section of "
+                "Airflow configuration to change this limit.",
+                ti_id=self.id,
+            )
             self.kill(signal.SIGTERM, force=True)
 
     def _send_heartbeat_if_needed(self):
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 742dba98c4d..69758132670 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -735,7 +735,9 @@ class TestWatchedSubprocess:
         mocker.patch("time.monotonic", return_value=20.0)
 
         # Patch the task overtime threshold
-        monkeypatch.setattr(ActivitySubprocess, "TASK_OVERTIME_THRESHOLD", overtime_threshold)
+        monkeypatch.setattr(
+            "airflow.sdk.execution_time.supervisor.TASK_OVERTIME_THRESHOLD", overtime_threshold
+        )
 
         mock_watched_subprocess = ActivitySubprocess(
             process_log=mocker.MagicMock(),
@@ -758,7 +760,9 @@ class TestWatchedSubprocess:
         if expected_kill:
             mock_kill.assert_called_once_with(signal.SIGTERM, force=True)
             mock_logger.warning.assert_called_once_with(
-                "Workload success overtime reached; terminating process",
+                "Task success overtime reached; terminating process. "
+                "Modify `task_success_overtime` setting in [core] section of "
+                "Airflow configuration to change this limit.",
                 ti_id=TI_ID,
             )
         else: