This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bccbcbf7a15 Fix KubernetesPodTrigger pod terminal state handling (#66650)
bccbcbf7a15 is described below
commit bccbcbf7a15c1976242bf247187172deceb6c435
Author: Anmol Mishra <[email protected]>
AuthorDate: Sun May 10 23:33:17 2026 +0530
Fix KubernetesPodTrigger pod terminal state handling (#66650)
* Fix KubernetesPodTrigger pod terminal state handling
* Address pod trigger review feedback
* Restore pod trigger log wording
---------
Co-authored-by: Anmol Mishra <[email protected]>
---
.../providers/cncf/kubernetes/operators/pod.py | 8 +-
.../providers/cncf/kubernetes/triggers/pod.py | 21 +++-
.../unit/cncf/kubernetes/operators/test_pod.py | 4 +-
.../unit/cncf/kubernetes/triggers/test_pod.py | 107 +++++++++++++++++++--
4 files changed, 120 insertions(+), 20 deletions(-)
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index cead8187997..c28d1c9b67f 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -925,19 +925,19 @@ class KubernetesPodOperator(BaseOperator):
logging_interval=self.logging_interval,
trigger_kwargs=self.trigger_kwargs,
)
- container_state = trigger.define_container_state(self.pod) if self.pod
else None
+ pod_container_state = trigger.define_pod_container_state(self.pod) if
self.pod else None
if context and (
- container_state == ContainerState.TERMINATED or container_state ==
ContainerState.FAILED
+ pod_container_state == ContainerState.TERMINATED or
pod_container_state == ContainerState.FAILED
):
self.log.info("Skipping deferral as pod is already in a terminal
state")
self.trigger_reentry(
context=context,
event={
- "status": "failed" if container_state ==
ContainerState.FAILED else "success",
+ "status": "failed" if pod_container_state ==
ContainerState.FAILED else "success",
"namespace": trigger.pod_namespace,
"name": trigger.pod_name,
"message": "Container failed"
- if container_state == ContainerState.FAILED
+ if pod_container_state == ContainerState.FAILED
else "Container succeeded",
"last_log_time": last_log_time,
**(self.trigger_kwargs or {}),
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
index fc9ec300a0c..e801a2f0566 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -292,7 +292,7 @@ class KubernetesPodTrigger(BaseTrigger):
with contextlib.suppress(asyncio.CancelledError):
await events_task
- return self.define_container_state(await self._get_pod())
+ return self.define_pod_container_state(await self._get_pod())
async def _wait_for_container_completion(self) -> TriggerEvent:
"""
@@ -307,8 +307,8 @@ class KubernetesPodTrigger(BaseTrigger):
time_get_more_logs = time_begin +
datetime.timedelta(seconds=self.logging_interval)
while True:
pod = await self._get_pod()
- container_state = self.define_container_state(pod)
- if container_state == ContainerState.TERMINATED:
+ pod_container_state = self.define_pod_container_state(pod)
+ if pod_container_state == ContainerState.TERMINATED:
return TriggerEvent(
{
"status": "success",
@@ -318,7 +318,7 @@ class KubernetesPodTrigger(BaseTrigger):
**self.trigger_kwargs,
}
)
- if container_state == ContainerState.FAILED:
+ if pod_container_state == ContainerState.FAILED:
return TriggerEvent(
{
"status": "failed",
@@ -508,6 +508,19 @@ class KubernetesPodTrigger(BaseTrigger):
return ContainerState.TERMINATED if state_obj.exit_code == 0
else ContainerState.FAILED
return ContainerState.UNDEFINED
+ def define_pod_container_state(self, pod: V1Pod) -> ContainerState:
+ """Infer workload state from terminal pod phase first, then from the
base container state."""
+ if pod.status is None:
+ return ContainerState.UNDEFINED
+
+ if pod.status.phase == PodPhase.SUCCEEDED:
+ return ContainerState.TERMINATED
+
+ if pod.status.phase == PodPhase.FAILED:
+ return ContainerState.FAILED
+
+ return self.define_container_state(pod)
+
@staticmethod
def should_wait(pod_phase: PodPhase, container_state: ContainerState) ->
bool:
return (
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index 5f8da532fe1..ad3013a0a0e 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -2529,10 +2529,10 @@ class TestKubernetesPodOperatorAsync:
@patch(KUB_OP_PATH.format("build_pod_request_obj"))
@patch(KUB_OP_PATH.format("get_or_create_pod"))
@patch("airflow.providers.cncf.kubernetes.operators.pod.BaseHook.get_connection")
- @patch(f"{TRIGGER_CLASS}.define_container_state")
+ @patch(f"{TRIGGER_CLASS}.define_pod_container_state")
def test_async_create_pod_should_execute_successfully(
self,
- mocked_container_state,
+ mocked_pod_container_state,
mocked_get_connection,
mocked_pod,
mocked_pod_obj,
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
index bb0b00662ef..cc35bd4db47 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
@@ -200,7 +200,7 @@ class TestKubernetesPodTrigger:
@pytest.mark.asyncio
@mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
- @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+ @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
@mock.patch(f"{TRIGGER_PATH}.hook")
async def test_run_loop_return_waiting_event(
self, mock_hook, mock_method, mock_wait_pod, trigger, caplog
@@ -219,7 +219,7 @@ class TestKubernetesPodTrigger:
@pytest.mark.asyncio
@mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
- @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+ @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
@mock.patch(f"{TRIGGER_PATH}.hook")
async def test_run_loop_return_running_event(
self, mock_hook, mock_method, mock_wait_pod, trigger, caplog
@@ -238,7 +238,7 @@ class TestKubernetesPodTrigger:
@pytest.mark.asyncio
@mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
- @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+ @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
@mock.patch(f"{TRIGGER_PATH}.hook")
async def test_run_loop_return_failed_event(self, mock_hook, mock_method,
mock_wait_pod, trigger):
mock_hook.get_pod.return_value = self._mock_pod_result(
@@ -287,7 +287,7 @@ class TestKubernetesPodTrigger:
assert actual_stack_trace.startswith("Traceback (most recent call
last):")
@pytest.mark.asyncio
- @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+ @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
@mock.patch(f"{TRIGGER_PATH}.hook")
async def test_logging_in_trigger_when_fail_should_execute_successfully(
self, mock_hook, mock_method, trigger, caplog
@@ -321,7 +321,7 @@ class TestKubernetesPodTrigger:
],
)
@mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.datetime")
- @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+ @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
@mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
@mock.patch(
"airflow.providers.cncf.kubernetes.triggers.pod.AsyncPodManager.fetch_container_logs_before_current_sec"
@@ -332,7 +332,7 @@ class TestKubernetesPodTrigger:
mock_get_pod,
mock_fetch_container_logs_before_current_sec,
mock_wait_pod,
- define_container_state,
+ define_pod_container_state,
mock_datetime,
logging_interval,
exp_event,
@@ -354,7 +354,7 @@ class TestKubernetesPodTrigger:
return DateTime(2022, 1, 1)
mock_fetch_container_logs_before_current_sec.side_effect =
async_datetime_return
- define_container_state.side_effect = ["running", "running",
"terminated"]
+ define_pod_container_state.side_effect = ["running", "running",
"terminated"]
trigger = KubernetesPodTrigger(
pod_name=POD_NAME,
pod_namespace=NAMESPACE,
@@ -406,8 +406,95 @@ class TestKubernetesPodTrigger:
assert expected_state == trigger.define_container_state(pod)
+ def
test_define_pod_container_state_returns_failed_when_init_container_fails(self,
trigger):
+ pod = k8s.V1Pod(
+ metadata=k8s.V1ObjectMeta(name="base", namespace="default"),
+ status=k8s.V1PodStatus(
+ phase=PodPhase.FAILED,
+ init_container_statuses=[
+ k8s.V1ContainerStatus(
+ name="init",
+ image="alpine",
+ image_id="1",
+ ready=False,
+ restart_count=0,
+
state=k8s.V1ContainerState(terminated=k8s.V1ContainerStateTerminated(exit_code=1)),
+ )
+ ],
+ container_statuses=[
+ k8s.V1ContainerStatus(
+ name="base",
+ image="alpine",
+ image_id="1",
+ ready=False,
+ restart_count=0,
+ state=k8s.V1ContainerState(
+
waiting=k8s.V1ContainerStateWaiting(reason="PodInitializing")
+ ),
+ )
+ ],
+ ),
+ )
+
+ assert trigger.define_pod_container_state(pod) == ContainerState.FAILED
+
+ def
test_define_pod_container_state_returns_terminated_when_pod_succeeds(self,
trigger):
+ pod = k8s.V1Pod(status=k8s.V1PodStatus(phase=PodPhase.SUCCEEDED))
+
+ assert trigger.define_pod_container_state(pod) ==
ContainerState.TERMINATED
+
+ def
test_define_pod_container_state_falls_back_to_base_container_state_for_non_terminal_pod(
+ self, trigger
+ ):
+ pod = k8s.V1Pod(status=k8s.V1PodStatus(phase=PodPhase.RUNNING))
+
+ with mock.patch.object(
+ trigger, "define_container_state",
return_value=ContainerState.RUNNING
+ ) as mock_define_container_state:
+ assert trigger.define_pod_container_state(pod) ==
ContainerState.RUNNING
+
+ mock_define_container_state.assert_called_once_with(pod)
+
+ @pytest.mark.asyncio
+ @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+ @mock.patch(f"{TRIGGER_PATH}.hook")
+ async def
test_run_loop_returns_failed_when_pod_failed_and_base_container_waits(
+ self, mock_hook, mock_wait_pod, trigger
+ ):
+ pod = k8s.V1Pod(
+ status=k8s.V1PodStatus(
+ phase=PodPhase.FAILED,
+ container_statuses=[
+ k8s.V1ContainerStatus(
+ name="base",
+ image="alpine",
+ image_id="1",
+ ready=False,
+ restart_count=0,
+ state=k8s.V1ContainerState(
+
waiting=k8s.V1ContainerStateWaiting(reason="PodInitializing")
+ ),
+ )
+ ],
+ )
+ )
+ mock_wait_pod.return_value = ContainerState.WAITING
+ mock_hook.get_pod.return_value = self._mock_pod_result(pod)
+
+ actual_event = await trigger.run().asend(None)
+
+ assert actual_event == TriggerEvent(
+ {
+ "status": "failed",
+ "namespace": "default",
+ "name": "test-pod-name",
+ "message": "Container state failed",
+ "last_log_time": None,
+ }
+ )
+
@pytest.mark.asyncio
- @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+ @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
@mock.patch(f"{TRIGGER_PATH}.hook")
async def test_run_loop_read_events_during_start(self, mock_hook,
mock_method, trigger):
event1 = mock.Mock()
@@ -455,7 +542,7 @@ class TestKubernetesPodTrigger:
@pytest.mark.asyncio
@pytest.mark.parametrize("container_state", [ContainerState.WAITING,
ContainerState.UNDEFINED])
- @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+ @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
@mock.patch(f"{TRIGGER_PATH}.hook")
async def test_run_loop_return_timeout_event(
self, mock_hook, mock_method, trigger, container_state,
mock_time_fixture
@@ -483,7 +570,7 @@ class TestKubernetesPodTrigger:
)
@pytest.mark.asyncio
- @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+ @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
@mock.patch(f"{TRIGGER_PATH}.hook")
async def test_run_loop_return_success_for_completed_pod_after_timeout(
self, mock_hook, mock_method, trigger, mock_time_fixture