This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b014077fe3 Fix ApiException handling when adopting completed pods (#41109)
b014077fe3 is described below
commit b014077fe31853b857a081f18a56552abdae3427
Author: Jed Cunningham <[email protected]>
AuthorDate: Wed Jul 31 06:16:21 2024 -0600
Fix ApiException handling when adopting completed pods (#41109)
When trying to adopt completed pods, if we encounter an ApiException,
we should assume we were unable to adopt the pod, meaning we shouldn't
add it to our `running` set. If we do, `running` can fill up over time
with tasks the executor isn't actually watching.
---
.../kubernetes/executors/kubernetes_executor.py | 2 ++
.../executors/test_kubernetes_executor.py | 35 ++++++++++++++++++++++
2 files changed, 37 insertions(+)
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 11065ede7f..612f64c8f3 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -694,6 +694,8 @@ class KubernetesExecutor(BaseExecutor):
)
except ApiException as e:
self.log.info("Failed to adopt pod %s. Reason: %s",
pod.metadata.name, e)
+ continue
+
ti_id = annotations_to_key(pod.metadata.annotations)
self.running.add(ti_id)
diff --git
a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
index 1ed9399b59..1ae36356b2 100644
--- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -1127,6 +1127,41 @@ class TestKubernetesExecutor:
)
assert executor.running == expected_running_ti_keys
+
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
+
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
+ def test_adopt_completed_pods_api_exception(self, mock_kube_client,
mock_kube_dynamic_client):
+ """We should gracefully handle exceptions when adopting completed pods
from other schedulers"""
+ executor = self.kubernetes_executor
+ executor.scheduler_job_id = "modified"
+ executor.kube_client = mock_kube_client
+ executor.kube_config.kube_namespace = "somens"
+ pod_names = ["one", "two"]
+
+ def get_annotations(pod_name):
+ return {
+ "dag_id": "dag",
+ "run_id": "run_id",
+ "task_id": pod_name,
+ "try_number": "1",
+ }
+
+ mock_kube_dynamic_client.return_value.get.return_value.items = [
+ k8s.V1Pod(
+ metadata=k8s.V1ObjectMeta(
+ name=pod_name,
+ labels={"airflow-worker": pod_name},
+ annotations=get_annotations(pod_name),
+ namespace="somens",
+ )
+ )
+ for pod_name in pod_names
+ ]
+
+ mock_kube_client.patch_namespaced_pod.side_effect =
ApiException(status=400)
+ executor._adopt_completed_pods(mock_kube_client)
+ assert len(pod_names) ==
mock_kube_client.patch_namespaced_pod.call_count
+ assert executor.running == set()
+
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
def test_not_adopt_unassigned_task(self, mock_kube_client):
"""