This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-0-test by this push:
     new 00f40787cb0 [v3-0-test] fix: Make ``task_success_overtime`` 
configurable (#53342) (#53351)
00f40787cb0 is described below

commit 00f40787cb064c644fc19cfae858daa21eed6491
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jul 18 19:45:32 2025 +0530

    [v3-0-test] fix: Make ``task_success_overtime`` configurable (#53342) 
(#53351)
    
    (cherry picked from commit afb2e8ab119a7eecb1a01d4f419deb7681948d81)
    
    Co-authored-by: Kacper Muda <mudakac...@gmail.com>
---
 task-sdk/src/airflow/sdk/execution_time/supervisor.py   | 17 +++++++++++------
 .../tests/task_sdk/execution_time/test_supervisor.py    |  8 ++++++--
 2 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py 
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 70b384a8c96..efc74506b62 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -142,6 +142,10 @@ MAX_FAILED_HEARTBEATS: int = conf.getint("workers", 
"max_failed_heartbeats")
 
 SOCKET_CLEANUP_TIMEOUT: float = conf.getfloat("workers", 
"socket_cleanup_timeout")
 
+# Maximum possible time (in seconds) that task will have for execution of 
auxiliary processes
+# like listeners after task is complete.
+TASK_OVERTIME_THRESHOLD: float = conf.getfloat("core", "task_success_overtime")
+
 SERVER_TERMINATED = "SERVER_TERMINATED"
 
 # These are the task instance states that require some additional information 
to transition into.
@@ -822,10 +826,6 @@ class ActivitySubprocess(WatchedSubprocess):
     # does not hang around forever.
     failed_heartbeats: int = attrs.field(default=0, init=False)
 
-    # Maximum possible time (in seconds) that task will have for execution of 
auxiliary processes
-    # like listeners after task is complete.
-    # TODO: This should come from airflow.cfg: [core] task_success_overtime
-    TASK_OVERTIME_THRESHOLD: ClassVar[float] = 20.0
     _task_end_time_monotonic: float | None = attrs.field(default=None, 
init=False)
     _rendered_map_index: str | None = attrs.field(default=None, init=False)
 
@@ -975,9 +975,14 @@ class ActivitySubprocess(WatchedSubprocess):
             return
         if (
             self._task_end_time_monotonic
-            and (time.monotonic() - self._task_end_time_monotonic) > 
self.TASK_OVERTIME_THRESHOLD
+            and (time.monotonic() - self._task_end_time_monotonic) > 
TASK_OVERTIME_THRESHOLD
         ):
-            log.warning("Workload success overtime reached; terminating 
process", ti_id=self.id)
+            log.warning(
+                "Task success overtime reached; terminating process. "
+                "Modify `task_success_overtime` setting in [core] section of "
+                "Airflow configuration to change this limit.",
+                ti_id=self.id,
+            )
             self.kill(signal.SIGTERM, force=True)
 
     def _send_heartbeat_if_needed(self):
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py 
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 742dba98c4d..69758132670 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -735,7 +735,9 @@ class TestWatchedSubprocess:
         mocker.patch("time.monotonic", return_value=20.0)
 
         # Patch the task overtime threshold
-        monkeypatch.setattr(ActivitySubprocess, "TASK_OVERTIME_THRESHOLD", 
overtime_threshold)
+        monkeypatch.setattr(
+            "airflow.sdk.execution_time.supervisor.TASK_OVERTIME_THRESHOLD", 
overtime_threshold
+        )
 
         mock_watched_subprocess = ActivitySubprocess(
             process_log=mocker.MagicMock(),
@@ -758,7 +760,9 @@ class TestWatchedSubprocess:
         if expected_kill:
             mock_kill.assert_called_once_with(signal.SIGTERM, force=True)
             mock_logger.warning.assert_called_once_with(
-                "Workload success overtime reached; terminating process",
+                "Task success overtime reached; terminating process. "
+                "Modify `task_success_overtime` setting in [core] section of "
+                "Airflow configuration to change this limit.",
                 ti_id=TI_ID,
             )
         else:

Reply via email to