This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 54fa258b573 improve deferrable KPO handling of deleted pods in between
polls (#56976)
54fa258b573 is described below
commit 54fa258b5737f138ff9abea4f341c33ae2c1a707
Author: Cedrik Neumann <[email protected]>
AuthorDate: Tue Oct 28 11:39:00 2025 -0600
improve deferrable KPO handling of deleted pods in between polls (#56976)
* refactor: log exception in KubernetesPodTrigger
This makes the error's stack trace easier to review in the logs, without
relying on `trigger_reentry` to log it.
* refactor: log poll_interval on trigger start
This makes it easier to see which poll_interval value is actually in
use.
* fix: handle missing pod when trying to cleanup after trigger reentry
The pod may no longer exist by the time `trigger_reentry` runs.
In that case `self.hook.get_pod` raises a 404 API exception, which
eventually leads to a call to `self._clean`, which assumed that
`self.pod` was not None.
* fix: don't use structured logging when logging poll interval
Structured logging does not work in Airflow 2.
---
.../src/airflow/providers/cncf/kubernetes/operators/pod.py | 4 ++++
.../src/airflow/providers/cncf/kubernetes/triggers/pod.py | 12 +++++++++++-
2 files changed, 15 insertions(+), 1 deletion(-)
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index da2e1138672..1df9a1be8ff 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -968,6 +968,10 @@ class KubernetesPodOperator(BaseOperator):
def _clean(self, event: dict[str, Any], result: dict | None, context:
Context) -> None:
if event["status"] == "running":
return
+
+ if self.pod is None:
+ return
+
istio_enabled = self.is_istio_enabled(self.pod)
# Skip await_pod_completion when the event is 'timeout' due to the pod
can hang
# on the ErrImagePull or ContainerCreating step and it will never
complete
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
index 2646854c5d0..9008e0573ec 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -143,7 +143,12 @@ class KubernetesPodTrigger(BaseTrigger):
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Get current pod status and yield a TriggerEvent."""
- self.log.info("Checking pod %r in namespace %r.", self.pod_name,
self.pod_namespace)
+ self.log.info(
+ "Checking pod %r in namespace %r with poll interval %r.",
+ self.pod_name,
+ self.pod_namespace,
+ self.poll_interval,
+ )
try:
state = await self._wait_for_pod_start()
if state == ContainerState.TERMINATED:
@@ -183,6 +188,11 @@ class KubernetesPodTrigger(BaseTrigger):
)
return
except Exception as e:
+ self.log.exception(
+ "Unexpected error while waiting for pod %s in namespace %s",
+ self.pod_name,
+ self.pod_namespace,
+ )
yield TriggerEvent(
{
"name": self.pod_name,