Copilot commented on code in PR #67229:
URL: https://github.com/apache/airflow/pull/67229#discussion_r3276779730
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py:
##########
@@ -306,6 +313,27 @@ async def _wait_for_container_completion(self) ->
TriggerEvent:
if self.logging_interval is not None:
time_get_more_logs = time_begin +
datetime.timedelta(seconds=self.logging_interval)
while True:
+ # ``execution_deadline`` is the operator's translation of the
+ # task-level ``execution_timeout`` into an absolute UTC timestamp
+ if self.execution_deadline is not None and time.time() >=
self.execution_deadline:
+ self.log.info(
Review Comment:
`execution_deadline` is only checked inside
`_wait_for_container_completion()`. If a pod never leaves Pending / startup
(i.e. `_wait_for_pod_start()` blocks), `execution_timeout` still won’t be
enforced until the startup timeouts fire, which breaks the intended semantics.
Consider checking `execution_deadline` before/within `_wait_for_pod_start()`
(or in `run()` before awaiting it) so tasks can time out during pod start as
well, and add a regression test for the Pending/startup path.
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py:
##########
@@ -908,6 +909,21 @@ def invoke_defer_method(
trigger_start_time = datetime.datetime.now(tz=datetime.timezone.utc)
+ # Translate ``execution_timeout`` into an absolute deadline plumbed
into
+ # the trigger.
+ execution_deadline: float | None = None
+ defer_timeout: datetime.timedelta | None = None
+ if self.execution_timeout is not None and context is not None:
+ ti = context.get("ti")
+ ti_start_date = getattr(ti, "start_date", None)
+ if ti_start_date is not None:
+ execution_deadline = ti_start_date.timestamp() +
self.execution_timeout.total_seconds()
+ # Set ``defer.timeout`` so the framework also bounds the
+ # trigger's lifetime (the triggerer enforces this via
+ # ``trigger_timeout``).
+ remaining = execution_deadline - time.time()
+ defer_timeout = datetime.timedelta(seconds=max(0.0, remaining))
Review Comment:
`defer_timeout` is computed as `timedelta(seconds=max(0.0, remaining))`. When
`remaining <= 0` this becomes `timedelta(0)`, which will set `trigger_timeout`
to “now” and can cause the scheduler to fail the deferred task instance (TI)
with `TRIGGER_TIMEOUT` before the trigger gets a chance to emit the
operator-handled `timeout` event (bypassing `_clean()`/pod deletion). Consider
handling `remaining <= 0` explicitly (e.g. don’t set `timeout` in that case and
let the trigger emit the timeout event, or fail immediately and run cleanup),
and ensure `timeout` cannot be an already-expired value.
##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py:
##########
@@ -263,6 +289,100 @@ async def test_run_loop_return_failed_event(self,
mock_hook, mock_method, mock_w
assert actual_event == expected_event
+ @pytest.mark.asyncio
+ @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+ @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
+ @mock.patch(f"{TRIGGER_PATH}.hook")
+ async def
test_run_loop_emits_timeout_event_when_execution_deadline_reached(
+ self, mock_hook, mock_define_state, mock_wait_pod
+ ):
+ """
+ When ``execution_deadline`` (the operator's translation of
+ ``execution_timeout``) has already passed, the trigger emits a
+ ``timeout`` event immediately on the first iteration of the
+ completion loop instead of polling indefinitely. The operator's
+ existing terminal-event path then fails the task and runs
+ ``on_finish_action`` (pod delete).
+ """
+ # Already-past deadline → first iteration trips it.
+ past_deadline = 1.0
+ trigger_with_deadline = KubernetesPodTrigger(
+ pod_name=POD_NAME,
+ pod_namespace=NAMESPACE,
+ base_container_name=BASE_CONTAINER_NAME,
+ kubernetes_conn_id=CONN_ID,
+ poll_interval=POLL_INTERVAL,
+ cluster_context=CLUSTER_CONTEXT,
+ config_dict=CONFIG_DICT,
+ in_cluster=IN_CLUSTER,
+ get_logs=GET_LOGS,
+ startup_timeout=STARTUP_TIMEOUT_SECS,
+ startup_check_interval=STARTUP_CHECK_INTERVAL_SECS,
+ schedule_timeout=STARTUP_TIMEOUT_SECS,
+ trigger_start_time=TRIGGER_START_TIME,
+ on_finish_action=ON_FINISH_ACTION,
+ execution_deadline=past_deadline,
+ )
+ # Force the run to enter ``_wait_for_container_completion`` (the
deadline
+ # check lives at the top of that loop).
+ mock_wait_pod.return_value = ContainerState.RUNNING
+ # If the deadline check fails to fire we'd fall through to a pod poll;
+ # make that path return RUNNING so we'd hang rather than emit any other
+ # terminal event — verifies the test fails loudly on regression.
+ mock_define_state.return_value = ContainerState.RUNNING
+ mock_hook.get_pod.return_value =
self._mock_pod_result(mock.AsyncMock())
+
+ actual_event = await trigger_with_deadline.run().asend(None)
+
+ assert actual_event.payload["status"] == "timeout"
+ assert actual_event.payload["namespace"] == NAMESPACE
+ assert actual_event.payload["name"] == POD_NAME
+ assert "execution_timeout" in actual_event.payload["message"]
+ assert actual_event.payload["last_log_time"] is None
+
+ @pytest.mark.asyncio
+ @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+ @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
+ @mock.patch(f"{TRIGGER_PATH}.hook")
+ async def
test_run_loop_does_not_emit_timeout_when_execution_deadline_not_reached(
+ self, mock_hook, mock_define_state, mock_wait_pod
+ ):
+ """
+ When ``execution_deadline`` is still in the future, the trigger keeps
+ polling normally — proves the deadline check doesn't fire preemptively
+ on every run.
+ """
+ # Far-future deadline (~year 2286) — guaranteed not reached.
+ future_deadline = 9_999_999_999.0
+ trigger_with_deadline = KubernetesPodTrigger(
+ pod_name=POD_NAME,
+ pod_namespace=NAMESPACE,
+ base_container_name=BASE_CONTAINER_NAME,
+ kubernetes_conn_id=CONN_ID,
+ poll_interval=POLL_INTERVAL,
+ cluster_context=CLUSTER_CONTEXT,
+ config_dict=CONFIG_DICT,
+ in_cluster=IN_CLUSTER,
+ get_logs=GET_LOGS,
+ startup_timeout=STARTUP_TIMEOUT_SECS,
+ startup_check_interval=STARTUP_CHECK_INTERVAL_SECS,
+ schedule_timeout=STARTUP_TIMEOUT_SECS,
+ trigger_start_time=TRIGGER_START_TIME,
+ on_finish_action=ON_FINISH_ACTION,
+ execution_deadline=future_deadline,
+ )
+ mock_wait_pod.return_value = ContainerState.RUNNING
+ mock_define_state.return_value = ContainerState.RUNNING
+ mock_hook.get_pod.return_value =
self._mock_pod_result(mock.AsyncMock())
+
+ # Trigger must keep waiting (not emit any event yet).
+ task = asyncio.create_task(trigger_with_deadline.run().__anext__())
+ await asyncio.sleep(0.5)
+ assert not task.done()
Review Comment:
This test uses a real `asyncio.sleep(0.5)` to assert the trigger keeps
polling. This adds noticeable wall-clock time to the unit suite and can be
flaky on slow CI. Consider avoiding real sleeps by using
`asyncio.wait_for(..., timeout=small)` and expecting `asyncio.TimeoutError` to
assert the first `__anext__()` does not complete immediately, or patching
`asyncio.sleep`/`poll_interval` to a controllable fast path.
##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py:
##########
@@ -2627,6 +2628,96 @@ def test_async_create_pod_should_throw_exception(self,
mocked_hook, mocked_clean
log_message = "Trigger emitted an %s event, failing the task: %s"
mocked_log.error.assert_called_once_with(log_message, status, message)
+ @patch(KUB_OP_PATH.format("convert_config_file_to_dict"))
+
@patch("airflow.providers.cncf.kubernetes.operators.pod.BaseHook.get_connection")
+ def
test_invoke_defer_method_passes_execution_deadline_when_execution_timeout_set(
+ self, mocked_get_connection, mocked_convert_config
+ ):
+ """
+ ``execution_timeout`` is translated into an absolute
``execution_deadline``
+ plumbed through to the trigger. Anchoring on ``ti.start_date`` keeps
the
+ deadline stable across re-deferrals, which Airflow does not do for
+ ``execution_timeout`` on deferred tasks.
+ """
+ mocked_get_connection.side_effect =
AirflowNotFoundException("connection not found")
+
+ k = KubernetesPodOperator(
+ task_id=TEST_TASK_ID,
+ namespace=TEST_NAMESPACE,
+ image=TEST_IMAGE,
+ name=TEST_NAME,
+ on_finish_action="keep_pod",
+ in_cluster=True,
+ deferrable=True,
+ execution_timeout=datetime.timedelta(seconds=300),
+ )
+ # Skip the pod-creation path and pretend it's already running.
+ k.pod = MagicMock()
+ k.pod.metadata.name = TEST_NAME
+ k.pod.metadata.namespace = TEST_NAMESPACE
+
+ ti_mock = MagicMock()
+ ti_start = datetime.datetime(2026, 1, 1, 12, 0, 0,
tzinfo=datetime.timezone.utc)
+ ti_mock.start_date = ti_start
+ context = {"ti": ti_mock}
+
+ with patch(
+ f"{TRIGGER_CLASS}.define_pod_container_state",
+ return_value=ContainerState.RUNNING,
+ ):
+ with pytest.raises(TaskDeferred) as exc:
+ k.invoke_defer_method(context=context)
+
+ trigger = exc.value.trigger
+ assert isinstance(trigger, KubernetesPodTrigger)
+ # Deadline = start_date + execution_timeout (300s).
+ expected_deadline = ti_start.timestamp() + 300.0
+ assert trigger.execution_deadline == pytest.approx(expected_deadline,
abs=0.001)
+ # ``defer.timeout`` is set so the framework also bounds the trigger's
+ # lifetime via ``trigger_timeout`` as a backstop. Should be roughly the
+ # remaining budget; here ti_start is in the past, so this depends on
+ # ``time.time()``. Just assert it's set (non-None).
+ assert exc.value.timeout is not None
Review Comment:
This test only asserts `exc.value.timeout is not None`, and the chosen
`ti_start` makes the computed `remaining` depend on the current wall clock (and
can easily produce `timedelta(0)`), so it doesn’t verify that the operator
passes the *remaining* budget correctly. Consider freezing/patching
`time.time()` to a deterministic value and asserting `exc.value.timeout` is
approximately the expected remaining duration (and similarly asserting it stays
`None` when `execution_timeout` is unset).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]