ashb commented on code in PR #57778:
URL: https://github.com/apache/airflow/pull/57778#discussion_r2493887196


##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -1382,6 +1431,9 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id:
             inactive_assets_resp = self.client.task_instances.validate_inlets_and_outlets(msg.ti_id)
             resp = InactiveAssetsResult.from_inactive_assets_response(inactive_assets_resp)
             dump_opts = {"exclude_unset": True}
+        elif isinstance(msg, SetTaskExecutionTimeout):
+            self._execution_timeout_seconds = msg.execution_timeout_seconds
+            self._task_execution_start_monotonic = time.monotonic()

Review Comment:
   Do we need a separate start time here, or should the timer start counting 
down as soon as the process itself starts? In that case we wouldn't need a 
separate variable here.
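
The single-timestamp alternative could look roughly like the sketch below. `TimeoutTracker` and its fields are hypothetical names used only for illustration, not part of the supervisor; the assumption is that one monotonic timestamp captured at process start is enough, and the timeout value arrives later.

```python
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TimeoutTracker:
    """Hypothetical helper: one start timestamp, captured at creation."""

    # Stands in for "when the process started"; never reset afterwards.
    started_at: float = field(default_factory=time.monotonic)
    # Unknown until the task reports its execution_timeout.
    timeout_seconds: Optional[float] = None

    def set_timeout(self, seconds: float) -> None:
        # Only the limit is stored; no second start time is recorded.
        self.timeout_seconds = seconds

    def expired(self, now: Optional[float] = None) -> bool:
        if self.timeout_seconds is None:
            return False
        now = time.monotonic() if now is None else now
        return now - self.started_at > self.timeout_seconds
```

With this shape, the only optional value is the timeout itself, so the check needs a single `None` guard.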



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -1097,6 +1107,45 @@ def _handle_process_overtime_if_needed(self):
             )
             self.kill(signal.SIGTERM, force=True)
 
+    def _check_task_timeout(self):
+        """
+        Check if task has exceeded execution_timeout and kill it if necessary.
+
+        This handles task timeout at the supervisor level rather than in the task
+        process itself.
+
+        The method implements signal escalation: SIGTERM -> SIGKILL if process doesn't exit.
+        """
+        # Only check timeout if we have a timeout set and execution has started
+        if self._execution_timeout_seconds is None or self._task_execution_start_monotonic is None:

Review Comment:
   We shouldn't get here with `_task_execution_start_monotonic` as `None`, 
should we?



##########
task-sdk/tests/task_sdk/execution_time/test_supervisor.py:
##########
@@ -224,6 +225,46 @@ def test_supervise(
             with expectation:
                 supervise(**kw)
 
+    def test_supervisor_enforces_execution_timeout(self, test_dags_dir, captured_logs, client_with_ti_start):
+        """
+        Test that the supervisor enforces execution_timeout and kills the task.
+        """
+        ti = TaskInstance(
+            id=uuid7(),
+            task_id="task_with_timeout",
+            dag_id="timeout_test",
+            run_id="test_run",
+            try_number=1,
+            dag_version_id=uuid7(),
+        )
+
+        bundle_info = BundleInfo(name="my-bundle", version=None)
+
+        start_time = time.time()
+
+        with patch.dict(os.environ, local_dag_bundle_cfg(test_dags_dir, bundle_info.name)):
+            exit_code = supervise(
+                ti=ti,
+                dag_rel_path="timeout_test.py",

Review Comment:
   This file wasn't added -- was it meant to be?



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -1097,6 +1107,45 @@ def _handle_process_overtime_if_needed(self):
             )
             self.kill(signal.SIGTERM, force=True)
 
+    def _check_task_timeout(self):
+        """
+        Check if task has exceeded execution_timeout and kill it if necessary.
+
+        This handles task timeout at the supervisor level rather than in the task
+        process itself.
+
+        The method implements signal escalation: SIGTERM -> SIGKILL if process doesn't exit.
+        """
+        # Only check timeout if we have a timeout set and execution has started
+        if self._execution_timeout_seconds is None or self._task_execution_start_monotonic is None:
+            return
+
+        # Don't check timeout if task has already reached a terminal state
+        if self._terminal_state:
+            return
+
+        elapsed_time = time.monotonic() - self._task_execution_start_monotonic
+
+        if elapsed_time > self._execution_timeout_seconds:
+            log.error(
+                "Task execution timeout exceeded; terminating process",
+                timeout_seconds=self._execution_timeout_seconds,
+                elapsed_seconds=elapsed_time,
+                ti_id=self.id,
+                pid=self.pid,
+            )
+            self.process_log.error(
+                "Task execution timeout exceeded. Terminating process.",
+                timeout_seconds=self._execution_timeout_seconds,
+                elapsed_seconds=elapsed_time,
+            )
+
+            # Kill the process with signal escalation (SIGTERM -> SIGKILL)
+            self.kill(signal.SIGTERM, escalation_delay=2.0, force=True)

Review Comment:
   2s here seems very short, especially as a hard-coded value. Why did you feel 
it needed to be shorter than the default 5s the function already has?
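
For reference, the escalation pattern with the delay as a parameter can be sketched as follows. This is a generic POSIX illustration built on `subprocess`, not the supervisor's `kill` method; `terminate_with_escalation` is a hypothetical name, and the 5.0 default only mirrors the default mentioned above.

```python
import signal
import subprocess


def terminate_with_escalation(proc: subprocess.Popen, escalation_delay: float = 5.0) -> int:
    """Send SIGTERM, then SIGKILL if the process has not exited in time.

    Returns the process return code (negative signal number on POSIX).
    """
    proc.send_signal(signal.SIGTERM)
    try:
        # Give the process a chance to clean up before escalating.
        return proc.wait(timeout=escalation_delay)
    except subprocess.TimeoutExpired:
        proc.kill()  # SIGKILL on POSIX; cannot be caught or ignored
        return proc.wait()
```

Taking the delay as an argument leaves the caller free to pass a configured value instead of embedding a literal at the call site.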



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1325,23 +1332,8 @@ def _execute_task(context: Context, ti: RuntimeTaskInstance, log: Logger):
 
     _run_task_state_change_callbacks(task, "on_execute_callback", context, log)
 
-    if task.execution_timeout:
-        from airflow.sdk.execution_time.timeout import timeout
-
-        # TODO: handle timeout in case of deferral
-        timeout_seconds = task.execution_timeout.total_seconds()
-        try:
-            # It's possible we're already timed out, so fast-fail if true
-            if timeout_seconds <= 0:
-                raise AirflowTaskTimeout()
-            # Run task in timeout wrapper
-            with timeout(timeout_seconds):
-                result = ctx.run(execute, context=context)
-        except AirflowTaskTimeout:
-            task.on_kill()

Review Comment:
   With this now removed, do we still call `task.on_kill()` when the timeout 
expires?
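
One way a supervisor-sent SIGTERM could still reach `on_kill` is a signal handler inside the task process. The sketch below only illustrates that mechanism; `DemoTask` and `install_on_kill_handler` are hypothetical names, and whether the task runner already wires up an equivalent handler is not something this diff shows.

```python
import signal


class DemoTask:
    """Hypothetical stand-in for an operator; only on_kill matters here."""

    def __init__(self) -> None:
        self.killed = False

    def on_kill(self) -> None:
        # A real operator would release external resources here.
        self.killed = True


def install_on_kill_handler(task: DemoTask) -> None:
    """Register a SIGTERM handler that runs the task's cleanup hook."""

    def _on_term(signum, frame):
        task.on_kill()
        # A real handler would normally exit the process after cleanup;
        # that step is left out so the sketch can be exercised in-process.

    # Must be called from the main thread of the task process.
    signal.signal(signal.SIGTERM, _on_term)
```

If cleanup depends on a handler like this, the escalation delay bounds how long `on_kill` has to finish before SIGKILL arrives.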



##########
task-sdk/tests/task_sdk/execution_time/test_supervisor.py:
##########
@@ -224,6 +225,46 @@ def test_supervise(
             with expectation:
                 supervise(**kw)
 
+    def test_supervisor_enforces_execution_timeout(self, test_dags_dir, captured_logs, client_with_ti_start):

Review Comment:
   Could you please update this test to ensure that `on_kill` is still called?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.