This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3d3e79d8c84 Enforce execution_timeout in deferrable
KubernetesPodOperator (#67229)
3d3e79d8c84 is described below
commit 3d3e79d8c848b8ba7985ecd6a7b2d9a9990a67fd
Author: Paul Mathew <[email protected]>
AuthorDate: Sat May 30 14:21:08 2026 -0400
Enforce execution_timeout in deferrable KubernetesPodOperator (#67229)
Compute the deadline operator-side from ti.start_date + execution_timeout
and plumb it to KubernetesPodTrigger via
trigger_kwargs["_execution_deadline"]
(an existing dict already accepted and serialized by every subclass) so
the trigger can short-circuit and emit its own status="timeout" event.
This routes timeout through trigger_reentry → _clean() → pod deletion +
final log capture, matching the success/failure event path.
defer.timeout is set to the remaining budget with a 60s minimum so the
trigger has runway to emit its own timeout event before the framework
backstop fires (which would otherwise short-circuit to TaskDeferralTimeout
and skip the operator's cleanup).
Following the leading-underscore convention established by _redefer_count
in the same file: trigger_kwargs is the only existing operator -> trigger
plumbing that's a generic, untyped, dict-shaped, fully-serialized bag and
already accepted by every KubernetesPodTrigger subclass. Using it avoids
adding a new __init__ kwarg or serialize() field on KubernetesPodTrigger,
keeping cross-version compatibility with subclasses (e.g. EksPodTrigger,
GKEStartPodTrigger) released independently.
Closes: #67227
Co-authored-by: Cursor <[email protected]>
---
.../providers/cncf/kubernetes/operators/pod.py | 28 +++-
.../providers/cncf/kubernetes/triggers/pod.py | 74 ++++++++-
.../unit/cncf/kubernetes/operators/test_pod.py | 182 ++++++++++++++++++++-
.../unit/cncf/kubernetes/triggers/test_pod.py | 88 ++++++++++
4 files changed, 368 insertions(+), 4 deletions(-)
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index 6e935290b53..6324ab2df93 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -28,6 +28,7 @@ import os
import re
import shlex
import string
+import time
from collections.abc import Callable, Container, Iterable, Mapping, Sequence
from contextlib import AbstractContextManager, suppress
from enum import Enum
@@ -908,6 +909,24 @@ class KubernetesPodOperator(BaseOperator):
trigger_start_time = datetime.datetime.now(tz=datetime.timezone.utc)
+ # Translate ``execution_timeout`` into an absolute deadline plumbed to
+ # the trigger via ``trigger_kwargs["_execution_deadline"]``. Anchoring
+ # on ``ti.start_date`` keeps the deadline stable across re-deferrals
+ # (``logging_interval`` re-entries), since Airflow preserves the
+ # original ``start_date`` when a task resumes from defer.
+ trigger_kwargs = dict(self.trigger_kwargs or {})
+ defer_timeout: datetime.timedelta | None = None
+ if self.execution_timeout is not None and context is not None:
+ ti_start_date = context["ti"].start_date
+ execution_deadline = int(ti_start_date.timestamp() +
self.execution_timeout.total_seconds())
+ trigger_kwargs["_execution_deadline"] = execution_deadline
+ # Pad ``defer.timeout`` past the deadline so the framework
+ # backstop doesn't preempt the trigger's ``status="timeout"``
+ # emission and orphan the pod via the ``__fail__`` path.
+ remaining = execution_deadline - time.time()
+ poll_buffer = max(60, int(self.poll_interval * 2))
+ defer_timeout = datetime.timedelta(seconds=max(remaining, 0) +
poll_buffer)
+
trigger = KubernetesPodTrigger(
pod_name=self.pod.metadata.name, # type: ignore[union-attr]
pod_namespace=self.pod.metadata.namespace, # type:
ignore[union-attr]
@@ -928,7 +947,7 @@ class KubernetesPodOperator(BaseOperator):
termination_grace_period=self.termination_grace_period,
last_log_time=last_log_time,
logging_interval=self.logging_interval,
- trigger_kwargs=self.trigger_kwargs,
+ trigger_kwargs=trigger_kwargs,
)
pod_container_state = trigger.define_pod_container_state(self.pod) if
self.pod else None
if context and (
@@ -949,7 +968,7 @@ class KubernetesPodOperator(BaseOperator):
},
)
else:
- self.defer(trigger=trigger, method_name="trigger_reentry")
+ self.defer(trigger=trigger, method_name="trigger_reentry",
timeout=defer_timeout)
def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
"""
@@ -1018,8 +1037,13 @@ class KubernetesPodOperator(BaseOperator):
)
self.trigger_kwargs = dict(self.trigger_kwargs or {})
self.trigger_kwargs["_redefer_count"] = redefer_count + 1
+ # Re-pass ``context`` so ``invoke_defer_method`` can recompute the
+ # ``execution_deadline`` for this re-deferral. ``ti.start_date`` is
+ # preserved across resumes, so the deadline stays anchored to the
+ # original task start.
self.invoke_defer_method(
last_log_time=last_log_time,
+ context=context,
)
# invoke_defer_method raises TaskDeferred, execution does not
continue here
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
index 207e97280a5..0599b0519a6 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -19,6 +19,7 @@ from __future__ import annotations
import asyncio
import contextlib
import datetime
+import time
import traceback
from collections.abc import AsyncIterator
from enum import Enum
@@ -184,8 +185,27 @@ class KubernetesPodTrigger(BaseTrigger):
self.pod_namespace,
self.poll_interval,
)
+ # Fast-path the timeout when ``_execution_deadline`` (set by the
+ # operator from ``execution_timeout``) has already elapsed before the
+ # trigger starts polling.
+ execution_deadline = self.trigger_kwargs.get("_execution_deadline")
+ if execution_deadline is not None and time.time() >=
execution_deadline:
+ self._fired_event = True
+ yield TriggerEvent(
+ {
+ "status": "timeout",
+ "namespace": self.pod_namespace,
+ "name": self.pod_name,
+ "message": (
+ f"Pod {self.pod_namespace}/{self.pod_name} reached the
task's "
+ "execution_timeout deadline before the trigger could
begin polling."
+ ),
+ **self.trigger_kwargs,
+ }
+ )
+ return
try:
- state = await self._wait_for_pod_start()
+ state = await self._wait_for_pod_start_within_deadline()
if state == ContainerState.TERMINATED:
event = TriggerEvent(
{
@@ -272,6 +292,36 @@ class KubernetesPodTrigger(BaseTrigger):
description += f"\ntrigger traceback:\n{curr_traceback}"
return description
+ async def _wait_for_pod_start_within_deadline(self) -> ContainerState:
+ """
+ Run ``_wait_for_pod_start`` bounded by ``_execution_deadline``.
+
+ Wraps the underlying call in :func:`asyncio.wait_for` when an
+ ``_execution_deadline`` is set so the startup phase honours
+ ``execution_timeout`` too — otherwise a Pending pod would not time
+ out until ``startup_timeout`` (default 120s) regardless of how
+ short the user's ``execution_timeout`` was. On timeout we raise
+ :class:`PodLaunchTimeoutException` so the existing handler in
+ :meth:`run` emits the operator's expected ``status="timeout"``
+ event.
+ """
+ execution_deadline = self.trigger_kwargs.get("_execution_deadline")
+ if execution_deadline is None:
+ return await self._wait_for_pod_start()
+ remaining = execution_deadline - time.time()
+ if remaining <= 0:
+ raise PodLaunchTimeoutException(
+ f"Pod {self.pod_namespace}/{self.pod_name} reached the task's "
+ "execution_timeout deadline before the pod left the Pending
phase."
+ )
+ try:
+ return await asyncio.wait_for(self._wait_for_pod_start(),
timeout=remaining)
+ except asyncio.TimeoutError as exc:
+ raise PodLaunchTimeoutException(
+ f"Pod {self.pod_namespace}/{self.pod_name} reached the task's "
+ "execution_timeout deadline while waiting for the pod to
start."
+ ) from exc
+
async def _wait_for_pod_start(self) -> ContainerState:
"""Loops until pod phase leaves ``PENDING`` If timeout is reached,
throws error."""
pod = await self._get_pod()
@@ -305,7 +355,29 @@ class KubernetesPodTrigger(BaseTrigger):
time_get_more_logs = None
if self.logging_interval is not None:
time_get_more_logs = time_begin +
datetime.timedelta(seconds=self.logging_interval)
+ # ``_execution_deadline`` is the operator's translation of the
+ # task-level ``execution_timeout`` into an absolute UTC timestamp
+ execution_deadline = self.trigger_kwargs.get("_execution_deadline")
while True:
+ if execution_deadline is not None and time.time() >=
execution_deadline:
+ self.log.info(
+ "Execution deadline reached for pod %s/%s — emitting
timeout event.",
+ self.pod_namespace,
+ self.pod_name,
+ )
+ return TriggerEvent(
+ {
+ "status": "timeout",
+ "namespace": self.pod_namespace,
+ "name": self.pod_name,
+ "message": (
+ f"Pod {self.pod_namespace}/{self.pod_name} reached
the task's "
+ "execution_timeout deadline."
+ ),
+ "last_log_time": self.last_log_time,
+ **self.trigger_kwargs,
+ }
+ )
pod = await self._get_pod()
pod_container_state = self.define_pod_container_state(pod)
if pod_container_state == ContainerState.TERMINATED:
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index 8f130788107..f2a338c0359 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -39,7 +39,7 @@ from airflow.providers.cncf.kubernetes.operators.pod import (
_optionally_suppress,
)
from airflow.providers.cncf.kubernetes.secret import Secret
-from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
+from airflow.providers.cncf.kubernetes.triggers.pod import ContainerState,
KubernetesPodTrigger
from airflow.providers.cncf.kubernetes.utils.pod_manager import (
OnFinishAction,
PodLoggingStatus,
@@ -50,6 +50,7 @@ from airflow.providers.cncf.kubernetes.utils.xcom_sidecar
import PodDefaults
from airflow.providers.common.compat.sdk import (
XCOM_RETURN_KEY,
AirflowException,
+ AirflowNotFoundException,
AirflowSkipException,
TaskDeferred,
)
@@ -2655,6 +2656,185 @@ class TestKubernetesPodOperatorAsync:
log_message = "Trigger emitted an %s event, failing the task: %s"
mocked_log.error.assert_called_once_with(log_message, status, message)
+ @patch(KUB_OP_PATH.format("convert_config_file_to_dict"))
+
@patch("airflow.providers.cncf.kubernetes.operators.pod.BaseHook.get_connection")
+ def
test_invoke_defer_method_passes_execution_deadline_when_execution_timeout_set(
+ self, mocked_get_connection, mocked_convert_config
+ ):
+ """
+ ``execution_timeout`` is converted into an absolute
``_execution_deadline``
+ anchored on ``ti.start_date`` and propagated to the trigger via
+ ``trigger_kwargs``. ``defer.timeout`` covers the remaining budget plus
+ a poll-interval buffer so the framework backstop doesn't race the
+ trigger.
+ """
+ mocked_get_connection.side_effect =
AirflowNotFoundException("connection not found")
+
+ execution_timeout = datetime.timedelta(seconds=300)
+ k = KubernetesPodOperator(
+ task_id=TEST_TASK_ID,
+ namespace=TEST_NAMESPACE,
+ image=TEST_IMAGE,
+ name=TEST_NAME,
+ on_finish_action="keep_pod",
+ in_cluster=True,
+ deferrable=True,
+ execution_timeout=execution_timeout,
+ )
+ # Skip the pod-creation path and pretend it's already running.
+ k.pod = MagicMock()
+ k.pod.metadata.name = TEST_NAME
+ k.pod.metadata.namespace = TEST_NAMESPACE
+
+ ti_mock = MagicMock()
+ ti_start = datetime.datetime(2026, 1, 1, 12, 0, 0,
tzinfo=datetime.timezone.utc)
+ ti_mock.start_date = ti_start
+ context = {"ti": ti_mock}
+
+ # Freeze at ``ti_start + 30s`` → remaining=270s. ``defer.timeout`` =
+ # remaining + max(60, poll_interval * 2) = 270 + 60 = 330s for default
+ # poll_interval=2.
+ elapsed = datetime.timedelta(seconds=30)
+ with (
+ time_machine.travel(ti_start + elapsed, tick=False),
+ patch(f"{TRIGGER_CLASS}.define_pod_container_state",
return_value=ContainerState.RUNNING),
+ pytest.raises(TaskDeferred) as exc,
+ ):
+ k.invoke_defer_method(context=context)
+
+ trigger = exc.value.trigger
+ assert isinstance(trigger, KubernetesPodTrigger)
+ expected_deadline = int((ti_start + execution_timeout).timestamp())
+ assert trigger.trigger_kwargs["_execution_deadline"] ==
expected_deadline
+ assert exc.value.timeout == datetime.timedelta(seconds=270 + 60)
+
+ @patch(KUB_OP_PATH.format("convert_config_file_to_dict"))
+
@patch("airflow.providers.cncf.kubernetes.operators.pod.BaseHook.get_connection")
+ def test_invoke_defer_method_pads_defer_timeout_for_slow_poll_interval(
+ self, mocked_get_connection, mocked_convert_config
+ ):
+ """
+ With a slow ``poll_interval`` (e.g. 60s), ``defer.timeout`` is padded
+ by ``poll_interval * 2`` so the framework backstop doesn't preempt the
+ trigger before it can emit its own ``status="timeout"`` event from the
+ next poll cycle.
+ """
+ mocked_get_connection.side_effect =
AirflowNotFoundException("connection not found")
+
+ execution_timeout = datetime.timedelta(seconds=30)
+ k = KubernetesPodOperator(
+ task_id=TEST_TASK_ID,
+ namespace=TEST_NAMESPACE,
+ image=TEST_IMAGE,
+ name=TEST_NAME,
+ on_finish_action="keep_pod",
+ in_cluster=True,
+ deferrable=True,
+ execution_timeout=execution_timeout,
+ poll_interval=60,
+ )
+ k.pod = MagicMock()
+ k.pod.metadata.name = TEST_NAME
+ k.pod.metadata.namespace = TEST_NAMESPACE
+
+ ti_mock = MagicMock()
+ ti_start = datetime.datetime(2026, 1, 1, 12, 0, 0,
tzinfo=datetime.timezone.utc)
+ ti_mock.start_date = ti_start
+ context = {"ti": ti_mock}
+
+ # Freeze at ``ti_start`` → remaining=30s. ``defer.timeout`` =
+ # remaining + max(60, poll_interval * 2) = 30 + 120 = 150s.
+ with (
+ time_machine.travel(ti_start, tick=False),
+ patch(f"{TRIGGER_CLASS}.define_pod_container_state",
return_value=ContainerState.RUNNING),
+ pytest.raises(TaskDeferred) as exc,
+ ):
+ k.invoke_defer_method(context=context)
+
+ assert exc.value.timeout == datetime.timedelta(seconds=30 + 120)
+
+ @patch(KUB_OP_PATH.format("convert_config_file_to_dict"))
+
@patch("airflow.providers.cncf.kubernetes.operators.pod.BaseHook.get_connection")
+ def
test_invoke_defer_method_floors_defer_timeout_when_deadline_already_past(
+ self, mocked_get_connection, mocked_convert_config
+ ):
+ """
+ When the deadline is already past (re-defer after deadline elapsed),
+ the negative remaining is floored to zero and ``defer.timeout`` falls
+ back to the pure poll-interval buffer.
+ """
+ mocked_get_connection.side_effect =
AirflowNotFoundException("connection not found")
+
+ execution_timeout = datetime.timedelta(seconds=300)
+ k = KubernetesPodOperator(
+ task_id=TEST_TASK_ID,
+ namespace=TEST_NAMESPACE,
+ image=TEST_IMAGE,
+ name=TEST_NAME,
+ on_finish_action="keep_pod",
+ in_cluster=True,
+ deferrable=True,
+ execution_timeout=execution_timeout,
+ )
+ k.pod = MagicMock()
+ k.pod.metadata.name = TEST_NAME
+ k.pod.metadata.namespace = TEST_NAMESPACE
+
+ ti_mock = MagicMock()
+ ti_start = datetime.datetime(2026, 1, 1, 12, 0, 0,
tzinfo=datetime.timezone.utc)
+ ti_mock.start_date = ti_start
+ context = {"ti": ti_mock}
+
+ # Freeze well past the deadline (ti_start + 600s, deadline at +300s)
+ # so remaining is negative; floor + 60s default buffer = 60s.
+ with (
+ time_machine.travel(ti_start + datetime.timedelta(seconds=600),
tick=False),
+ patch(f"{TRIGGER_CLASS}.define_pod_container_state",
return_value=ContainerState.RUNNING),
+ pytest.raises(TaskDeferred) as exc,
+ ):
+ k.invoke_defer_method(context=context)
+
+ assert exc.value.timeout == datetime.timedelta(seconds=60)
+
+ @patch(KUB_OP_PATH.format("convert_config_file_to_dict"))
+
@patch("airflow.providers.cncf.kubernetes.operators.pod.BaseHook.get_connection")
+ def
test_invoke_defer_method_passes_no_deadline_when_execution_timeout_not_set(
+ self, mocked_get_connection, mocked_convert_config
+ ):
+ """
+ Without ``execution_timeout``, ``_execution_deadline`` is absent from
+ ``trigger_kwargs`` and ``defer.timeout`` is ``None`` — preserving
+ pre-fix behaviour for tasks that don't opt in.
+ """
+ mocked_get_connection.side_effect =
AirflowNotFoundException("connection not found")
+
+ k = KubernetesPodOperator(
+ task_id=TEST_TASK_ID,
+ namespace=TEST_NAMESPACE,
+ image=TEST_IMAGE,
+ name=TEST_NAME,
+ on_finish_action="keep_pod",
+ in_cluster=True,
+ deferrable=True,
+ )
+ k.pod = MagicMock()
+ k.pod.metadata.name = TEST_NAME
+ k.pod.metadata.namespace = TEST_NAMESPACE
+
+ ti_mock = MagicMock()
+ ti_mock.start_date = datetime.datetime(2026, 1, 1, 12, 0, 0,
tzinfo=datetime.timezone.utc)
+ context = {"ti": ti_mock}
+
+ with (
+ patch(f"{TRIGGER_CLASS}.define_pod_container_state",
return_value=ContainerState.RUNNING),
+ pytest.raises(TaskDeferred) as exc,
+ ):
+ k.invoke_defer_method(context=context)
+
+ trigger = exc.value.trigger
+ assert "_execution_deadline" not in trigger.trigger_kwargs
+ assert exc.value.timeout is None
+
@pytest.mark.parametrize(
("kwargs", "actual_exit_code", "expected_exc", "pod_status",
"event_status"),
[
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
index 765a3f35e3d..9889726ab58 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
@@ -21,6 +21,7 @@ import asyncio
import contextlib
import datetime
import logging
+import time
from asyncio import Future
from unittest import mock
from unittest.mock import MagicMock
@@ -263,6 +264,93 @@ class TestKubernetesPodTrigger:
assert actual_event == expected_event
+ @pytest.mark.asyncio
+ @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+ @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
+ @mock.patch(f"{TRIGGER_PATH}.hook")
+ async def
test_run_loop_emits_timeout_event_when_execution_deadline_reached(
+ self, mock_hook, mock_define_state, mock_wait_pod
+ ):
+ """
+ When the execution deadline has already elapsed before the trigger
+ starts polling, the trigger immediately emits a timeout event instead
+ of entering the polling loop.
+ """
+ # Already-past deadline → ``run()`` short-circuit trips it.
+ past_deadline = 1
+ trigger_with_deadline = KubernetesPodTrigger(
+ pod_name=POD_NAME,
+ pod_namespace=NAMESPACE,
+ base_container_name=BASE_CONTAINER_NAME,
+ kubernetes_conn_id=CONN_ID,
+ poll_interval=POLL_INTERVAL,
+ cluster_context=CLUSTER_CONTEXT,
+ config_dict=CONFIG_DICT,
+ in_cluster=IN_CLUSTER,
+ get_logs=GET_LOGS,
+ startup_timeout=STARTUP_TIMEOUT_SECS,
+ startup_check_interval=STARTUP_CHECK_INTERVAL_SECS,
+ schedule_timeout=STARTUP_TIMEOUT_SECS,
+ trigger_start_time=TRIGGER_START_TIME,
+ on_finish_action=ON_FINISH_ACTION,
+ trigger_kwargs={"_execution_deadline": past_deadline},
+ )
+ # If the short-circuit fails, ``_wait_for_pod_start`` would be called
+ # next; making it RUNNING ensures the test fails loudly rather than
+ # accidentally emitting some other terminal event.
+ mock_wait_pod.return_value = ContainerState.RUNNING
+ mock_define_state.return_value = ContainerState.RUNNING
+ mock_hook.get_pod.return_value =
self._mock_pod_result(mock.AsyncMock())
+
+ actual_event = await trigger_with_deadline.run().asend(None)
+
+ # ``last_log_time`` is intentionally absent from this short-circuit and
+ # from the ``PodLaunchTimeoutException`` handler — both fire before any
+ # log fetching has happened. The mid-poll deadline check (in
+ # ``_wait_for_container_completion``) is the only timeout site that
+ # carries ``last_log_time`` because that's where it can be meaningfully
+ # populated.
+ assert actual_event.payload["status"] == "timeout"
+ assert actual_event.payload["namespace"] == NAMESPACE
+ assert actual_event.payload["name"] == POD_NAME
+ assert "execution_timeout" in actual_event.payload["message"]
+
+ @pytest.mark.asyncio
+ async def
test_run_emits_timeout_when_deadline_passed_during_pod_startup(self):
+ """
+ The trigger emits a timeout event if the execution deadline expires
+ during pod startup.
+ """
+ trigger_with_deadline = KubernetesPodTrigger(
+ pod_name=POD_NAME,
+ pod_namespace=NAMESPACE,
+ base_container_name=BASE_CONTAINER_NAME,
+ kubernetes_conn_id=CONN_ID,
+ poll_interval=POLL_INTERVAL,
+ cluster_context=CLUSTER_CONTEXT,
+ config_dict=CONFIG_DICT,
+ in_cluster=IN_CLUSTER,
+ get_logs=GET_LOGS,
+ startup_timeout=STARTUP_TIMEOUT_SECS,
+ startup_check_interval=STARTUP_CHECK_INTERVAL_SECS,
+ schedule_timeout=STARTUP_TIMEOUT_SECS,
+ trigger_start_time=TRIGGER_START_TIME,
+ on_finish_action=ON_FINISH_ACTION,
+ # Deadline crosses 1 second from now; ``_wait_for_pod_start`` is
mocked
+ # to never return so only the deadline can break the wait.
+ trigger_kwargs={"_execution_deadline": int(time.time()) + 1},
+ )
+
+ async def _hang_forever():
+ await asyncio.sleep(60)
+ return ContainerState.RUNNING
+
+ with mock.patch.object(trigger_with_deadline, "_wait_for_pod_start",
_hang_forever):
+ actual_event = await trigger_with_deadline.run().asend(None)
+
+ assert actual_event.payload["status"] == "timeout"
+ assert "execution_timeout" in actual_event.payload["message"]
+
@pytest.mark.asyncio
@mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
@mock.patch(f"{TRIGGER_PATH}.hook")