SameerMesiah97 commented on code in PR #67229:
URL: https://github.com/apache/airflow/pull/67229#discussion_r3295172172


##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py:
##########
@@ -2655,6 +2656,148 @@ def test_async_create_pod_should_throw_exception(self, 
mocked_hook, mocked_clean
         log_message = "Trigger emitted an %s event, failing the task: %s"
         mocked_log.error.assert_called_once_with(log_message, status, message)
 
+    @patch(KUB_OP_PATH.format("convert_config_file_to_dict"))
+    
@patch("airflow.providers.cncf.kubernetes.operators.pod.BaseHook.get_connection")
+    def 
test_invoke_defer_method_passes_execution_deadline_when_execution_timeout_set(
+        self, mocked_get_connection, mocked_convert_config
+    ):
+        """
+        ``execution_timeout`` is translated into an absolute 
``execution_deadline``
+        plumbed through to the trigger, and ``defer.timeout`` is set to the
+        remaining budget so the framework's ``trigger_timeout`` bounds the
+        trigger's lifetime as a backstop. Anchoring on ``ti.start_date`` keeps
+        the deadline stable across re-deferrals.

Review Comment:
   The current docstring goes into too much detail regarding the 
implementation. I would do something like the below instead:
   
   ```
   """
   ``execution_timeout`` is converted into an absolute
   ``execution_deadline`` anchored on ``ti.start_date`` and propagated
   to the trigger. The defer timeout uses the remaining execution budget.
   """
   ```



##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/eks.py:
##########
@@ -118,49 +115,11 @@ def __init__(
         eks_cluster_name: str,
         aws_conn_id: str | None = None,
         region: str | None = None,
-        pod_name: str,
-        pod_namespace: str,
-        trigger_start_time: datetime.datetime,
-        base_container_name: str,
-        kubernetes_conn_id: str | None = None,
-        connection_extras: dict | None = None,
-        poll_interval: float = 2,
-        cluster_context: str | None = None,
-        config_dict: dict | None = None,
-        in_cluster: bool | None = None,
-        get_logs: bool = True,
-        startup_timeout: int = 120,
-        startup_check_interval: float = 5,
-        schedule_timeout: int = 120,
-        on_finish_action: str = "delete_pod",
-        on_kill_action: str = "delete_pod",
-        termination_grace_period: int | None = None,
-        last_log_time: DateTime | None = None,
-        logging_interval: int | None = None,
-        trigger_kwargs: dict | None = None,
+        **kwargs,
     ):
-        super().__init__(
-            pod_name=pod_name,
-            pod_namespace=pod_namespace,
-            trigger_start_time=trigger_start_time,
-            base_container_name=base_container_name,
-            kubernetes_conn_id=kubernetes_conn_id,
-            connection_extras=connection_extras,
-            poll_interval=poll_interval,
-            cluster_context=cluster_context,
-            config_dict=config_dict,
-            in_cluster=in_cluster,
-            get_logs=get_logs,
-            startup_timeout=startup_timeout,
-            startup_check_interval=startup_check_interval,
-            schedule_timeout=schedule_timeout,
-            on_finish_action=on_finish_action,
-            on_kill_action=on_kill_action,
-            termination_grace_period=termination_grace_period,
-            last_log_time=last_log_time,
-            logging_interval=logging_interval,
-            trigger_kwargs=trigger_kwargs,
-        )

Review Comment:
   I understand you removed this to handle version discrepancies between the 
AWS and Kubernetes providers, but I think it is better to just leave the older 
parameter declaration as is and add the new `execution_deadline` parameter 
(basically what you had initially). Of course, this means that users will have 
need the version of the kubernetes provider with this new `execution_deadline` 
parameter but this looks like the best option at the moment. 



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py:
##########
@@ -263,6 +290,139 @@ async def test_run_loop_return_failed_event(self, 
mock_hook, mock_method, mock_w
 
         assert actual_event == expected_event
 
+    @pytest.mark.asyncio
+    @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+    @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.hook")
+    async def 
test_run_loop_emits_timeout_event_when_execution_deadline_reached(
+        self, mock_hook, mock_define_state, mock_wait_pod
+    ):
+        """
+        When ``execution_deadline`` (the operator's translation of
+        ``execution_timeout``) has already passed before the trigger picks
+        the task up, the trigger short-circuits at the top of ``run()`` and
+        emits a ``timeout`` event immediately instead of starting the wait
+        chain. The operator's existing terminal-event path then fails the
+        task and runs ``on_finish_action`` (pod delete).
+        """
+        # Already-past deadline → ``run()`` short-circuit trips it.
+        past_deadline = 1
+        trigger_with_deadline = KubernetesPodTrigger(
+            pod_name=POD_NAME,
+            pod_namespace=NAMESPACE,
+            base_container_name=BASE_CONTAINER_NAME,
+            kubernetes_conn_id=CONN_ID,
+            poll_interval=POLL_INTERVAL,
+            cluster_context=CLUSTER_CONTEXT,
+            config_dict=CONFIG_DICT,
+            in_cluster=IN_CLUSTER,
+            get_logs=GET_LOGS,
+            startup_timeout=STARTUP_TIMEOUT_SECS,
+            startup_check_interval=STARTUP_CHECK_INTERVAL_SECS,
+            schedule_timeout=STARTUP_TIMEOUT_SECS,
+            trigger_start_time=TRIGGER_START_TIME,
+            on_finish_action=ON_FINISH_ACTION,
+            execution_deadline=past_deadline,
+        )
+        # If the short-circuit fails, ``_wait_for_pod_start`` would be called
+        # next; making it RUNNING ensures the test fails loudly rather than
+        # accidentally emitting some other terminal event.
+        mock_wait_pod.return_value = ContainerState.RUNNING
+        mock_define_state.return_value = ContainerState.RUNNING
+        mock_hook.get_pod.return_value = 
self._mock_pod_result(mock.AsyncMock())
+
+        actual_event = await trigger_with_deadline.run().asend(None)
+
+        # ``last_log_time`` is intentionally absent from this short-circuit and
+        # from the ``PodLaunchTimeoutException`` handler — both fire before any
+        # log fetching has happened. The mid-poll deadline check (in
+        # ``_wait_for_container_completion``) is the only timeout site that
+        # carries ``last_log_time`` because that's where it can be meaningfully
+        # populated.
+        assert actual_event.payload["status"] == "timeout"
+        assert actual_event.payload["namespace"] == NAMESPACE
+        assert actual_event.payload["name"] == POD_NAME
+        assert "execution_timeout" in actual_event.payload["message"]
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+    @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.hook")
+    async def 
test_run_loop_does_not_emit_timeout_when_execution_deadline_not_reached(

Review Comment:
   I am not sure what purpose this test serves? Is the premise tautological 
i.e. timeout should not be emitted if the deadline is not reached? 



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py:
##########
@@ -133,8 +134,34 @@ def test_serialize(self, trigger):
             "last_log_time": None,
             "logging_interval": None,
             "trigger_kwargs": {},
+            "execution_deadline": None,
         }
 
+    def test_serialize_with_execution_deadline(self):
+        """``execution_deadline`` round-trips through serialization."""
+        deadline = 1_700_000_000.0
+        trigger = KubernetesPodTrigger(
+            pod_name=POD_NAME,
+            pod_namespace=NAMESPACE,
+            base_container_name=BASE_CONTAINER_NAME,
+            kubernetes_conn_id=CONN_ID,
+            poll_interval=POLL_INTERVAL,
+            cluster_context=CLUSTER_CONTEXT,
+            config_dict=CONFIG_DICT,
+            in_cluster=IN_CLUSTER,
+            get_logs=GET_LOGS,
+            startup_timeout=STARTUP_TIMEOUT_SECS,
+            startup_check_interval=STARTUP_CHECK_INTERVAL_SECS,
+            schedule_timeout=STARTUP_TIMEOUT_SECS,
+            trigger_start_time=TRIGGER_START_TIME,
+            on_finish_action=ON_FINISH_ACTION,
+            execution_deadline=deadline,
+        )
+
+        _, kwargs_dict = trigger.serialize()
+
+        assert kwargs_dict["execution_deadline"] == deadline

Review Comment:
   I don't think this extra test is needed as you have already added the new 
parameter to the existing test above.



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py:
##########
@@ -263,6 +290,139 @@ async def test_run_loop_return_failed_event(self, 
mock_hook, mock_method, mock_w
 
         assert actual_event == expected_event
 
+    @pytest.mark.asyncio
+    @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+    @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.hook")
+    async def 
test_run_loop_emits_timeout_event_when_execution_deadline_reached(
+        self, mock_hook, mock_define_state, mock_wait_pod
+    ):
+        """
+        When ``execution_deadline`` (the operator's translation of
+        ``execution_timeout``) has already passed before the trigger picks
+        the task up, the trigger short-circuits at the top of ``run()`` and
+        emits a ``timeout`` event immediately instead of starting the wait
+        chain. The operator's existing terminal-event path then fails the
+        task and runs ``on_finish_action`` (pod delete).
+        """

Review Comment:
   This docstring could be more concise. Please see the below:
   
   ```
   """
   When the execution deadline has already elapsed before the
   trigger starts polling, the trigger immediately emits a
   timeout event instead of entering the polling loop.
   """
   ```



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py:
##########
@@ -949,7 +973,7 @@ def invoke_defer_method(
                 },
             )
         else:
-            self.defer(trigger=trigger, method_name="trigger_reentry")
+            self.defer(trigger=trigger, method_name="trigger_reentry", 
timeout=defer_timeout)

Review Comment:
   I am just curious if you have tested your implementation against a live 
cluster. What happens when you pass an `execution_timeout` of 30s and:
   
   1) Keep default `poll_interval` i.e. 2s.
   2) Set `poll_interval` to 60s.
   
   Is the pod cleaned up? Because passing a `non-None` argument for timeout 
parameter resulted in the the Trigger timing out before cleanup could happen. I 
understand you are implementing a 60s floor for the `defer_timeout` but I am 
not sure if this will completely mitigate the issue.



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py:
##########
@@ -2655,6 +2656,148 @@ def test_async_create_pod_should_throw_exception(self, 
mocked_hook, mocked_clean
         log_message = "Trigger emitted an %s event, failing the task: %s"
         mocked_log.error.assert_called_once_with(log_message, status, message)
 
+    @patch(KUB_OP_PATH.format("convert_config_file_to_dict"))
+    
@patch("airflow.providers.cncf.kubernetes.operators.pod.BaseHook.get_connection")
+    def 
test_invoke_defer_method_passes_execution_deadline_when_execution_timeout_set(
+        self, mocked_get_connection, mocked_convert_config
+    ):
+        """
+        ``execution_timeout`` is translated into an absolute 
``execution_deadline``
+        plumbed through to the trigger, and ``defer.timeout`` is set to the
+        remaining budget so the framework's ``trigger_timeout`` bounds the
+        trigger's lifetime as a backstop. Anchoring on ``ti.start_date`` keeps
+        the deadline stable across re-deferrals.
+        """
+        mocked_get_connection.side_effect = 
AirflowNotFoundException("connection not found")
+
+        execution_timeout = datetime.timedelta(seconds=300)
+        k = KubernetesPodOperator(
+            task_id=TEST_TASK_ID,
+            namespace=TEST_NAMESPACE,
+            image=TEST_IMAGE,
+            name=TEST_NAME,
+            on_finish_action="keep_pod",
+            in_cluster=True,
+            deferrable=True,
+            execution_timeout=execution_timeout,
+        )
+        # Skip the pod-creation path and pretend it's already running.
+        k.pod = MagicMock()
+        k.pod.metadata.name = TEST_NAME
+        k.pod.metadata.namespace = TEST_NAMESPACE
+
+        ti_mock = MagicMock()
+        ti_start = datetime.datetime(2026, 1, 1, 12, 0, 0, 
tzinfo=datetime.timezone.utc)
+        ti_mock.start_date = ti_start
+        context = {"ti": ti_mock}
+
+        # Freeze time at ``ti_start + 30s``. With ``execution_timeout=300s`` 
the
+        # remaining budget is exactly ``270s``, so ``defer.timeout`` must equal
+        # that — not the 60s minimum-buffer clamp.
+        elapsed = datetime.timedelta(seconds=30)
+        with time_machine.travel(ti_start + elapsed, tick=False):
+            with patch(
+                f"{TRIGGER_CLASS}.define_pod_container_state",
+                return_value=ContainerState.RUNNING,
+            ):
+                with pytest.raises(TaskDeferred) as exc:
+                    k.invoke_defer_method(context=context)

Review Comment:
   Is it possible if you could flatten these nested `with` blocks? Same applies 
to the tests below.



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py:
##########
@@ -263,6 +290,139 @@ async def test_run_loop_return_failed_event(self, 
mock_hook, mock_method, mock_w
 
         assert actual_event == expected_event
 
+    @pytest.mark.asyncio
+    @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+    @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.hook")
+    async def 
test_run_loop_emits_timeout_event_when_execution_deadline_reached(
+        self, mock_hook, mock_define_state, mock_wait_pod
+    ):
+        """
+        When ``execution_deadline`` (the operator's translation of
+        ``execution_timeout``) has already passed before the trigger picks
+        the task up, the trigger short-circuits at the top of ``run()`` and
+        emits a ``timeout`` event immediately instead of starting the wait
+        chain. The operator's existing terminal-event path then fails the
+        task and runs ``on_finish_action`` (pod delete).
+        """
+        # Already-past deadline → ``run()`` short-circuit trips it.
+        past_deadline = 1
+        trigger_with_deadline = KubernetesPodTrigger(
+            pod_name=POD_NAME,
+            pod_namespace=NAMESPACE,
+            base_container_name=BASE_CONTAINER_NAME,
+            kubernetes_conn_id=CONN_ID,
+            poll_interval=POLL_INTERVAL,
+            cluster_context=CLUSTER_CONTEXT,
+            config_dict=CONFIG_DICT,
+            in_cluster=IN_CLUSTER,
+            get_logs=GET_LOGS,
+            startup_timeout=STARTUP_TIMEOUT_SECS,
+            startup_check_interval=STARTUP_CHECK_INTERVAL_SECS,
+            schedule_timeout=STARTUP_TIMEOUT_SECS,
+            trigger_start_time=TRIGGER_START_TIME,
+            on_finish_action=ON_FINISH_ACTION,
+            execution_deadline=past_deadline,
+        )
+        # If the short-circuit fails, ``_wait_for_pod_start`` would be called
+        # next; making it RUNNING ensures the test fails loudly rather than
+        # accidentally emitting some other terminal event.
+        mock_wait_pod.return_value = ContainerState.RUNNING
+        mock_define_state.return_value = ContainerState.RUNNING
+        mock_hook.get_pod.return_value = 
self._mock_pod_result(mock.AsyncMock())
+
+        actual_event = await trigger_with_deadline.run().asend(None)
+
+        # ``last_log_time`` is intentionally absent from this short-circuit and
+        # from the ``PodLaunchTimeoutException`` handler — both fire before any
+        # log fetching has happened. The mid-poll deadline check (in
+        # ``_wait_for_container_completion``) is the only timeout site that
+        # carries ``last_log_time`` because that's where it can be meaningfully
+        # populated.
+        assert actual_event.payload["status"] == "timeout"
+        assert actual_event.payload["namespace"] == NAMESPACE
+        assert actual_event.payload["name"] == POD_NAME
+        assert "execution_timeout" in actual_event.payload["message"]
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+    @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.hook")
+    async def 
test_run_loop_does_not_emit_timeout_when_execution_deadline_not_reached(
+        self, mock_hook, mock_define_state, mock_wait_pod
+    ):
+        """
+        When ``execution_deadline`` is still in the future, the trigger keeps
+        polling normally — proves the deadline check doesn't fire preemptively
+        on every run.
+        """
+        # Far-future deadline (~year 2286) — guaranteed not reached.
+        future_deadline = 9_999_999_999
+        trigger_with_deadline = KubernetesPodTrigger(
+            pod_name=POD_NAME,
+            pod_namespace=NAMESPACE,
+            base_container_name=BASE_CONTAINER_NAME,
+            kubernetes_conn_id=CONN_ID,
+            poll_interval=POLL_INTERVAL,
+            cluster_context=CLUSTER_CONTEXT,
+            config_dict=CONFIG_DICT,
+            in_cluster=IN_CLUSTER,
+            get_logs=GET_LOGS,
+            startup_timeout=STARTUP_TIMEOUT_SECS,
+            startup_check_interval=STARTUP_CHECK_INTERVAL_SECS,
+            schedule_timeout=STARTUP_TIMEOUT_SECS,
+            trigger_start_time=TRIGGER_START_TIME,
+            on_finish_action=ON_FINISH_ACTION,
+            execution_deadline=future_deadline,
+        )
+        mock_wait_pod.return_value = ContainerState.RUNNING
+        mock_define_state.return_value = ContainerState.RUNNING
+        mock_hook.get_pod.return_value = 
self._mock_pod_result(mock.AsyncMock())
+
+        # Trigger must keep waiting (not emit any event yet). Use 
``asyncio.wait_for``
+        # rather than a real ``asyncio.sleep`` to avoid wall-clock cost / 
flake risk.
+        with pytest.raises(asyncio.TimeoutError):
+            await asyncio.wait_for(trigger_with_deadline.run().__anext__(), 
timeout=0.05)
+
+    @pytest.mark.asyncio
+    async def 
test_run_emits_timeout_when_deadline_passed_during_pod_startup(self):
+        """
+        ``execution_deadline`` enforcement must cover the pod startup phase 
too,
+        not just ``_wait_for_container_completion``. If the deadline elapses
+        while the pod is still Pending, the trigger must emit a ``timeout``
+        event rather than waiting out the (potentially much longer)
+        ``startup_timeout``.
+        """

Review Comment:
   This can be more concise as well. Please see the below:
   
   ```
   """
   The trigger emits a timeout event if the execution deadline
   expires during pod startup.
   """
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to