This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9922953bcd Only patch single label when adopting pod (#28776)
9922953bcd is described below
commit 9922953bcd9e11a1412a3528aef938444d62f7fe
Author: Jed Cunningham <[email protected]>
AuthorDate: Mon Jan 9 18:07:56 2023 -0600
Only patch single label when adopting pod (#28776)
When KubernetesExecutor adopted pods, it patched each pod with the full
pod object it had retrieved from the k8s API, while changing only a single label.
Normally this works just fine, but there are cases where the pod you
pull from the k8s api can't be used as-is when patching - it results
in a 422 `Forbidden: pod updates may not change fields other than ...`.
Instead, we now pass only the single label we need to update as the patch
body, which avoids accidentally "updating" other fields.
Closes #24015
---
airflow/executors/kubernetes_executor.py | 15 +++---
tests/executors/test_kubernetes_executor.py | 80 +++++++++++++++++++++++++----
2 files changed, 77 insertions(+), 18 deletions(-)
diff --git a/airflow/executors/kubernetes_executor.py
b/airflow/executors/kubernetes_executor.py
index 12fbca31ff..c2d7406657 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -778,26 +778,28 @@ class KubernetesExecutor(BaseExecutor):
assert self.scheduler_job_id
self.log.info("attempting to adopt pod %s", pod.metadata.name)
- pod.metadata.labels["airflow-worker"] =
pod_generator.make_safe_label_value(self.scheduler_job_id)
pod_id = annotations_to_key(pod.metadata.annotations)
if pod_id not in pod_ids:
self.log.error("attempting to adopt taskinstance which was not
specified by database: %s", pod_id)
return
+ new_worker_id_label =
pod_generator.make_safe_label_value(self.scheduler_job_id)
try:
kube_client.patch_namespaced_pod(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
- body=PodGenerator.serialize_pod(pod),
+ body={"metadata": {"labels": {"airflow-worker":
new_worker_id_label}}},
)
- pod_ids.pop(pod_id)
- self.running.add(pod_id)
except ApiException as e:
self.log.info("Failed to adopt pod %s. Reason: %s",
pod.metadata.name, e)
+ return
+
+ del pod_ids[pod_id]
+ self.running.add(pod_id)
def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
"""
- Patch completed pod so that the KubernetesJobWatcher can delete it.
+ Patch completed pods so that the KubernetesJobWatcher can delete them.
:param kube_client: kubernetes client for speaking to kube API
"""
@@ -812,12 +814,11 @@ class KubernetesExecutor(BaseExecutor):
pod_list = self._list_pods(query_kwargs)
for pod in pod_list:
self.log.info("Attempting to adopt pod %s", pod.metadata.name)
- pod.metadata.labels["airflow-worker"] = new_worker_id_label
try:
kube_client.patch_namespaced_pod(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
- body=PodGenerator.serialize_pod(pod),
+ body={"metadata": {"labels": {"airflow-worker":
new_worker_id_label}}},
)
except ApiException as e:
self.log.info("Failed to adopt pod %s. Reason: %s",
pod.metadata.name, e)
diff --git a/tests/executors/test_kubernetes_executor.py
b/tests/executors/test_kubernetes_executor.py
index aef530911b..17611b9434 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -718,20 +718,78 @@ class TestKubernetesExecutor:
pod_ids = {ti_key: {}}
executor.adopt_launched_task(mock_kube_client, pod=pod,
pod_ids=pod_ids)
- assert mock_kube_client.patch_namespaced_pod.call_args[1] == {
- "body": {
- "metadata": {
- "labels": {"airflow-worker": "modified"},
- "annotations": annotations,
- "name": "foo",
- }
- },
- "name": "foo",
- "namespace": None,
- }
+ mock_kube_client.patch_namespaced_pod.assert_called_once_with(
+ body={"metadata": {"labels": {"airflow-worker": "modified"}}},
+ name="foo",
+ namespace=None,
+ )
assert pod_ids == {}
assert executor.running == {ti_key}
+ @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
+ def test_adopt_launched_task_api_exception(self, mock_kube_client):
+ """We shouldn't think we are running the task if aren't able to patch
the pod"""
+ executor = self.kubernetes_executor
+ executor.scheduler_job_id = "modified"
+ annotations = {
+ "dag_id": "dag",
+ "run_id": "run_id",
+ "task_id": "task",
+ "try_number": "1",
+ }
+ ti_key = annotations_to_key(annotations)
+ pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="foo",
annotations=annotations))
+ pod_ids = {ti_key: {}}
+
+ mock_kube_client.patch_namespaced_pod.side_effect =
ApiException(status=400)
+ executor.adopt_launched_task(mock_kube_client, pod=pod,
pod_ids=pod_ids)
+ mock_kube_client.patch_namespaced_pod.assert_called_once_with(
+ body={"metadata": {"labels": {"airflow-worker": "modified"}}},
+ name="foo",
+ namespace=None,
+ )
+ assert pod_ids == {ti_key: {}}
+ assert executor.running == set()
+
+ @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
+ def test_adopt_completed_pods(self, mock_kube_client):
+ """We should adopt all completed pods from other schedulers"""
+ executor = self.kubernetes_executor
+ executor.scheduler_job_id = "modified"
+ executor.kube_client = mock_kube_client
+ executor.kube_config.kube_namespace = "somens"
+ pod_names = ["one", "two"]
+ mock_kube_client.list_namespaced_pod.return_value.items = [
+ k8s.V1Pod(
+ metadata=k8s.V1ObjectMeta(
+ name=pod_name,
+ labels={"airflow-worker": pod_name},
+ annotations={"some_annotation": "hello"},
+ namespace="somens",
+ )
+ )
+ for pod_name in pod_names
+ ]
+
+ executor._adopt_completed_pods(mock_kube_client)
+ mock_kube_client.list_namespaced_pod.assert_called_once_with(
+ namespace="somens",
+ field_selector="status.phase=Succeeded",
+ label_selector="kubernetes_executor=True,airflow-worker!=modified",
+ )
+ assert len(pod_names) ==
mock_kube_client.patch_namespaced_pod.call_count
+ mock_kube_client.patch_namespaced_pod.assert_has_calls(
+ [
+ mock.call(
+ body={"metadata": {"labels": {"airflow-worker":
"modified"}}},
+ name=pod_name,
+ namespace="somens",
+ )
+ for pod_name in pod_names
+ ],
+ any_order=True,
+ )
+
@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
def test_not_adopt_unassigned_task(self, mock_kube_client):
"""