This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b014077fe3 Fix ApiException handling when adopting completed pods (#41109)
b014077fe3 is described below

commit b014077fe31853b857a081f18a56552abdae3427
Author: Jed Cunningham <[email protected]>
AuthorDate: Wed Jul 31 06:16:21 2024 -0600

    Fix ApiException handling when adopting completed pods (#41109)
    
    When trying to adopt completed pods, if we encounter an ApiException,
    we should assume we were unable to adopt the pod, meaning we shouldn't
    add it to our `running` set. If we do, `running` can fill up over time
    with tasks the executor isn't actually watching.
---
 .../kubernetes/executors/kubernetes_executor.py    |  2 ++
 .../executors/test_kubernetes_executor.py          | 35 ++++++++++++++++++++++
 2 files changed, 37 insertions(+)

diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py 
b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 11065ede7f..612f64c8f3 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -694,6 +694,8 @@ class KubernetesExecutor(BaseExecutor):
                 )
             except ApiException as e:
                 self.log.info("Failed to adopt pod %s. Reason: %s", 
pod.metadata.name, e)
+                continue
+
             ti_id = annotations_to_key(pod.metadata.annotations)
             self.running.add(ti_id)
 
diff --git 
a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py 
b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
index 1ed9399b59..1ae36356b2 100644
--- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -1127,6 +1127,41 @@ class TestKubernetesExecutor:
         )
         assert executor.running == expected_running_ti_keys
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
+    
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
+    def test_adopt_completed_pods_api_exception(self, mock_kube_client, 
mock_kube_dynamic_client):
+        """We should gracefully handle exceptions when adopting completed pods 
from other schedulers"""
+        executor = self.kubernetes_executor
+        executor.scheduler_job_id = "modified"
+        executor.kube_client = mock_kube_client
+        executor.kube_config.kube_namespace = "somens"
+        pod_names = ["one", "two"]
+
+        def get_annotations(pod_name):
+            return {
+                "dag_id": "dag",
+                "run_id": "run_id",
+                "task_id": pod_name,
+                "try_number": "1",
+            }
+
+        mock_kube_dynamic_client.return_value.get.return_value.items = [
+            k8s.V1Pod(
+                metadata=k8s.V1ObjectMeta(
+                    name=pod_name,
+                    labels={"airflow-worker": pod_name},
+                    annotations=get_annotations(pod_name),
+                    namespace="somens",
+                )
+            )
+            for pod_name in pod_names
+        ]
+
+        mock_kube_client.patch_namespaced_pod.side_effect = 
ApiException(status=400)
+        executor._adopt_completed_pods(mock_kube_client)
+        assert len(pod_names) == 
mock_kube_client.patch_namespaced_pod.call_count
+        assert executor.running == set()
+
     
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
     def test_not_adopt_unassigned_task(self, mock_kube_client):
         """

Reply via email to