Copilot commented on code in PR #63761:
URL: https://github.com/apache/airflow/pull/63761#discussion_r3066495722
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -4741,3 +4742,35 @@ def test_dag_add_result(create_runtime_ti,
mock_supervisor_comms):
dag_result=True,
)
)
+
+
+class TestExecutionTimeoutSending:
+ """Test that _prepare sends SetExecutionTimeout when execution_timeout is
set."""
+
+ def test_sends_execution_timeout(self, create_runtime_ti,
mock_supervisor_comms):
+ task = PythonOperator(
+ task_id="timeout_task",
+ python_callable=lambda: None,
+ execution_timeout=timedelta(seconds=120),
+ )
+ ti = create_runtime_ti(task=task)
+
+ run(ti, context=ti.get_template_context(), log=mock.MagicMock())
+
+ assert call(msg=SetExecutionTimeout(execution_timeout=120.0)) in
mock_supervisor_comms.send.mock_calls
+
+ def test_no_execution_timeout_when_not_set(self, create_runtime_ti,
mock_supervisor_comms):
+ task = PythonOperator(
+ task_id="no_timeout_task",
+ python_callable=lambda: None,
+ )
+ ti = create_runtime_ti(task=task)
+
+ run(ti, context=ti.get_template_context(), log=mock.MagicMock())
+
+ timeout_calls = [
+ c
+ for c in mock_supervisor_comms.send.mock_calls
+ if hasattr(c.kwargs.get("msg"), "type") and c.kwargs["msg"].type
== "SetExecutionTimeout"
Review Comment:
This test filters calls via `msg.type == "SetExecutionTimeout"`, which is
brittle (string coupling) and less precise than type-based checks. Prefer
filtering with `isinstance(c.kwargs.get("msg"), SetExecutionTimeout)` (or
equivalent) so refactors to the discriminator string don’t silently break the
intent of the test. Because this test checks for the absence of such calls, a
stale string would match nothing and the test would pass vacuously.
```suggestion
if isinstance(c.kwargs.get("msg"), SetExecutionTimeout)
```
##########
task-sdk/src/airflow/sdk/execution_time/comms.py:
##########
@@ -895,6 +895,13 @@ class SetRenderedMapIndex(BaseModel):
type: Literal["SetRenderedMapIndex"] = "SetRenderedMapIndex"
+class SetExecutionTimeout(BaseModel):
+ """Notify the supervisor of the task's execution_timeout so it can enforce
it as a safety net."""
+
+ execution_timeout: float # seconds
Review Comment:
`execution_timeout` is part of an IPC contract; accepting negative/zero
values without validation can produce surprising supervisor behavior (immediate
termination, no-op, or inconsistent semantics). Consider validating at the
model layer (e.g., `Field(gt=0)` / `PositiveFloat`) so invalid values are
rejected consistently, regardless of sender.
```suggestion
execution_timeout: Annotated[float, Field(gt=0)] # seconds
```
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1125,6 +1126,11 @@ def _prepare(ti: RuntimeTaskInstance, log: Logger,
context: Context) -> ToSuperv
# so that we do not call the API unnecessarily
SUPERVISOR_COMMS.send(msg=SetRenderedFields(rendered_fields=rendered_fields))
+ if ti.task.execution_timeout:
+ timeout_seconds = ti.task.execution_timeout.total_seconds()
+ if timeout_seconds > 0:
+
SUPERVISOR_COMMS.send(msg=SetExecutionTimeout(execution_timeout=timeout_seconds))
Review Comment:
This can be simplified to reduce nesting and make the send condition clearer
(e.g., compute `timeout_seconds` once and use a single conditional). Also
consider whether the `> 0` rule should be shared/centralized with the primary
timeout mechanism to avoid future drift.
```suggestion
execution_timeout = ti.task.execution_timeout
timeout_seconds = execution_timeout.total_seconds() if execution_timeout
else None
if timeout_seconds is not None and timeout_seconds > 0:
SUPERVISOR_COMMS.send(msg=SetExecutionTimeout(execution_timeout=timeout_seconds))
```
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -1164,6 +1169,27 @@ def _handle_process_overtime_if_needed(self):
)
self.kill(signal.SIGTERM, force=True)
+ def _handle_execution_timeout_if_needed(self):
+ """
+ Enforce execution_timeout from the supervisor as a safety net.
+
+ The primary timeout mechanism is SIGALRM in the task runner. This
method
+ acts as a backup in case the child process loses its signal handler
(e.g.,
+ after receiving SIGSEGV during DAG processing).
+ """
+ if self._execution_timeout_seconds is None or
self._execution_start_monotonic is None:
+ return
+ if self._terminal_state:
+ return
+ elapsed = time.monotonic() - self._execution_start_monotonic
+ if elapsed > self._execution_timeout_seconds:
+ self.process_log.error(
+ "Execution timeout reached; supervisor terminating task
process.",
+ timeout_seconds=self._execution_timeout_seconds,
+ elapsed_seconds=elapsed,
+ )
+ self.kill(signal.SIGTERM, force=True)
Review Comment:
`_monitor_subprocess()` can continue looping after the subprocess exits
(when `_exit_code` is set) while sockets are still draining/cleaning up. In
that case `_handle_execution_timeout_if_needed()` may call `kill()` on an
already-exited process. Add a guard (e.g., `if self._exit_code is not None:
return`) or otherwise ensure the process is still alive before attempting
termination.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]