paultmathew opened a new pull request, #67229:
URL: https://github.com/apache/airflow/pull/67229

   ### Why + What
   
   `KubernetesPodOperator(deferrable=True)` does not enforce 
`execution_timeout`. Once the operator defers, `execute()` exits by raising 
`TaskDeferred`, and the `signal.alarm`-based timeout context wrapping it exits 
with it. Nothing enforces `execution_timeout` for the rest of the deferral. 
Pods keep running well past `execution_timeout`, bounded only by 
`active_deadline_seconds` (which defaults to ~1h, or to whatever value is passed 
to the operator).
   
   The framework gap is acknowledged by `# TODO: handle timeout in case of 
deferral` at 
[`task-sdk/.../task_runner.py:1782`](https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L1782).
   
   This PR fixes the symptom for `KubernetesPodOperator`, mirroring the pattern 
already merged for `AirbyteTriggerSyncOperator` ([PR 
#64051](https://github.com/apache/airflow/pull/64051)) and 
`DbtCloudRunJobOperator` ([PR 
#66449](https://github.com/apache/airflow/pull/66449)).
   
   ### Approach
   
   1. **Operator (`pod.py`)**: in `invoke_defer_method`, translate 
`execution_timeout` into an absolute deadline anchored on `ti.start_date`:
      - `execution_deadline = ti.start_date.timestamp() + 
execution_timeout.total_seconds()`
      - Pass `execution_deadline` to `KubernetesPodTrigger`.
      - Pass `timeout=remaining` (the `timedelta` left until `execution_deadline`) 
to `self.defer()` so the framework's `trigger_timeout` also bounds the trigger 
lifetime as a backstop.
      - Anchoring on `ti.start_date` keeps the deadline stable across 
re-deferrals (e.g. `logging_interval` re-entries), since Airflow preserves the 
original `start_date` when a task resumes from defer.
      - Pass `context` from `trigger_reentry` through to `invoke_defer_method` so 
the deadline is recomputed correctly on each re-defer.
   
   2. **Trigger (`pod.py`)**: at the top of `_wait_for_container_completion`, 
check `time.time() >= execution_deadline` and emit a `status="timeout"` event 
once the deadline is crossed. The operator's existing `trigger_reentry` 
terminal-event path already handles `status in ("error", "failed", "timeout", 
"success")`; on a `timeout` event the operator fails the task and `_clean()` 
runs `on_finish_action` (default: delete pod).
   
   ### Impact
   
   - **Existing behaviour preserved**: tasks without `execution_timeout` see no 
behaviour change (`execution_deadline=None`, `defer.timeout=None`). Only deferred 
KPO tasks that set `execution_timeout`, where it was previously a no-op, are 
affected.
   - **No public API changes**: the new `execution_deadline` parameter on 
`KubernetesPodTrigger` is keyword-only with a `None` default. Trigger 
serialization now includes the field, and triggers serialized before this change 
still deserialize because the trigger's `__init__` treats the kwarg as optional.
   - **Pod cleanup**: when the operator fails on a `timeout` event, the existing 
`on_finish_action` path handles pod deletion (default `delete_pod`). `_clean()` 
already special-cases `event["status"] == "timeout"` to skip 
`await_pod_completion`, since the pod may be stuck in `ErrImagePull` or 
`ContainerCreating` and waiting on it could hang.
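   A rough sketch of the cleanup decision described above. `plan_cleanup` is a hypothetical helper, not the provider's `_clean()`; the string values mirror options of the operator's `on_finish_action` parameter, and treating a `"success"` event as a succeeded pod is a simplifying assumption.

```python
from typing import List


def plan_cleanup(event_status: str, on_finish_action: str = "delete_pod") -> List[str]:
    """Return the ordered cleanup steps for a terminal trigger event."""
    steps: List[str] = []
    # A timed-out pod may be stuck (e.g. ErrImagePull/ContainerCreating),
    # so waiting for it to complete could block indefinitely: skip the wait.
    if event_status != "timeout":
        steps.append("await_pod_completion")
    if on_finish_action == "delete_pod":
        steps.append("delete_pod")
    elif on_finish_action == "delete_succeeded_pod" and event_status == "success":
        # Simplification: a "success" event stands in for a succeeded pod.
        steps.append("delete_pod")
    # "keep_pod" (or any other value) leaves the pod in place.
    return steps
```

   With the default action, a timed-out task goes straight to deletion without waiting on the pod.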
   
   ### Tests
   
   - **Trigger** (`tests/unit/cncf/kubernetes/triggers/test_pod.py`):
     - Updated `test_serialize` to include the new `execution_deadline` key.
     - Added `test_serialize_with_execution_deadline` — round-trips a non-None 
deadline.
     - Added 
`test_run_loop_emits_timeout_event_when_execution_deadline_reached` — 
past-deadline → first iteration emits `status="timeout"` event.
     - Added 
`test_run_loop_does_not_emit_timeout_when_execution_deadline_not_reached` — 
far-future deadline → trigger keeps polling normally.
   - **Operator** (`tests/unit/cncf/kubernetes/operators/test_pod.py`):
     - Added 
`test_invoke_defer_method_passes_execution_deadline_when_execution_timeout_set` 
— operator with `execution_timeout=300s` passes a deadline ≈ `ti.start_date + 
300s` to the trigger; `defer.timeout` is set.
     - Added 
`test_invoke_defer_method_passes_no_deadline_when_execution_timeout_not_set` — 
operator without `execution_timeout` passes `None` (no enforcement, no 
behaviour change).
   
   ### Backwards Compatibility
   
   No public API changes. The new `execution_deadline` parameter on 
`KubernetesPodTrigger` is optional with default `None`. Behaviour change: 
deferred KPO tasks that set `execution_timeout` now fail at the configured 
timeout instead of running past it (bounded only by `active_deadline_seconds`); 
this matches the documented contract of `execution_timeout`.
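   The back-compat argument can be illustrated with a stripped-down stand-in for the trigger. `ExampleTrigger`, its `pod_name` field and the `rebuild` helper are hypothetical; the sketch only assumes, as Airflow triggers do, that `serialize()` returns a `(classpath, kwargs)` tuple and that the trigger is reconstructed by calling the class with those kwargs.

```python
from typing import Any, Dict, Optional, Tuple


class ExampleTrigger:
    """Hypothetical stand-in for KubernetesPodTrigger (not the real class)."""

    def __init__(self, pod_name: str, *, execution_deadline: Optional[float] = None) -> None:
        self.pod_name = pod_name
        # Keyword-only with a None default: older payloads simply omit it.
        self.execution_deadline = execution_deadline

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        return (
            f"{__name__}.ExampleTrigger",
            {"pod_name": self.pod_name, "execution_deadline": self.execution_deadline},
        )


def rebuild(kwargs: Dict[str, Any]) -> ExampleTrigger:
    # Mimics re-instantiating the trigger class from stored kwargs.
    return ExampleTrigger(**kwargs)
```

   A payload written before the field existed (for example `{"pod_name": "pod-a"}`) still rebuilds, with `execution_deadline` falling back to `None`, i.e. no enforcement.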
   
   ### Closes
   
   Closes: #67227
   

