This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b8d06e8  Fix KubernetesPodOperator reattach when not deleting pods (#18070)
b8d06e8 is described below

commit b8d06e812ac56af6b0d17830c63b705ace9d4959
Author: Jed Cunningham <[email protected]>
AuthorDate: Wed Sep 8 04:31:58 2021 -0600

    Fix KubernetesPodOperator reattach when not deleting pods (#18070)
    
    This change:
    
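    - ignores pods already marked as checked (the pod lookup now uses the
      label selector `already_checked!=True`) instead of trying to
      reattach to them.
    - marks (via `patch_already_checked`) every finished pod that did not
      succeed and will not be deleted, so later retries skip it.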
    
    Together, these changes let `is_delete_operator_pod=False` and
    `reattach_on_restart=True` work correctly in combination across retries.
---
 .../cncf/kubernetes/operators/kubernetes_pod.py    | 12 ++-
 .../kubernetes/operators/test_kubernetes_pod.py    | 89 ++++++++++++++++++++++
 2 files changed, 98 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py 
b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 6cf8474..747f8b0 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -409,8 +409,10 @@ class KubernetesPodOperator(BaseOperator):
 
     @staticmethod
     def _get_pod_identifying_label_string(labels) -> str:
-        filtered_labels = {label_id: label for label_id, label in 
labels.items() if label_id != 'try_number'}
-        return ','.join(label_id + '=' + label for label_id, label in 
sorted(filtered_labels.items()))
+        label_strings = [
+            f'{label_id}={label}' for label_id, label in 
sorted(labels.items()) if label_id != 'try_number'
+        ]
+        return ','.join(label_strings) + ',already_checked!=True'
 
     @staticmethod
     def _try_numbers_match(context, pod) -> bool:
@@ -516,6 +518,7 @@ class KubernetesPodOperator(BaseOperator):
         )
 
         self.log.debug("Starting pod:\n%s", yaml.safe_dump(self.pod.to_dict()))
+        final_state = None
         try:
             launcher.start_pod(self.pod, 
startup_timeout=self.startup_timeout_seconds)
             final_state, remote_pod, result = 
launcher.monitor_pod(pod=self.pod, get_logs=self.get_logs)
@@ -528,6 +531,8 @@ class KubernetesPodOperator(BaseOperator):
             if self.is_delete_operator_pod:
                 self.log.debug("Deleting pod for task %s", self.task_id)
                 launcher.delete_pod(self.pod)
+            elif final_state != State.SUCCESS:
+                self.patch_already_checked(self.pod)
         return final_state, remote_pod, result
 
     def patch_already_checked(self, pod: k8s.V1Pod):
@@ -553,7 +558,8 @@ class KubernetesPodOperator(BaseOperator):
             if self.log_events_on_failure:
                 for event in launcher.read_pod_events(pod).items:
                     self.log.error("Pod Event: %s - %s", event.reason, 
event.message)
-            self.patch_already_checked(pod)
+            if not self.is_delete_operator_pod:
+                self.patch_already_checked(pod)
             raise AirflowException(f'Pod returned a failure: {final_state}')
         return final_state, remote_pod, result
 
diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py 
b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
index 47a20f8..7f4e53b 100644
--- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
@@ -658,3 +658,92 @@ class TestKubernetesPodOperator(unittest.TestCase):
         pod_namespace = ti.xcom_pull(task_ids=k.task_id, key='pod_namespace')
         assert pod_name and pod_name == pod.metadata.name
         assert pod_namespace and pod_namespace == pod.metadata.namespace
+
+    def test_previous_pods_ignored_for_reattached(self):
+        """
+        When looking for pods to possibly reattach to,
+        ignore pods from previous tries that were properly finished
+        """
+        k = KubernetesPodOperator(
+            namespace="default",
+            image="ubuntu:16.04",
+            name="test",
+            task_id="task",
+        )
+        self.run_pod(k)
+        self.client_mock.return_value.list_namespaced_pod.assert_called_once()
+        _, kwargs = self.client_mock.return_value.list_namespaced_pod.call_args
+        assert 'already_checked!=True' in kwargs['label_selector']
+
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.delete_pod")
+    @mock.patch(
+        "airflow.providers.cncf.kubernetes.operators.kubernetes_pod"
+        ".KubernetesPodOperator.patch_already_checked"
+    )
+    def test_mark_created_pod_if_not_deleted(self, mock_patch_already_checked, 
mock_delete_pod):
+        """If we aren't deleting pods and have a failure, mark it so we don't 
reattach to it"""
+        k = KubernetesPodOperator(
+            namespace="default",
+            image="ubuntu:16.04",
+            name="test",
+            task_id="task",
+            is_delete_operator_pod=False,
+        )
+        self.monitor_mock.return_value = (State.FAILED, None, None)
+        context = self.create_context(k)
+        with pytest.raises(AirflowException):
+            k.execute(context=context)
+        mock_patch_already_checked.assert_called_once()
+        mock_delete_pod.assert_not_called()
+
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.delete_pod")
+    @mock.patch(
+        "airflow.providers.cncf.kubernetes.operators.kubernetes_pod"
+        ".KubernetesPodOperator.patch_already_checked"
+    )
+    def test_mark_created_pod_if_not_deleted_during_exception(
+        self, mock_patch_already_checked, mock_delete_pod
+    ):
+        """If we aren't deleting pods and have an exception, mark it so we 
don't reattach to it"""
+        k = KubernetesPodOperator(
+            namespace="default",
+            image="ubuntu:16.04",
+            name="test",
+            task_id="task",
+            is_delete_operator_pod=False,
+        )
+        self.monitor_mock.side_effect = AirflowException("oops")
+        context = self.create_context(k)
+        with pytest.raises(AirflowException):
+            k.execute(context=context)
+        mock_patch_already_checked.assert_called_once()
+        mock_delete_pod.assert_not_called()
+
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.delete_pod")
+    @mock.patch(
+        "airflow.providers.cncf.kubernetes.operators.kubernetes_pod"
+        ".KubernetesPodOperator.patch_already_checked"
+    )
+    def test_mark_reattached_pod_if_not_deleted(self, 
mock_patch_already_checked, mock_delete_pod):
+        """If we aren't deleting pods and have a failure, mark it so we don't 
reattach to it"""
+        k = KubernetesPodOperator(
+            namespace="default",
+            image="ubuntu:16.04",
+            name="test",
+            task_id="task",
+            is_delete_operator_pod=False,
+        )
+        # Run it first to easily get the pod
+        pod = self.run_pod(k)
+
+        # Now try and "reattach"
+        mock_patch_already_checked.reset_mock()
+        mock_delete_pod.reset_mock()
+        self.client_mock.return_value.list_namespaced_pod.return_value.items = 
[pod]
+        self.monitor_mock.return_value = (State.FAILED, None, None)
+
+        context = self.create_context(k)
+        with pytest.raises(AirflowException):
+            k.execute(context=context)
+        mock_patch_already_checked.assert_called_once()
+        mock_delete_pod.assert_not_called()

Reply via email to