This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bccbcbf7a15 Fix KubernetesPodTrigger pod terminal state handling (#66650)
bccbcbf7a15 is described below

commit bccbcbf7a15c1976242bf247187172deceb6c435
Author: Anmol Mishra <[email protected]>
AuthorDate: Sun May 10 23:33:17 2026 +0530

    Fix KubernetesPodTrigger pod terminal state handling (#66650)
    
    * Fix KubernetesPodTrigger pod terminal state handling
    
    * Address pod trigger review feedback
    
    * Restore pod trigger log wording
    
    ---------
    
    Co-authored-by: Anmol Mishra <[email protected]>
---
 .../providers/cncf/kubernetes/operators/pod.py     |   8 +-
 .../providers/cncf/kubernetes/triggers/pod.py      |  21 +++-
 .../unit/cncf/kubernetes/operators/test_pod.py     |   4 +-
 .../unit/cncf/kubernetes/triggers/test_pod.py      | 107 +++++++++++++++++++--
 4 files changed, 120 insertions(+), 20 deletions(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index cead8187997..c28d1c9b67f 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -925,19 +925,19 @@ class KubernetesPodOperator(BaseOperator):
             logging_interval=self.logging_interval,
             trigger_kwargs=self.trigger_kwargs,
         )
-        container_state = trigger.define_container_state(self.pod) if self.pod 
else None
+        pod_container_state = trigger.define_pod_container_state(self.pod) if 
self.pod else None
         if context and (
-            container_state == ContainerState.TERMINATED or container_state == 
ContainerState.FAILED
+            pod_container_state == ContainerState.TERMINATED or 
pod_container_state == ContainerState.FAILED
         ):
             self.log.info("Skipping deferral as pod is already in a terminal 
state")
             self.trigger_reentry(
                 context=context,
                 event={
-                    "status": "failed" if container_state == 
ContainerState.FAILED else "success",
+                    "status": "failed" if pod_container_state == 
ContainerState.FAILED else "success",
                     "namespace": trigger.pod_namespace,
                     "name": trigger.pod_name,
                     "message": "Container failed"
-                    if container_state == ContainerState.FAILED
+                    if pod_container_state == ContainerState.FAILED
                     else "Container succeeded",
                     "last_log_time": last_log_time,
                     **(self.trigger_kwargs or {}),
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
index fc9ec300a0c..e801a2f0566 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -292,7 +292,7 @@ class KubernetesPodTrigger(BaseTrigger):
             with contextlib.suppress(asyncio.CancelledError):
                 await events_task
 
-        return self.define_container_state(await self._get_pod())
+        return self.define_pod_container_state(await self._get_pod())
 
     async def _wait_for_container_completion(self) -> TriggerEvent:
         """
@@ -307,8 +307,8 @@ class KubernetesPodTrigger(BaseTrigger):
             time_get_more_logs = time_begin + 
datetime.timedelta(seconds=self.logging_interval)
         while True:
             pod = await self._get_pod()
-            container_state = self.define_container_state(pod)
-            if container_state == ContainerState.TERMINATED:
+            pod_container_state = self.define_pod_container_state(pod)
+            if pod_container_state == ContainerState.TERMINATED:
                 return TriggerEvent(
                     {
                         "status": "success",
@@ -318,7 +318,7 @@ class KubernetesPodTrigger(BaseTrigger):
                         **self.trigger_kwargs,
                     }
                 )
-            if container_state == ContainerState.FAILED:
+            if pod_container_state == ContainerState.FAILED:
                 return TriggerEvent(
                     {
                         "status": "failed",
@@ -508,6 +508,19 @@ class KubernetesPodTrigger(BaseTrigger):
                 return ContainerState.TERMINATED if state_obj.exit_code == 0 
else ContainerState.FAILED
         return ContainerState.UNDEFINED
 
+    def define_pod_container_state(self, pod: V1Pod) -> ContainerState:
+        """Infer workload state from terminal pod phase first, then from the 
base container state."""
+        if pod.status is None:
+            return ContainerState.UNDEFINED
+
+        if pod.status.phase == PodPhase.SUCCEEDED:
+            return ContainerState.TERMINATED
+
+        if pod.status.phase == PodPhase.FAILED:
+            return ContainerState.FAILED
+
+        return self.define_container_state(pod)
+
     @staticmethod
     def should_wait(pod_phase: PodPhase, container_state: ContainerState) -> 
bool:
         return (
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index 5f8da532fe1..ad3013a0a0e 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -2529,10 +2529,10 @@ class TestKubernetesPodOperatorAsync:
     @patch(KUB_OP_PATH.format("build_pod_request_obj"))
     @patch(KUB_OP_PATH.format("get_or_create_pod"))
     
@patch("airflow.providers.cncf.kubernetes.operators.pod.BaseHook.get_connection")
-    @patch(f"{TRIGGER_CLASS}.define_container_state")
+    @patch(f"{TRIGGER_CLASS}.define_pod_container_state")
     def test_async_create_pod_should_execute_successfully(
         self,
-        mocked_container_state,
+        mocked_pod_container_state,
         mocked_get_connection,
         mocked_pod,
         mocked_pod_obj,
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
index bb0b00662ef..cc35bd4db47 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
@@ -200,7 +200,7 @@ class TestKubernetesPodTrigger:
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
-    @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
     @mock.patch(f"{TRIGGER_PATH}.hook")
     async def test_run_loop_return_waiting_event(
         self, mock_hook, mock_method, mock_wait_pod, trigger, caplog
@@ -219,7 +219,7 @@ class TestKubernetesPodTrigger:
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
-    @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
     @mock.patch(f"{TRIGGER_PATH}.hook")
     async def test_run_loop_return_running_event(
         self, mock_hook, mock_method, mock_wait_pod, trigger, caplog
@@ -238,7 +238,7 @@ class TestKubernetesPodTrigger:
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
-    @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
     @mock.patch(f"{TRIGGER_PATH}.hook")
     async def test_run_loop_return_failed_event(self, mock_hook, mock_method, 
mock_wait_pod, trigger):
         mock_hook.get_pod.return_value = self._mock_pod_result(
@@ -287,7 +287,7 @@ class TestKubernetesPodTrigger:
         assert actual_stack_trace.startswith("Traceback (most recent call 
last):")
 
     @pytest.mark.asyncio
-    @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
     @mock.patch(f"{TRIGGER_PATH}.hook")
     async def test_logging_in_trigger_when_fail_should_execute_successfully(
         self, mock_hook, mock_method, trigger, caplog
@@ -321,7 +321,7 @@ class TestKubernetesPodTrigger:
         ],
     )
     @mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.datetime")
-    @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
     @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
     @mock.patch(
         
"airflow.providers.cncf.kubernetes.triggers.pod.AsyncPodManager.fetch_container_logs_before_current_sec"
@@ -332,7 +332,7 @@ class TestKubernetesPodTrigger:
         mock_get_pod,
         mock_fetch_container_logs_before_current_sec,
         mock_wait_pod,
-        define_container_state,
+        define_pod_container_state,
         mock_datetime,
         logging_interval,
         exp_event,
@@ -354,7 +354,7 @@ class TestKubernetesPodTrigger:
             return DateTime(2022, 1, 1)
 
         mock_fetch_container_logs_before_current_sec.side_effect = 
async_datetime_return
-        define_container_state.side_effect = ["running", "running", 
"terminated"]
+        define_pod_container_state.side_effect = ["running", "running", 
"terminated"]
         trigger = KubernetesPodTrigger(
             pod_name=POD_NAME,
             pod_namespace=NAMESPACE,
@@ -406,8 +406,95 @@ class TestKubernetesPodTrigger:
 
         assert expected_state == trigger.define_container_state(pod)
 
+    def 
test_define_pod_container_state_returns_failed_when_init_container_fails(self, 
trigger):
+        pod = k8s.V1Pod(
+            metadata=k8s.V1ObjectMeta(name="base", namespace="default"),
+            status=k8s.V1PodStatus(
+                phase=PodPhase.FAILED,
+                init_container_statuses=[
+                    k8s.V1ContainerStatus(
+                        name="init",
+                        image="alpine",
+                        image_id="1",
+                        ready=False,
+                        restart_count=0,
+                        
state=k8s.V1ContainerState(terminated=k8s.V1ContainerStateTerminated(exit_code=1)),
+                    )
+                ],
+                container_statuses=[
+                    k8s.V1ContainerStatus(
+                        name="base",
+                        image="alpine",
+                        image_id="1",
+                        ready=False,
+                        restart_count=0,
+                        state=k8s.V1ContainerState(
+                            
waiting=k8s.V1ContainerStateWaiting(reason="PodInitializing")
+                        ),
+                    )
+                ],
+            ),
+        )
+
+        assert trigger.define_pod_container_state(pod) == ContainerState.FAILED
+
+    def 
test_define_pod_container_state_returns_terminated_when_pod_succeeds(self, 
trigger):
+        pod = k8s.V1Pod(status=k8s.V1PodStatus(phase=PodPhase.SUCCEEDED))
+
+        assert trigger.define_pod_container_state(pod) == 
ContainerState.TERMINATED
+
+    def 
test_define_pod_container_state_falls_back_to_base_container_state_for_non_terminal_pod(
+        self, trigger
+    ):
+        pod = k8s.V1Pod(status=k8s.V1PodStatus(phase=PodPhase.RUNNING))
+
+        with mock.patch.object(
+            trigger, "define_container_state", 
return_value=ContainerState.RUNNING
+        ) as mock_define_container_state:
+            assert trigger.define_pod_container_state(pod) == 
ContainerState.RUNNING
+
+        mock_define_container_state.assert_called_once_with(pod)
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+    @mock.patch(f"{TRIGGER_PATH}.hook")
+    async def 
test_run_loop_returns_failed_when_pod_failed_and_base_container_waits(
+        self, mock_hook, mock_wait_pod, trigger
+    ):
+        pod = k8s.V1Pod(
+            status=k8s.V1PodStatus(
+                phase=PodPhase.FAILED,
+                container_statuses=[
+                    k8s.V1ContainerStatus(
+                        name="base",
+                        image="alpine",
+                        image_id="1",
+                        ready=False,
+                        restart_count=0,
+                        state=k8s.V1ContainerState(
+                            
waiting=k8s.V1ContainerStateWaiting(reason="PodInitializing")
+                        ),
+                    )
+                ],
+            )
+        )
+        mock_wait_pod.return_value = ContainerState.WAITING
+        mock_hook.get_pod.return_value = self._mock_pod_result(pod)
+
+        actual_event = await trigger.run().asend(None)
+
+        assert actual_event == TriggerEvent(
+            {
+                "status": "failed",
+                "namespace": "default",
+                "name": "test-pod-name",
+                "message": "Container state failed",
+                "last_log_time": None,
+            }
+        )
+
     @pytest.mark.asyncio
-    @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
     @mock.patch(f"{TRIGGER_PATH}.hook")
     async def test_run_loop_read_events_during_start(self, mock_hook, 
mock_method, trigger):
         event1 = mock.Mock()
@@ -455,7 +542,7 @@ class TestKubernetesPodTrigger:
 
     @pytest.mark.asyncio
     @pytest.mark.parametrize("container_state", [ContainerState.WAITING, 
ContainerState.UNDEFINED])
-    @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
     @mock.patch(f"{TRIGGER_PATH}.hook")
     async def test_run_loop_return_timeout_event(
         self, mock_hook, mock_method, trigger, container_state, 
mock_time_fixture
@@ -483,7 +570,7 @@ class TestKubernetesPodTrigger:
         )
 
     @pytest.mark.asyncio
-    @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
     @mock.patch(f"{TRIGGER_PATH}.hook")
     async def test_run_loop_return_success_for_completed_pod_after_timeout(
         self, mock_hook, mock_method, trigger, mock_time_fixture

Reply via email to