Copilot commented on code in PR #63761:
URL: https://github.com/apache/airflow/pull/63761#discussion_r3066495722


##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -4741,3 +4742,35 @@ def test_dag_add_result(create_runtime_ti, mock_supervisor_comms):
             dag_result=True,
         )
     )
+
+
+class TestExecutionTimeoutSending:
+    """Test that _prepare sends SetExecutionTimeout when execution_timeout is set."""
+
+    def test_sends_execution_timeout(self, create_runtime_ti, mock_supervisor_comms):
+        task = PythonOperator(
+            task_id="timeout_task",
+            python_callable=lambda: None,
+            execution_timeout=timedelta(seconds=120),
+        )
+        ti = create_runtime_ti(task=task)
+
+        run(ti, context=ti.get_template_context(), log=mock.MagicMock())
+
+        assert call(msg=SetExecutionTimeout(execution_timeout=120.0)) in mock_supervisor_comms.send.mock_calls
+
+    def test_no_execution_timeout_when_not_set(self, create_runtime_ti, mock_supervisor_comms):
+        task = PythonOperator(
+            task_id="no_timeout_task",
+            python_callable=lambda: None,
+        )
+        ti = create_runtime_ti(task=task)
+
+        run(ti, context=ti.get_template_context(), log=mock.MagicMock())
+
+        timeout_calls = [
+            c
+            for c in mock_supervisor_comms.send.mock_calls
+            if hasattr(c.kwargs.get("msg"), "type") and c.kwargs["msg"].type == "SetExecutionTimeout"

Review Comment:
   This test filters calls via `msg.type == "SetExecutionTimeout"`, which is
brittle (string coupling) and less precise than type-based checks. Prefer
filtering with `isinstance(c.kwargs.get("msg"), SetExecutionTimeout)` (or
equivalent) so that a refactor of the discriminator string cannot silently break
the intent of the test.
   ```suggestion
               if isinstance(c.kwargs.get("msg"), SetExecutionTimeout)
   ```



##########
task-sdk/src/airflow/sdk/execution_time/comms.py:
##########
@@ -895,6 +895,13 @@ class SetRenderedMapIndex(BaseModel):
     type: Literal["SetRenderedMapIndex"] = "SetRenderedMapIndex"
 
 
+class SetExecutionTimeout(BaseModel):
+    """Notify the supervisor of the task's execution_timeout so it can enforce it as a safety net."""
+
+    execution_timeout: float  # seconds

Review Comment:
   `execution_timeout` is part of an IPC contract; accepting negative/zero 
values without validation can produce surprising supervisor behavior (immediate 
termination, no-op, or inconsistent semantics). Consider validating at the 
model layer (e.g., `Field(gt=0)` / `PositiveFloat`) so invalid values are 
rejected consistently, regardless of sender.
   ```suggestion
       execution_timeout: Annotated[float, Field(gt=0)]  # seconds
   ```



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1125,6 +1126,11 @@ def _prepare(ti: RuntimeTaskInstance, log: Logger, context: Context) -> ToSuperv
         # so that we do not call the API unnecessarily
         SUPERVISOR_COMMS.send(msg=SetRenderedFields(rendered_fields=rendered_fields))
 
+    if ti.task.execution_timeout:
+        timeout_seconds = ti.task.execution_timeout.total_seconds()
+        if timeout_seconds > 0:
+            SUPERVISOR_COMMS.send(msg=SetExecutionTimeout(execution_timeout=timeout_seconds))

Review Comment:
   This can be simplified to reduce nesting and make the send condition clearer 
(e.g., compute `timeout_seconds` once and use a single conditional). Also 
consider whether the `> 0` rule should be shared/centralized with the primary 
timeout mechanism to avoid future drift.
   ```suggestion
       execution_timeout = ti.task.execution_timeout
       timeout_seconds = execution_timeout.total_seconds() if execution_timeout else None
       if timeout_seconds is not None and timeout_seconds > 0:
           SUPERVISOR_COMMS.send(msg=SetExecutionTimeout(execution_timeout=timeout_seconds))
   ```



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -1164,6 +1169,27 @@ def _handle_process_overtime_if_needed(self):
             )
             self.kill(signal.SIGTERM, force=True)
 
+    def _handle_execution_timeout_if_needed(self):
+        """
+        Enforce execution_timeout from the supervisor as a safety net.
+
+        The primary timeout mechanism is SIGALRM in the task runner. This method
+        acts as a backup in case the child process loses its signal handler (e.g.,
+        after receiving SIGSEGV during DAG processing).
+        """
+        if self._execution_timeout_seconds is None or self._execution_start_monotonic is None:
+            return
+        if self._terminal_state:
+            return
+        elapsed = time.monotonic() - self._execution_start_monotonic
+        if elapsed > self._execution_timeout_seconds:
+            self.process_log.error(
+                "Execution timeout reached; supervisor terminating task process.",
+                timeout_seconds=self._execution_timeout_seconds,
+                elapsed_seconds=elapsed,
+            )
+            self.kill(signal.SIGTERM, force=True)

Review Comment:
   `_monitor_subprocess()` can continue looping after the subprocess exits 
(when `_exit_code` is set) while sockets are still draining/cleaning up. In 
that case `_handle_execution_timeout_if_needed()` may call `kill()` on an 
already-exited process. Add a guard (e.g., `if self._exit_code is not None: 
return`) or otherwise ensure the process is still alive before attempting 
termination.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to