This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bd512007e5 Fix KubernetesPodTrigger startup timeout (#34579)
bd512007e5 is described below

commit bd512007e531bb58e86f8c1b8f84ac20e8e92d7c
Author: Dmitry Zhyhimont <[email protected]>
AuthorDate: Thu Sep 28 20:19:14 2023 +0300

    Fix KubernetesPodTrigger startup timeout (#34579)
---
 airflow/providers/cncf/kubernetes/operators/pod.py |  5 +++-
 airflow/providers/cncf/kubernetes/triggers/pod.py  |  2 +-
 .../cncf/kubernetes/operators/test_pod.py          | 24 +++++++++++++++
 .../providers/cncf/kubernetes/triggers/test_pod.py | 35 ++++++++++++++++++++++
 4 files changed, 64 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index 4b9e432455..efb51366fb 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -679,7 +679,10 @@ class KubernetesPodOperator(BaseOperator):
                     return xcom_sidecar_output
         finally:
             istio_enabled = self.is_istio_enabled(pod)
-            pod = self.pod_manager.await_pod_completion(pod, istio_enabled, 
self.base_container_name)
+            # Skip await_pod_completion when the event is 'timeout' due to the 
pod can hang
+            # on the ErrImagePull or ContainerCreating step and it will never 
complete
+            if event["status"] != "timeout":
+                pod = self.pod_manager.await_pod_completion(pod, 
istio_enabled, self.base_container_name)
             if pod is not None:
                 self.post_complete_action(
                     pod=pod,
diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py
index b7e7d7b818..a4e7528ee6 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -166,7 +166,7 @@ class KubernetesPodTrigger(BaseTrigger):
                 elif self.should_wait(pod_phase=pod_status, 
container_state=container_state):
                     self.log.info("Container is not completed and still 
working.")
 
-                    if pod_status == PodPhase.PENDING and container_state == 
ContainerState.UNDEFINED:
+                    if pod_status == PodPhase.PENDING and container_state != 
ContainerState.RUNNING:
                         delta = 
datetime.datetime.now(tz=datetime.timezone.utc) - self.trigger_start_time
                         if delta.total_seconds() >= self.startup_timeout:
                             message = (
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py
index ec107512a5..2af333dcff 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -1841,3 +1841,27 @@ def test_default_container_logs():
 
     k = TestSubclassKPO(task_id="task")
     assert k.container_logs == "test-base-container"
+
+
+@patch(KUB_OP_PATH.format("post_complete_action"))
+@patch(HOOK_CLASS)
+@patch(KUB_OP_PATH.format("pod_manager"))
+def test_async_skip_kpo_wait_termination_with_timeout_event(mock_manager, 
mocked_hook, post_complete_action):
+    metadata = {"metadata.name": TEST_NAME, "metadata.namespace": 
TEST_NAMESPACE}
+    pending_state = mock.MagicMock(**metadata, **{"status.phase": "Pending"})  # pod never left Pending
+    mocked_hook.return_value.get_pod.return_value = pending_state
+    ti_mock = MagicMock()
+
+    event = {"status": "timeout", "message": "timeout", "name": TEST_NAME, 
"namespace": TEST_NAMESPACE}
+
+    k = KubernetesPodOperator(task_id="task", deferrable=True)
+
+    # assert that the AirflowException is raised when the timeout event is 
present
+    with pytest.raises(AirflowException):
+        k.execute_complete({"ti": ti_mock}, event)
+
+    # a 'timeout' event must bypass await_pod_completion: a pod stuck in Pending may never complete
+    mock_manager.await_pod_completion.assert_not_called()
+
+    # post_complete_action (cleanup) must still run exactly once even though waiting was skipped
+    post_complete_action.assert_called_once()
diff --git a/tests/providers/cncf/kubernetes/triggers/test_pod.py b/tests/providers/cncf/kubernetes/triggers/test_pod.py
index 5719dcefca..88b26aa585 100644
--- a/tests/providers/cncf/kubernetes/triggers/test_pod.py
+++ b/tests/providers/cncf/kubernetes/triggers/test_pod.py
@@ -27,6 +27,7 @@ import pytest
 from kubernetes.client import models as k8s
 
 from airflow.providers.cncf.kubernetes.triggers.pod import ContainerState, 
KubernetesPodTrigger
+from airflow.providers.cncf.kubernetes.utils.pod_manager import PodPhase
 from airflow.triggers.base import TriggerEvent
 
 TRIGGER_PATH = 
"airflow.providers.cncf.kubernetes.triggers.pod.KubernetesPodTrigger"
@@ -334,3 +335,37 @@ class TestKubernetesPodTrigger:
         )
 
         assert expected_state == trigger.define_container_state(pod)
+
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize("container_state", [ContainerState.WAITING, 
ContainerState.UNDEFINED])
+    @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+    @mock.patch(f"{TRIGGER_PATH}._get_async_hook")
+    async def test_run_loop_return_timeout_event(
+        self, mock_hook, mock_method, trigger, caplog, container_state  # mocks injected bottom-up: _get_async_hook -> mock_hook, define_container_state -> mock_method
+    ):
+        trigger.trigger_start_time = TRIGGER_START_TIME - 
datetime.timedelta(minutes=2)  # NOTE(review): assumes STARTUP_TIMEOUT_SECS < 120 so the startup deadline has already passed — confirm
+        mock_hook.return_value.get_pod.return_value = self._mock_pod_result(
+            mock.MagicMock(
+                status=mock.MagicMock(
+                    phase=PodPhase.PENDING,
+                )
+            )
+        )
+        mock_method.return_value = container_state  # WAITING is the newly covered case (old check was == UNDEFINED); UNDEFINED guards against regression
+
+        caplog.set_level(logging.INFO)  # NOTE(review): caplog is never asserted on below; this setup looks removable
+
+        generator = trigger.run()
+        actual = await generator.asend(None)  # first event yielded by the trigger's run() generator
+        assert (
+            TriggerEvent(
+                {
+                    "name": POD_NAME,
+                    "namespace": NAMESPACE,
+                    "status": "timeout",
+                    "message": f"Pod took longer than {STARTUP_TIMEOUT_SECS} 
seconds to start."
+                    " Check the pod events in kubernetes to determine why.",
+                }
+            )
+            == actual
+        )

Reply via email to