This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bd512007e5 Fix KubernetesPodTrigger startup timeout (#34579)
bd512007e5 is described below
commit bd512007e531bb58e86f8c1b8f84ac20e8e92d7c
Author: Dmitry Zhyhimont <[email protected]>
AuthorDate: Thu Sep 28 20:19:14 2023 +0300
Fix KubernetesPodTrigger startup timeout (#34579)
---
airflow/providers/cncf/kubernetes/operators/pod.py | 5 +++-
airflow/providers/cncf/kubernetes/triggers/pod.py | 2 +-
.../cncf/kubernetes/operators/test_pod.py | 24 +++++++++++++++
.../providers/cncf/kubernetes/triggers/test_pod.py | 35 ++++++++++++++++++++++
4 files changed, 64 insertions(+), 2 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py
b/airflow/providers/cncf/kubernetes/operators/pod.py
index 4b9e432455..efb51366fb 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -679,7 +679,10 @@ class KubernetesPodOperator(BaseOperator):
return xcom_sidecar_output
finally:
istio_enabled = self.is_istio_enabled(pod)
- pod = self.pod_manager.await_pod_completion(pod, istio_enabled,
self.base_container_name)
+ # Skip await_pod_completion when the event is 'timeout', because the pod can hang
+ # on the ErrImagePull or ContainerCreating step and will never complete
+ if event["status"] != "timeout":
+ pod = self.pod_manager.await_pod_completion(pod,
istio_enabled, self.base_container_name)
if pod is not None:
self.post_complete_action(
pod=pod,
diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py
b/airflow/providers/cncf/kubernetes/triggers/pod.py
index b7e7d7b818..a4e7528ee6 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -166,7 +166,7 @@ class KubernetesPodTrigger(BaseTrigger):
elif self.should_wait(pod_phase=pod_status,
container_state=container_state):
self.log.info("Container is not completed and still
working.")
- if pod_status == PodPhase.PENDING and container_state ==
ContainerState.UNDEFINED:
+ if pod_status == PodPhase.PENDING and container_state !=
ContainerState.RUNNING:
delta =
datetime.datetime.now(tz=datetime.timezone.utc) - self.trigger_start_time
if delta.total_seconds() >= self.startup_timeout:
message = (
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py
b/tests/providers/cncf/kubernetes/operators/test_pod.py
index ec107512a5..2af333dcff 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -1841,3 +1841,27 @@ def test_default_container_logs():
k = TestSubclassKPO(task_id="task")
assert k.container_logs == "test-base-container"
+
+
+@patch(KUB_OP_PATH.format("post_complete_action"))
+@patch(HOOK_CLASS)
+@patch(KUB_OP_PATH.format("pod_manager"))
+def test_async_skip_kpo_wait_termination_with_timeout_event(mock_manager,
mocked_hook, post_complete_action):
+ metadata = {"metadata.name": TEST_NAME, "metadata.namespace":
TEST_NAMESPACE}
+ pending_state = mock.MagicMock(**metadata, **{"status.phase": "Pending"})
+ mocked_hook.return_value.get_pod.return_value = pending_state
+ ti_mock = MagicMock()
+
+ event = {"status": "timeout", "message": "timeout", "name": TEST_NAME,
"namespace": TEST_NAMESPACE}
+
+ k = KubernetesPodOperator(task_id="task", deferrable=True)
+
+ # assert that the AirflowException is raised when the timeout event is
present
+ with pytest.raises(AirflowException):
+ k.execute_complete({"ti": ti_mock}, event)
+
+ # assert that the await_pod_completion is not called
+ mock_manager.await_pod_completion.assert_not_called()
+
+ # assert that the cleanup is called
+ post_complete_action.assert_called_once()
diff --git a/tests/providers/cncf/kubernetes/triggers/test_pod.py
b/tests/providers/cncf/kubernetes/triggers/test_pod.py
index 5719dcefca..88b26aa585 100644
--- a/tests/providers/cncf/kubernetes/triggers/test_pod.py
+++ b/tests/providers/cncf/kubernetes/triggers/test_pod.py
@@ -27,6 +27,7 @@ import pytest
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.triggers.pod import ContainerState,
KubernetesPodTrigger
+from airflow.providers.cncf.kubernetes.utils.pod_manager import PodPhase
from airflow.triggers.base import TriggerEvent
TRIGGER_PATH =
"airflow.providers.cncf.kubernetes.triggers.pod.KubernetesPodTrigger"
@@ -334,3 +335,37 @@ class TestKubernetesPodTrigger:
)
assert expected_state == trigger.define_container_state(pod)
+
+ @pytest.mark.asyncio
+ @pytest.mark.parametrize("container_state", [ContainerState.WAITING,
ContainerState.UNDEFINED])
+ @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+ @mock.patch(f"{TRIGGER_PATH}._get_async_hook")
+ async def test_run_loop_return_timeout_event(
+ self, mock_hook, mock_method, trigger, caplog, container_state
+ ):
+ trigger.trigger_start_time = TRIGGER_START_TIME -
datetime.timedelta(minutes=2)
+ mock_hook.return_value.get_pod.return_value = self._mock_pod_result(
+ mock.MagicMock(
+ status=mock.MagicMock(
+ phase=PodPhase.PENDING,
+ )
+ )
+ )
+ mock_method.return_value = container_state
+
+ caplog.set_level(logging.INFO)
+
+ generator = trigger.run()
+ actual = await generator.asend(None)
+ assert (
+ TriggerEvent(
+ {
+ "name": POD_NAME,
+ "namespace": NAMESPACE,
+ "status": "timeout",
+ "message": f"Pod took longer than {STARTUP_TIMEOUT_SECS}
seconds to start."
+ " Check the pod events in kubernetes to determine why.",
+ }
+ )
+ == actual
+ )