This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 54fa258b573 improve deferrable KPO handling of deleted pods in between polls (#56976)
54fa258b573 is described below

commit 54fa258b5737f138ff9abea4f341c33ae2c1a707
Author: Cedrik Neumann <[email protected]>
AuthorDate: Tue Oct 28 11:39:00 2025 -0600

    improve deferrable KPO handling of deleted pods in between polls (#56976)
    
    * refactor: log exception in KubernetesPodTrigger
    
    This makes the error's stacktrace easier to review in the logs and does
    not depend on `trigger_reentry` to do so.
    
    * refactor: log poll_interval on trigger start
    
    This makes it easier to know which value for poll_interval is actually
    being used.
    
    * fix: handle missing pod when trying to clean up after trigger reentry
    
    The pod may no longer exist when `trigger_reentry` runs.
    
    In that case `self.hook.get_pod` raises a 404 API exception, which then
    ends up calling `self._clean`, which assumed `self.pod` was not None.
    
    * fix: don't use structured logging when logging poll interval
    
    This doesn't work in Airflow 2.
---
 .../src/airflow/providers/cncf/kubernetes/operators/pod.py   |  4 ++++
 .../src/airflow/providers/cncf/kubernetes/triggers/pod.py    | 12 +++++++++++-
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index da2e1138672..1df9a1be8ff 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -968,6 +968,10 @@ class KubernetesPodOperator(BaseOperator):
     def _clean(self, event: dict[str, Any], result: dict | None, context: 
Context) -> None:
         if event["status"] == "running":
             return
+
+        if self.pod is None:
+            return
+
         istio_enabled = self.is_istio_enabled(self.pod)
         # Skip await_pod_completion when the event is 'timeout' due to the pod 
can hang
         # on the ErrImagePull or ContainerCreating step and it will never 
complete
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
index 2646854c5d0..9008e0573ec 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -143,7 +143,12 @@ class KubernetesPodTrigger(BaseTrigger):
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
         """Get current pod status and yield a TriggerEvent."""
-        self.log.info("Checking pod %r in namespace %r.", self.pod_name, 
self.pod_namespace)
+        self.log.info(
+            "Checking pod %r in namespace %r with poll interval %r.",
+            self.pod_name,
+            self.pod_namespace,
+            self.poll_interval,
+        )
         try:
             state = await self._wait_for_pod_start()
             if state == ContainerState.TERMINATED:
@@ -183,6 +188,11 @@ class KubernetesPodTrigger(BaseTrigger):
             )
             return
         except Exception as e:
+            self.log.exception(
+                "Unexpected error while waiting for pod %s in namespace %s",
+                self.pod_name,
+                self.pod_namespace,
+            )
             yield TriggerEvent(
                 {
                     "name": self.pod_name,

Reply via email to