This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 981ba8f005 Fixes KubernetesPodTrigger failing running pods with timeout (#40019)
981ba8f005 is described below

commit 981ba8f00544597863448f1b951cdd8c5bc6a023
Author: Usiel Riedl <[email protected]>
AuthorDate: Mon Jun 3 19:13:26 2024 +0800

    Fixes KubernetesPodTrigger failing running pods with timeout (#40019)
    
    Fixes: https://github.com/apache/airflow/issues/40018
    
    See linked issue for a detailed explanation. The fix in this commit makes sure we always check the pod state before checking the trigger timeout.
    This may in some cases add an unnecessary call to the K8s API (trigger timed out and pod is in fact still pending), but I think that is preferable to the alternative where we fail a task even though the pod completed successfully (most likely an expensive execution since `deferrable=True`).
---
 airflow/providers/cncf/kubernetes/triggers/pod.py  | 10 +++---
 .../providers/cncf/kubernetes/triggers/test_pod.py | 37 ++++++++++++++++++++++
 2 files changed, 43 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py
index b74e3ef877..5fabce5001 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -219,15 +219,17 @@ class KubernetesPodTrigger(BaseTrigger):
 
     async def _wait_for_pod_start(self) -> ContainerState:
         """Loops until pod phase leaves ``PENDING`` If timeout is reached, throws error."""
-        delta = datetime.datetime.now(tz=datetime.timezone.utc) - self.trigger_start_time
-        while self.startup_timeout >= delta.total_seconds():
+        while True:
             pod = await self.hook.get_pod(self.pod_name, self.pod_namespace)
             if not pod.status.phase == "Pending":
                 return self.define_container_state(pod)
+
+            delta = datetime.datetime.now(tz=datetime.timezone.utc) - self.trigger_start_time
+            if self.startup_timeout < delta.total_seconds():
+                raise PodLaunchTimeoutException("Pod did not leave 'Pending' phase within specified timeout")
+
             self.log.info("Still waiting for pod to start. The pod state is %s", pod.status.phase)
             await asyncio.sleep(self.startup_check_interval)
-            delta = datetime.datetime.now(tz=datetime.timezone.utc) - self.trigger_start_time
-        raise PodLaunchTimeoutException("Pod did not leave 'Pending' phase within specified timeout")
 
     async def _wait_for_container_completion(self) -> TriggerEvent:
         """
diff --git a/tests/providers/cncf/kubernetes/triggers/test_pod.py b/tests/providers/cncf/kubernetes/triggers/test_pod.py
index 6572f03414..610db56c75 100644
--- a/tests/providers/cncf/kubernetes/triggers/test_pod.py
+++ b/tests/providers/cncf/kubernetes/triggers/test_pod.py
@@ -350,3 +350,40 @@ class TestKubernetesPodTrigger:
             )
             == actual
         )
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.hook")
+    async def test_run_loop_return_success_for_completed_pod_after_timeout(
+        self, mock_hook, mock_method, trigger, caplog
+    ):
+        """
+        Test that the trigger correctly recognizes the pod is not pending even after the timeout has been
+        reached. This may happen when a new triggerer process takes over the trigger, the pod already left
+        pending state and the timeout has been reached.
+        """
+        trigger.trigger_start_time = TRIGGER_START_TIME - datetime.timedelta(minutes=2)
+        mock_hook.get_pod.return_value = self._mock_pod_result(
+            mock.MagicMock(
+                status=mock.MagicMock(
+                    phase=PodPhase.SUCCEEDED,
+                )
+            )
+        )
+        mock_method.return_value = ContainerState.TERMINATED
+
+        caplog.set_level(logging.INFO)
+
+        generator = trigger.run()
+        actual = await generator.asend(None)
+        assert (
+            TriggerEvent(
+                {
+                    "name": POD_NAME,
+                    "namespace": NAMESPACE,
+                    "message": "All containers inside pod have started successfully.",
+                    "status": "success",
+                }
+            )
+            == actual
+        )

Reply via email to