This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f5de83ee6 Fix `KubernetesPodTrigger` waiting strategy (#31348)
8f5de83ee6 is described below

commit 8f5de83ee68c28100efc085add40ae4702bc3de1
Author: Hussein Awala <[email protected]>
AuthorDate: Thu Jun 29 16:55:41 2023 +0200

    Fix `KubernetesPodTrigger` waiting strategy (#31348)
---
 airflow/providers/cncf/kubernetes/triggers/pod.py  | 26 ++++++++++++------
 .../providers/cncf/kubernetes/triggers/test_pod.py | 32 +++++++++++++++++++++-
 .../cloud/triggers/test_kubernetes_engine.py       | 12 ++++++--
 3 files changed, 58 insertions(+), 12 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py 
b/airflow/providers/cncf/kubernetes/triggers/pod.py
index e2b5edd8e4..6cfb6c523a 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -134,15 +134,23 @@ class KubernetesPodTrigger(BaseTrigger):
                 self.log.debug("Container %s status: %s", 
self.base_container_name, container_state)
 
                 if container_state == ContainerState.TERMINATED:
-                    yield TriggerEvent(
-                        {
-                            "name": self.pod_name,
-                            "namespace": self.pod_namespace,
-                            "status": "success",
-                            "message": "All containers inside pod have started 
successfully.",
-                        }
-                    )
-                    return
+                    if pod_status not in PodPhase.terminal_states:
+                        self.log.info(
+                            "Pod %s is still running. Sleeping for %s 
seconds.",
+                            self.pod_name,
+                            self.poll_interval,
+                        )
+                        await asyncio.sleep(self.poll_interval)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "name": self.pod_name,
+                                "namespace": self.pod_namespace,
+                                "status": "success",
+                                "message": "All containers inside pod have 
started successfully.",
+                            }
+                        )
+                        return
                 elif self.should_wait(pod_phase=pod_status, 
container_state=container_state):
                     self.log.info("Container is not completed and still 
working.")
 
diff --git a/tests/providers/cncf/kubernetes/triggers/test_pod.py 
b/tests/providers/cncf/kubernetes/triggers/test_pod.py
index 28159ca5ff..6d5d18d028 100644
--- a/tests/providers/cncf/kubernetes/triggers/test_pod.py
+++ b/tests/providers/cncf/kubernetes/triggers/test_pod.py
@@ -95,7 +95,8 @@ class TestKubernetesPodTrigger:
     @mock.patch(f"{TRIGGER_PATH}.define_container_state")
     @mock.patch(f"{TRIGGER_PATH}._get_async_hook")
     async def test_run_loop_return_success_event(self, mock_hook, mock_method, 
trigger):
-        mock_hook.return_value.get_pod.return_value = 
self._mock_pod_result(mock.MagicMock())
+        pod_mock = mock.MagicMock(**{"status.phase": "Succeeded"})
+        mock_hook.return_value.get_pod.return_value = 
self._mock_pod_result(pod_mock)
         mock_method.return_value = ContainerState.TERMINATED
 
         expected_event = TriggerEvent(
@@ -110,6 +111,35 @@ class TestKubernetesPodTrigger:
 
         assert actual_event == expected_event
 
+    @pytest.mark.asyncio
+    @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+    @mock.patch(f"{TRIGGER_PATH}._get_async_hook")
+    async def 
test_run_loop_wait_pod_termination_before_returning_success_event(
+        self, mock_hook, mock_method, trigger
+    ):
+        running_state = mock.MagicMock(**{"status.phase": "Running"})
+        succeeded_state = mock.MagicMock(**{"status.phase": "Succeeded"})
+        mock_hook.return_value.get_pod.side_effect = [
+            self._mock_pod_result(running_state),
+            self._mock_pod_result(running_state),
+            self._mock_pod_result(succeeded_state),
+        ]
+        mock_method.return_value = ContainerState.TERMINATED
+
+        expected_event = TriggerEvent(
+            {
+                "name": POD_NAME,
+                "namespace": NAMESPACE,
+                "status": "success",
+                "message": "All containers inside pod have started 
successfully.",
+            }
+        )
+        with mock.patch.object(asyncio, "sleep") as mock_sleep:
+            actual_event = await (trigger.run()).asend(None)
+
+        assert actual_event == expected_event
+        assert mock_sleep.call_count == 2
+
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_PATH}.define_container_state")
     @mock.patch(f"{TRIGGER_PATH}._get_async_hook")
diff --git a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py 
b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
index 3be05c3b1b..e957767e3e 100644
--- a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
@@ -108,7 +108,13 @@ class TestGKEStartPodTrigger:
     async def test_run_loop_return_success_event_should_execute_successfully(
         self, mock_hook, mock_method, trigger
     ):
-        mock_hook.return_value.get_pod.return_value = 
self._mock_pod_result(mock.MagicMock())
+        running_state = mock.MagicMock(**{"status.phase": "Running"})
+        succeeded_state = mock.MagicMock(**{"status.phase": "Succeeded"})
+        mock_hook.return_value.get_pod.side_effect = [
+            self._mock_pod_result(running_state),
+            self._mock_pod_result(running_state),
+            self._mock_pod_result(succeeded_state),
+        ]
         mock_method.return_value = ContainerState.TERMINATED
 
         expected_event = TriggerEvent(
@@ -119,9 +125,11 @@ class TestGKEStartPodTrigger:
                 "message": "All containers inside pod have started 
successfully.",
             }
         )
-        actual_event = await (trigger.run()).asend(None)
+        with mock.patch.object(asyncio, "sleep") as mock_sleep:
+            actual_event = await (trigger.run()).asend(None)
 
         assert actual_event == expected_event
+        assert mock_sleep.call_count == 2
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state")

Reply via email to