This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b8d06e8 Fix KubernetesPodOperator reattach when not deleting pods (#18070)
b8d06e8 is described below
commit b8d06e812ac56af6b0d17830c63b705ace9d4959
Author: Jed Cunningham <[email protected]>
AuthorDate: Wed Sep 8 04:31:58 2021 -0600
Fix KubernetesPodOperator reattach when not deleting pods (#18070)
This change:
- ignores any pods already marked as checked (label selector `already_checked!=True`) instead of trying to reattach to them.
- marks every finished but unsuccessful pod that won't be deleted as already checked, including pods whose monitoring raised an exception, so later tries skip them.
Combined, these allow `is_delete_operator_pod=False` and `reattach_on_restart=True` to work together correctly during retries.
---
.../cncf/kubernetes/operators/kubernetes_pod.py | 12 ++-
.../kubernetes/operators/test_kubernetes_pod.py | 89 ++++++++++++++++++++++
2 files changed, 98 insertions(+), 3 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 6cf8474..747f8b0 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -409,8 +409,10 @@ class KubernetesPodOperator(BaseOperator):
 
     @staticmethod
     def _get_pod_identifying_label_string(labels) -> str:
-        filtered_labels = {label_id: label for label_id, label in labels.items() if label_id != 'try_number'}
-        return ','.join(label_id + '=' + label for label_id, label in sorted(filtered_labels.items()))
+        label_strings = [
+            f'{label_id}={label}' for label_id, label in sorted(labels.items()) if label_id != 'try_number'
+        ]
+        return ','.join(label_strings) + ',already_checked!=True'
 
     @staticmethod
     def _try_numbers_match(context, pod) -> bool:
@@ -516,6 +518,7 @@ class KubernetesPodOperator(BaseOperator):
         )
 
         self.log.debug("Starting pod:\n%s", yaml.safe_dump(self.pod.to_dict()))
+        final_state = None
         try:
             launcher.start_pod(self.pod, startup_timeout=self.startup_timeout_seconds)
             final_state, remote_pod, result = launcher.monitor_pod(pod=self.pod, get_logs=self.get_logs)
@@ -528,6 +531,8 @@ class KubernetesPodOperator(BaseOperator):
             if self.is_delete_operator_pod:
                 self.log.debug("Deleting pod for task %s", self.task_id)
                 launcher.delete_pod(self.pod)
+            elif final_state != State.SUCCESS:
+                self.patch_already_checked(self.pod)
         return final_state, remote_pod, result
 
     def patch_already_checked(self, pod: k8s.V1Pod):
@@ -553,7 +558,8 @@ class KubernetesPodOperator(BaseOperator):
             if self.log_events_on_failure:
                 for event in launcher.read_pod_events(pod).items:
                     self.log.error("Pod Event: %s - %s", event.reason, event.message)
-            self.patch_already_checked(pod)
+            if not self.is_delete_operator_pod:
+                self.patch_already_checked(pod)
             raise AirflowException(f'Pod returned a failure: {final_state}')
         return final_state, remote_pod, result
 
diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
index 47a20f8..7f4e53b 100644
--- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
@@ -658,3 +658,92 @@ class TestKubernetesPodOperator(unittest.TestCase):
         pod_namespace = ti.xcom_pull(task_ids=k.task_id, key='pod_namespace')
         assert pod_name and pod_name == pod.metadata.name
         assert pod_namespace and pod_namespace == pod.metadata.namespace
+
+    def test_previous_pods_ignored_for_reattached(self):
+        """
+        When looking for pods to possibly reattach to,
+        ignore pods from previous tries that were properly finished
+        """
+        k = KubernetesPodOperator(
+            namespace="default",
+            image="ubuntu:16.04",
+            name="test",
+            task_id="task",
+        )
+        self.run_pod(k)
+        self.client_mock.return_value.list_namespaced_pod.assert_called_once()
+        _, kwargs = self.client_mock.return_value.list_namespaced_pod.call_args
+        assert 'already_checked!=True' in kwargs['label_selector']
+
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.delete_pod")
+    @mock.patch(
+        "airflow.providers.cncf.kubernetes.operators.kubernetes_pod"
+        ".KubernetesPodOperator.patch_already_checked"
+    )
+    def test_mark_created_pod_if_not_deleted(self, mock_patch_already_checked, mock_delete_pod):
+        """If we aren't deleting pods and have a failure, mark it so we don't reattach to it"""
+        k = KubernetesPodOperator(
+            namespace="default",
+            image="ubuntu:16.04",
+            name="test",
+            task_id="task",
+            is_delete_operator_pod=False,
+        )
+        self.monitor_mock.return_value = (State.FAILED, None, None)
+        context = self.create_context(k)
+        with pytest.raises(AirflowException):
+            k.execute(context=context)
+        mock_patch_already_checked.assert_called_once()
+        mock_delete_pod.assert_not_called()
+
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.delete_pod")
+    @mock.patch(
+        "airflow.providers.cncf.kubernetes.operators.kubernetes_pod"
+        ".KubernetesPodOperator.patch_already_checked"
+    )
+    def test_mark_created_pod_if_not_deleted_during_exception(
+        self, mock_patch_already_checked, mock_delete_pod
+    ):
+        """If we aren't deleting pods and have an exception, mark it so we don't reattach to it"""
+        k = KubernetesPodOperator(
+            namespace="default",
+            image="ubuntu:16.04",
+            name="test",
+            task_id="task",
+            is_delete_operator_pod=False,
+        )
+        self.monitor_mock.side_effect = AirflowException("oops")
+        context = self.create_context(k)
+        with pytest.raises(AirflowException):
+            k.execute(context=context)
+        mock_patch_already_checked.assert_called_once()
+        mock_delete_pod.assert_not_called()
+
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.delete_pod")
+    @mock.patch(
+        "airflow.providers.cncf.kubernetes.operators.kubernetes_pod"
+        ".KubernetesPodOperator.patch_already_checked"
+    )
+    def test_mark_reattached_pod_if_not_deleted(self, mock_patch_already_checked, mock_delete_pod):
+        """If we aren't deleting pods and have a failure, mark it so we don't reattach to it"""
+        k = KubernetesPodOperator(
+            namespace="default",
+            image="ubuntu:16.04",
+            name="test",
+            task_id="task",
+            is_delete_operator_pod=False,
+        )
+        # Run it first to easily get the pod
+        pod = self.run_pod(k)
+
+        # Now try and "reattach"
+        mock_patch_already_checked.reset_mock()
+        mock_delete_pod.reset_mock()
+        self.client_mock.return_value.list_namespaced_pod.return_value.items = [pod]
+        self.monitor_mock.return_value = (State.FAILED, None, None)
+
+        context = self.create_context(k)
+        with pytest.raises(AirflowException):
+            k.execute(context=context)
+        mock_patch_already_checked.assert_called_once()
+        mock_delete_pod.assert_not_called()