This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2fa9474cc061152d7705a93ede5fe4bcfa49f1e1
Author: Jed Cunningham <[email protected]>
AuthorDate: Mon Jan 9 18:07:56 2023 -0600

    Only patch single label when adopting pod (#28776)
    
    When KubernetesExecutor adopted pods, it patched each pod by sending
    back the entire pod object it had retrieved from the k8s API, even
    though only a single label needed to change. This normally works, but
    in some cases the pod retrieved from the k8s API can't be sent back
    as-is in a patch; the API then rejects the request with a 422
    `Forbidden: pod updates may not change fields other than ...`.
    
    Instead, the patch body now contains only the single label we need
    to update, so other fields can no longer be accidentally "updated".
    
    Closes #24015
    
    (cherry picked from commit 9922953bcd9e11a1412a3528aef938444d62f7fe)
---
 airflow/executors/kubernetes_executor.py    | 17 +++---
 tests/executors/test_kubernetes_executor.py | 80 +++++++++++++++++++++++++----
 2 files changed, 77 insertions(+), 20 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py 
b/airflow/executors/kubernetes_executor.py
index 65e463a948..28f720f35e 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -636,7 +636,6 @@ class KubernetesExecutor(BaseExecutor):
                     )
                     self.fail(task[0], e)
                 except ApiException as e:
-
                     # These codes indicate something is wrong with pod 
definition; otherwise we assume pod
                     # definition is ok, and that retrying may work
                     if e.status in (400, 422):
@@ -748,27 +747,28 @@ class KubernetesExecutor(BaseExecutor):
             assert self.scheduler_job_id
 
         self.log.info("attempting to adopt pod %s", pod.metadata.name)
-        pod.metadata.labels["airflow-worker"] = 
pod_generator.make_safe_label_value(self.scheduler_job_id)
         pod_id = annotations_to_key(pod.metadata.annotations)
         if pod_id not in pod_ids:
             self.log.error("attempting to adopt taskinstance which was not 
specified by database: %s", pod_id)
             return
 
+        new_worker_id_label = 
pod_generator.make_safe_label_value(self.scheduler_job_id)
         try:
             kube_client.patch_namespaced_pod(
                 name=pod.metadata.name,
                 namespace=pod.metadata.namespace,
-                body=PodGenerator.serialize_pod(pod),
+                body={"metadata": {"labels": {"airflow-worker": 
new_worker_id_label}}},
             )
-            pod_ids.pop(pod_id)
-            self.running.add(pod_id)
         except ApiException as e:
             self.log.info("Failed to adopt pod %s. Reason: %s", 
pod.metadata.name, e)
+            return
+
+        del pod_ids[pod_id]
+        self.running.add(pod_id)
 
     def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
         """
-
-        Patch completed pod so that the KubernetesJobWatcher can delete it.
+        Patch completed pods so that the KubernetesJobWatcher can delete them.
 
         :param kube_client: kubernetes client for speaking to kube API
         """
@@ -783,12 +783,11 @@ class KubernetesExecutor(BaseExecutor):
         pod_list = 
kube_client.list_namespaced_pod(namespace=self.kube_config.kube_namespace, 
**kwargs)
         for pod in pod_list.items:
             self.log.info("Attempting to adopt pod %s", pod.metadata.name)
-            pod.metadata.labels["airflow-worker"] = new_worker_id_label
             try:
                 kube_client.patch_namespaced_pod(
                     name=pod.metadata.name,
                     namespace=pod.metadata.namespace,
-                    body=PodGenerator.serialize_pod(pod),
+                    body={"metadata": {"labels": {"airflow-worker": 
new_worker_id_label}}},
                 )
             except ApiException as e:
                 self.log.info("Failed to adopt pod %s. Reason: %s", 
pod.metadata.name, e)
diff --git a/tests/executors/test_kubernetes_executor.py 
b/tests/executors/test_kubernetes_executor.py
index 367f1cb2c4..97619225e6 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -654,20 +654,78 @@ class TestKubernetesExecutor:
         pod_ids = {ti_key: {}}
 
         executor.adopt_launched_task(mock_kube_client, pod=pod, 
pod_ids=pod_ids)
-        assert mock_kube_client.patch_namespaced_pod.call_args[1] == {
-            "body": {
-                "metadata": {
-                    "labels": {"airflow-worker": "modified"},
-                    "annotations": annotations,
-                    "name": "foo",
-                }
-            },
-            "name": "foo",
-            "namespace": None,
-        }
+        mock_kube_client.patch_namespaced_pod.assert_called_once_with(
+            body={"metadata": {"labels": {"airflow-worker": "modified"}}},
+            name="foo",
+            namespace=None,
+        )
         assert pod_ids == {}
         assert executor.running == {ti_key}
 
+    @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
+    def test_adopt_launched_task_api_exception(self, mock_kube_client):
+        """We shouldn't think we are running the task if aren't able to patch 
the pod"""
+        executor = self.kubernetes_executor
+        executor.scheduler_job_id = "modified"
+        annotations = {
+            "dag_id": "dag",
+            "run_id": "run_id",
+            "task_id": "task",
+            "try_number": "1",
+        }
+        ti_key = annotations_to_key(annotations)
+        pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="foo", 
annotations=annotations))
+        pod_ids = {ti_key: {}}
+
+        mock_kube_client.patch_namespaced_pod.side_effect = 
ApiException(status=400)
+        executor.adopt_launched_task(mock_kube_client, pod=pod, 
pod_ids=pod_ids)
+        mock_kube_client.patch_namespaced_pod.assert_called_once_with(
+            body={"metadata": {"labels": {"airflow-worker": "modified"}}},
+            name="foo",
+            namespace=None,
+        )
+        assert pod_ids == {ti_key: {}}
+        assert executor.running == set()
+
+    @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
+    def test_adopt_completed_pods(self, mock_kube_client):
+        """We should adopt all completed pods from other schedulers"""
+        executor = self.kubernetes_executor
+        executor.scheduler_job_id = "modified"
+        executor.kube_client = mock_kube_client
+        executor.kube_config.kube_namespace = "somens"
+        pod_names = ["one", "two"]
+        mock_kube_client.list_namespaced_pod.return_value.items = [
+            k8s.V1Pod(
+                metadata=k8s.V1ObjectMeta(
+                    name=pod_name,
+                    labels={"airflow-worker": pod_name},
+                    annotations={"some_annotation": "hello"},
+                    namespace="somens",
+                )
+            )
+            for pod_name in pod_names
+        ]
+
+        executor._adopt_completed_pods(mock_kube_client)
+        mock_kube_client.list_namespaced_pod.assert_called_once_with(
+            namespace="somens",
+            field_selector="status.phase=Succeeded",
+            label_selector="kubernetes_executor=True,airflow-worker!=modified",
+        )
+        assert len(pod_names) == 
mock_kube_client.patch_namespaced_pod.call_count
+        mock_kube_client.patch_namespaced_pod.assert_has_calls(
+            [
+                mock.call(
+                    body={"metadata": {"labels": {"airflow-worker": 
"modified"}}},
+                    name=pod_name,
+                    namespace="somens",
+                )
+                for pod_name in pod_names
+            ],
+            any_order=True,
+        )
+
     @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
     def test_not_adopt_unassigned_task(self, mock_kube_client):
         """

Reply via email to