This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d3b4a91135 fix: Avoid retrying after KubernetesPodOperator has been marked as failed (#36749)
d3b4a91135 is described below

commit d3b4a9113502a0f50669408e89f70947e4f3e87a
Author: Wei Lee <[email protected]>
AuthorDate: Sat Jan 20 16:43:59 2024 +0800

    fix: Avoid retrying after KubernetesPodOperator has been marked as failed (#36749)
    
    * fix(providers/cncf): avoid retrying after KubernetesPodOperator has been marked as failed
    
    #36471
    
    * feat(providers/cncf): add pod name to on_kill log
    
    * refactor(providers/cncf): refine comments and log message as suggested
    
    * docs(providers/cncf): reword comment
    
    Co-authored-by: Jed Cunningham <[email protected]>
    
    * fix(providers/cncf): move the _killed flag check into the cleanup method
    
    If the task has already been killed, do not clean up the pod again.
    
    ---------
    
    Co-authored-by: Daniel Standish <[email protected]>
    Co-authored-by: Jed Cunningham <[email protected]>
---
 airflow/providers/cncf/kubernetes/operators/pod.py | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py 
b/airflow/providers/cncf/kubernetes/operators/pod.py
index 8a701b675a..70f8bc2252 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -29,6 +29,7 @@ from contextlib import AbstractContextManager
 from functools import cached_property
 from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence
 
+import kubernetes
 from kubernetes.client import CoreV1Api, V1Pod, models as k8s
 from kubernetes.stream import stream
 from urllib3.exceptions import HTTPError
@@ -380,6 +381,7 @@ class KubernetesPodOperator(BaseOperator):
 
         self._config_dict: dict | None = None  # TODO: remove it when removing 
convert_config_file_to_dict
         self._progress_callback = progress_callback
+        self._killed: bool = False
 
     @cached_property
     def _incluster_namespace(self):
@@ -577,6 +579,7 @@ class KubernetesPodOperator(BaseOperator):
                 pod=self.pod or self.pod_request_obj,
                 remote_pod=self.remote_pod,
             )
+
         if self.do_xcom_push:
             return result
 
@@ -676,6 +679,11 @@ class KubernetesPodOperator(BaseOperator):
         )
 
     def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
+        # If a task got marked as failed, "on_kill" method would be called and 
the pod will be cleaned up
+        # there. Cleaning it up again will raise an exception (which might 
cause retry).
+        if self._killed:
+            return
+
         istio_enabled = self.is_istio_enabled(remote_pod)
         pod_phase = remote_pod.status.phase if hasattr(remote_pod, "status") 
else None
 
@@ -818,6 +826,7 @@ class KubernetesPodOperator(BaseOperator):
             )
 
     def on_kill(self) -> None:
+        self._killed = True
         if self.pod:
             pod = self.pod
             kwargs = {
@@ -826,7 +835,11 @@ class KubernetesPodOperator(BaseOperator):
             }
             if self.termination_grace_period is not None:
                 
kwargs.update(grace_period_seconds=self.termination_grace_period)
-            self.client.delete_namespaced_pod(**kwargs)
+
+            try:
+                self.client.delete_namespaced_pod(**kwargs)
+            except kubernetes.client.exceptions.ApiException:
+                self.log.exception("Unable to delete pod %s", 
self.pod.metadata.name)
 
     def build_pod_request_obj(self, context: Context | None = None) -> 
k8s.V1Pod:
         """

Reply via email to