This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d3b4a91135 fix: Avoid retrying after KubernetesPodOperator has been marked as failed (#36749)
d3b4a91135 is described below
commit d3b4a9113502a0f50669408e89f70947e4f3e87a
Author: Wei Lee <[email protected]>
AuthorDate: Sat Jan 20 16:43:59 2024 +0800
fix: Avoid retrying after KubernetesPodOperator has been marked as failed (#36749)
* fix(providers/cncf): avoid retrying after KubernetesPodOperator has been marked as failed
#36471
* feat(providers/cncf): add pod name to on_kill log
* refactor(providers/cncf): refine comments and log message as suggested
* docs(providers/cncf): reword comment
Co-authored-by: Jed Cunningham
<[email protected]>
* fix(providers/cncf): move the _killed flag to cleanup method
if the task has been killed, do not cleanup again
---------
Co-authored-by: Daniel Standish
<[email protected]>
Co-authored-by: Jed Cunningham
<[email protected]>
---
airflow/providers/cncf/kubernetes/operators/pod.py | 15 ++++++++++++++-
1 file changed, 14 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index 8a701b675a..70f8bc2252 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -29,6 +29,7 @@ from contextlib import AbstractContextManager
from functools import cached_property
from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence
+import kubernetes
from kubernetes.client import CoreV1Api, V1Pod, models as k8s
from kubernetes.stream import stream
from urllib3.exceptions import HTTPError
@@ -380,6 +381,7 @@ class KubernetesPodOperator(BaseOperator):
self._config_dict: dict | None = None # TODO: remove it when removing convert_config_file_to_dict
self._progress_callback = progress_callback
+ self._killed: bool = False
@cached_property
def _incluster_namespace(self):
@@ -577,6 +579,7 @@ class KubernetesPodOperator(BaseOperator):
pod=self.pod or self.pod_request_obj,
remote_pod=self.remote_pod,
)
+
if self.do_xcom_push:
return result
@@ -676,6 +679,11 @@ class KubernetesPodOperator(BaseOperator):
)
def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
+ # If a task got marked as failed, "on_kill" method would be called and the pod will be cleaned up
+ # there. Cleaning it up again will raise an exception (which might cause retry).
+ if self._killed:
+ return
+
istio_enabled = self.is_istio_enabled(remote_pod)
pod_phase = remote_pod.status.phase if hasattr(remote_pod, "status") else None
@@ -818,6 +826,7 @@ class KubernetesPodOperator(BaseOperator):
)
def on_kill(self) -> None:
+ self._killed = True
if self.pod:
pod = self.pod
kwargs = {
@@ -826,7 +835,11 @@ class KubernetesPodOperator(BaseOperator):
}
if self.termination_grace_period is not None:
kwargs.update(grace_period_seconds=self.termination_grace_period)
- self.client.delete_namespaced_pod(**kwargs)
+
+ try:
+ self.client.delete_namespaced_pod(**kwargs)
+ except kubernetes.client.exceptions.ApiException:
+ self.log.exception("Unable to delete pod %s", self.pod.metadata.name)
def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod:
"""