This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c6ba13adf2 Skip pod cleanup in case of pod creation failed (#37671)
c6ba13adf2 is described below
commit c6ba13adf278125177f561a23c601358294fa766
Author: Gopal Dirisala <[email protected]>
AuthorDate: Mon Feb 26 12:02:12 2024 +0530
Skip pod cleanup in case of pod creation failed (#37671)
---
airflow/providers/cncf/kubernetes/operators/pod.py | 6 ++++--
kubernetes_tests/test_kubernetes_pod_operator.py | 17 ++++++++++++++++-
tests/providers/cncf/kubernetes/operators/test_pod.py | 9 +++++++--
3 files changed, 27 insertions(+), 5 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index 93fc2b59e7..4955db7633 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -793,9 +793,11 @@ class KubernetesPodOperator(BaseOperator):
self.callbacks.on_pod_cleanup(pod=pod, client=self.client, mode=ExecutionMode.SYNC)
def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
- # If a task got marked as failed, "on_kill" method would be called and the pod will be cleaned up
+ # Skip cleaning the pod in the following scenarios.
+ # 1. If a task got marked as failed, "on_kill" method would be called and the pod will be cleaned up
# there. Cleaning it up again will raise an exception (which might cause retry).
- if self._killed:
+ # 2. remote pod is null (ex: pod creation failed)
+ if self._killed or not remote_pod:
return
istio_enabled = self.is_istio_enabled(remote_pod)
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 147c354dfc..21eba43663 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -206,6 +206,21 @@ class TestKubernetesPodOperatorSystem:
assert self.expected_pod["spec"] == actual_pod["spec"]
assert self.expected_pod["metadata"]["labels"] == actual_pod["metadata"]["labels"]
+ def test_skip_cleanup(self, mock_get_connection):
+ k = KubernetesPodOperator(
+ namespace="unknown",
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["echo 10"],
+ labels=self.labels,
+ task_id=str(uuid4()),
+ in_cluster=False,
+ do_xcom_push=False,
+ )
+ context = create_context(k)
+ with pytest.raises(ApiException):
+ k.execute(context)
+
def test_delete_operator_pod(self, mock_get_connection):
k = KubernetesPodOperator(
namespace="default",
@@ -1158,7 +1173,7 @@ class TestKubernetesPodOperatorSystem:
# `create_pod` should be called because though there's still a pod to be found,
# it will be `already_checked`
with mock.patch(f"{POD_MANAGER_CLASS}.create_pod") as create_mock:
- with pytest.raises(AirflowException):
+ with pytest.raises(Exception):
k.execute(context)
create_mock.assert_called_once()
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py
index faa21eb7d7..a383895b2a 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -1229,16 +1229,21 @@ class TestKubernetesPodOperator:
_, kwargs = k.client.list_namespaced_pod.call_args
assert "already_checked!=True" in kwargs["label_selector"]
+ @patch(KUB_OP_PATH.format("find_pod"))
@patch(f"{POD_MANAGER_CLASS}.delete_pod")
@patch(f"{KPO_MODULE}.KubernetesPodOperator.patch_already_checked")
- def test_mark_checked_unexpected_exception(self, mock_patch_already_checked, mock_delete_pod):
+ def test_mark_checked_unexpected_exception(
+ self, mock_patch_already_checked, mock_delete_pod, find_pod_mock
+ ):
"""If we aren't deleting pods and have an exception, mark it so we
don't reattach to it"""
k = KubernetesPodOperator(
task_id="task",
on_finish_action="keep_pod",
)
+ found_pods = [MagicMock(), MagicMock(), MagicMock()]
+ find_pod_mock.side_effect = [None] + found_pods
self.await_pod_mock.side_effect = AirflowException("oops")
- context = create_context(k)
+ context = create_context(k, persist_to_db=True)
with pytest.raises(AirflowException):
k.execute(context=context)
mock_patch_already_checked.assert_called_once()