This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5efcb992e0 Rename most pod_id usage to pod_name in KubernetesExecutor (#29147)
5efcb992e0 is described below

commit 5efcb992e0692c69ba48c3b2f4144c6670162927
Author: Jed Cunningham <[email protected]>
AuthorDate: Sat Apr 22 15:28:29 2023 -0500

    Rename most pod_id usage to pod_name in KubernetesExecutor (#29147)
    
    We were using pod_id in a lot of places where it is really just the pod
    name. I've renamed it where it is easy to do so, so things are easier
    to follow.
---
 airflow/executors/kubernetes_executor.py    | 90 +++++++++++++++--------------
 tests/executors/test_kubernetes_executor.py | 56 +++++++++---------
 2 files changed, 76 insertions(+), 70 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 8e4ac0dac0..f8e816e9f5 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -58,10 +58,10 @@ if TYPE_CHECKING:
     # TaskInstance key, command, configuration, pod_template_file
     KubernetesJobType = Tuple[TaskInstanceKey, CommandType, Any, Optional[str]]
 
-    # key, pod state, pod_id, namespace, resource_version
+    # key, pod state, pod_name, namespace, resource_version
     KubernetesResultsType = Tuple[TaskInstanceKey, Optional[str], str, str, str]
 
-    # pod_id, namespace, pod state, annotations, resource_version
+    # pod_name, namespace, pod state, annotations, resource_version
     KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
 
 ALL_NAMESPACES = "ALL_NAMESPACES"
@@ -180,7 +180,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
                 task_instance_related_annotations["map_index"] = map_index
 
             self.process_status(
-                pod_id=task.metadata.name,
+                pod_name=task.metadata.name,
                 namespace=task.metadata.namespace,
                 status=task.status.phase,
                 annotations=task_instance_related_annotations,
@@ -208,7 +208,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
 
     def process_status(
         self,
-        pod_id: str,
+        pod_name: str,
         namespace: str,
         status: str,
         annotations: dict[str, str],
@@ -218,28 +218,28 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
         """Process status response."""
         if status == "Pending":
             if event["type"] == "DELETED":
-                self.log.info("Event: Failed to start pod %s", pod_id)
-                self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version))
+                self.log.info("Event: Failed to start pod %s", pod_name)
+                self.watcher_queue.put((pod_name, namespace, State.FAILED, annotations, resource_version))
             else:
-                self.log.debug("Event: %s Pending", pod_id)
+                self.log.debug("Event: %s Pending", pod_name)
         elif status == "Failed":
-            self.log.error("Event: %s Failed", pod_id)
-            self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version))
+            self.log.error("Event: %s Failed", pod_name)
+            self.watcher_queue.put((pod_name, namespace, State.FAILED, annotations, resource_version))
         elif status == "Succeeded":
-            self.log.info("Event: %s Succeeded", pod_id)
-            self.watcher_queue.put((pod_id, namespace, State.SUCCESS, annotations, resource_version))
+            self.log.info("Event: %s Succeeded", pod_name)
+            self.watcher_queue.put((pod_name, namespace, State.SUCCESS, annotations, resource_version))
         elif status == "Running":
             if event["type"] == "DELETED":
-                self.log.info("Event: Pod %s deleted before it could complete", pod_id)
-                self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version))
+                self.log.info("Event: Pod %s deleted before it could complete", pod_name)
+                self.watcher_queue.put((pod_name, namespace, State.FAILED, annotations, resource_version))
             else:
-                self.log.info("Event: %s is Running", pod_id)
+                self.log.info("Event: %s is Running", pod_name)
         else:
             self.log.warning(
                 "Event: Invalid state: %s on pod: %s in namespace %s with 
annotations: %s with "
                 "resource_version: %s",
                 status,
-                pod_id,
+                pod_name,
                 namespace,
                 annotations,
                 resource_version,
@@ -368,12 +368,12 @@ class AirflowKubernetesScheduler(LoggingMixin):
         self.run_pod_async(pod, **self.kube_config.kube_client_request_args)
         self.log.debug("Kubernetes Job created!")
 
-    def delete_pod(self, pod_id: str, namespace: str) -> None:
-        """Deletes POD."""
+    def delete_pod(self, pod_name: str, namespace: str) -> None:
+        """Deletes Pod from a namespace. Does not raise if it does not exist."""
         try:
-            self.log.debug("Deleting pod %s in namespace %s", pod_id, namespace)
+            self.log.debug("Deleting pod %s in namespace %s", pod_name, namespace)
             self.kube_client.delete_namespaced_pod(
-                pod_id,
+                pod_name,
                 namespace,
                 body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
                 **self.kube_config.kube_client_request_args,
@@ -419,14 +419,14 @@ class AirflowKubernetesScheduler(LoggingMixin):
 
     def process_watcher_task(self, task: KubernetesWatchType) -> None:
         """Process the task by watcher."""
-        pod_id, namespace, state, annotations, resource_version = task
+        pod_name, namespace, state, annotations, resource_version = task
         self.log.debug(
-            "Attempting to finish pod; pod_id: %s; state: %s; annotations: %s", pod_id, state, annotations
+            "Attempting to finish pod; pod_name: %s; state: %s; annotations: %s", pod_name, state, annotations
         )
         key = annotations_to_key(annotations=annotations)
         if key:
-            self.log.debug("finishing job %s - %s (%s)", key, state, pod_id)
-            self.result_queue.put((key, state, pod_id, namespace, resource_version))
+            self.log.debug("finishing job %s - %s (%s)", key, state, pod_name)
+            self.result_queue.put((key, state, pod_name, namespace, resource_version))
 
     def _flush_watcher_queue(self) -> None:
         self.log.debug("Executor shutting down, watcher_queue approx. 
size=%d", self.watcher_queue.qsize())
@@ -658,11 +658,11 @@ class KubernetesExecutor(BaseExecutor):
             try:
                 results = self.result_queue.get_nowait()
                 try:
-                    key, state, pod_id, namespace, resource_version = results
+                    key, state, pod_name, namespace, resource_version = results
                     last_resource_version[namespace] = resource_version
                     self.log.info("Changing state of %s to %s", results, state)
                     try:
-                        self._change_state(key, state, pod_id, namespace)
+                        self._change_state(key, state, pod_name, namespace)
                     except Exception as e:
                         self.log.exception(
                             "Exception: %s when attempting to change state of 
%s to %s, re-queueing.",
@@ -725,7 +725,7 @@ class KubernetesExecutor(BaseExecutor):
         next_event = self.event_scheduler.run(blocking=False)
         self.log.debug("Next timed event is in %f", next_event)
 
-    def _change_state(self, key: TaskInstanceKey, state: str | None, pod_id: str, namespace: str) -> None:
+    def _change_state(self, key: TaskInstanceKey, state: str | None, pod_name: str, namespace: str) -> None:
         if TYPE_CHECKING:
             assert self.kube_scheduler
 
@@ -735,10 +735,10 @@ class KubernetesExecutor(BaseExecutor):
 
         if self.kube_config.delete_worker_pods:
             if state != State.FAILED or self.kube_config.delete_worker_pods_on_failure:
-                self.kube_scheduler.delete_pod(pod_id, namespace)
+                self.kube_scheduler.delete_pod(pod_name=pod_name, namespace=namespace)
                 self.log.info("Deleted pod: %s in namespace %s", str(key), str(namespace))
         else:
-            self.kube_scheduler.patch_pod_executor_done(pod_name=pod_id, namespace=namespace)
+            self.kube_scheduler.patch_pod_executor_done(pod_name=pod_name, namespace=namespace)
             self.log.info("Patched pod %s in namespace %s to mark it as done", str(key), str(namespace))
 
         try:
@@ -801,9 +801,10 @@ class KubernetesExecutor(BaseExecutor):
         return messages, ["\n".join(log)]
 
     def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
+        # Always flush TIs without queued_by_job_id
         tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
         scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
-        pod_ids = {ti.key: ti for ti in tis if ti.queued_by_job_id}
+        tis_to_flush_by_key = {ti.key: ti for ti in tis if ti.queued_by_job_id}
         kube_client: client.CoreV1Api = self.kube_client
         for scheduler_job_id in scheduler_job_ids:
             scheduler_job_id = pod_generator.make_safe_label_value(str(scheduler_job_id))
@@ -821,9 +822,9 @@ class KubernetesExecutor(BaseExecutor):
             }
             pod_list = self._list_pods(query_kwargs)
             for pod in pod_list:
-                self.adopt_launched_task(kube_client, pod, pod_ids)
+                self.adopt_launched_task(kube_client, pod, tis_to_flush_by_key)
         self._adopt_completed_pods(kube_client)
-        tis_to_flush.extend(pod_ids.values())
+        tis_to_flush.extend(tis_to_flush_by_key.values())
         return tis_to_flush
 
     def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
@@ -861,26 +862,29 @@ class KubernetesExecutor(BaseExecutor):
                 self.log.warning("Found multiple pods for ti %s: %s", ti, 
pod_list)
                 continue
             readable_tis.append(repr(ti))
-            self.kube_scheduler.delete_pod(pod_id=pod_list[0].metadata.name, namespace=namespace)
+            self.kube_scheduler.delete_pod(pod_name=pod_list[0].metadata.name, namespace=namespace)
         return readable_tis
 
     def adopt_launched_task(
-        self, kube_client: client.CoreV1Api, pod: k8s.V1Pod, pod_ids: dict[TaskInstanceKey, k8s.V1Pod]
+        self,
+        kube_client: client.CoreV1Api,
+        pod: k8s.V1Pod,
+        tis_to_flush_by_key: dict[TaskInstanceKey, k8s.V1Pod],
     ) -> None:
         """
         Patch existing pod so that the current KubernetesJobWatcher can monitor it via label selectors.
 
         :param kube_client: kubernetes client for speaking to kube API
         :param pod: V1Pod spec that we will patch with new label
-        :param pod_ids: pod_ids we expect to patch.
+        :param tis_to_flush_by_key: TIs that will be flushed if they aren't adopted
         """
         if TYPE_CHECKING:
             assert self.scheduler_job_id
 
         self.log.info("attempting to adopt pod %s", pod.metadata.name)
-        pod_id = annotations_to_key(pod.metadata.annotations)
-        if pod_id not in pod_ids:
-            self.log.error("attempting to adopt taskinstance which was not specified by database: %s", pod_id)
+        ti_key = annotations_to_key(pod.metadata.annotations)
+        if ti_key not in tis_to_flush_by_key:
+            self.log.error("attempting to adopt taskinstance which was not specified by database: %s", ti_key)
             return
 
         new_worker_id_label = pod_generator.make_safe_label_value(self.scheduler_job_id)
@@ -894,8 +898,8 @@ class KubernetesExecutor(BaseExecutor):
             self.log.info("Failed to adopt pod %s. Reason: %s", 
pod.metadata.name, e)
             return
 
-        del pod_ids[pod_id]
-        self.running.add(pod_id)
+        del tis_to_flush_by_key[ti_key]
+        self.running.add(ti_key)
 
     def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
         """
@@ -925,8 +929,8 @@ class KubernetesExecutor(BaseExecutor):
                 )
             except ApiException as e:
                 self.log.info("Failed to adopt pod %s. Reason: %s", 
pod.metadata.name, e)
-            pod_id = annotations_to_key(pod.metadata.annotations)
-            self.running.add(pod_id)
+            ti_id = annotations_to_key(pod.metadata.annotations)
+            self.running.add(ti_id)
 
     def _flush_task_queue(self) -> None:
         if TYPE_CHECKING:
@@ -952,12 +956,12 @@ class KubernetesExecutor(BaseExecutor):
                 results = self.result_queue.get_nowait()
                 self.log.warning("Executor shutting down, flushing 
results=%s", results)
                 try:
-                    key, state, pod_id, namespace, resource_version = results
+                    key, state, pod_name, namespace, resource_version = results
                     self.log.info(
                         "Changing state of %s to %s : resource_version=%d", 
results, state, resource_version
                     )
                     try:
-                        self._change_state(key, state, pod_id, namespace)
+                        self._change_state(key, state, pod_name, namespace)
                     except Exception as e:
                         self.log.exception(
                             "Ignoring exception: %s when attempting to change 
state of %s to %s.",
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index c2b7d24c55..94e092b447 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -159,7 +159,7 @@ class TestAirflowKubernetesScheduler:
     @mock.patch("airflow.executors.kubernetes_executor.client")
     @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
     def test_delete_pod_successfully(self, mock_watcher, mock_client, mock_kube_client):
-        pod_id = "my-pod-1"
+        pod_name = "my-pod-1"
         namespace = "my-namespace-1"
 
         mock_delete_namespace = mock.MagicMock()
@@ -169,8 +169,8 @@ class TestAirflowKubernetesScheduler:
         kube_executor.job_id = 1
         kube_executor.start()
         try:
-            kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
-            mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
+            kube_executor.kube_scheduler.delete_pod(pod_name, namespace)
+            mock_delete_namespace.assert_called_with(pod_name, namespace, body=mock_client.V1DeleteOptions())
         finally:
             kube_executor.end()
 
@@ -181,7 +181,7 @@ class TestAirflowKubernetesScheduler:
     @mock.patch("airflow.executors.kubernetes_executor.client")
     @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
     def test_delete_pod_raises_404(self, mock_watcher, mock_client, mock_kube_client):
-        pod_id = "my-pod-1"
+        pod_name = "my-pod-1"
         namespace = "my-namespace-2"
 
         mock_delete_namespace = mock.MagicMock()
@@ -194,8 +194,8 @@ class TestAirflowKubernetesScheduler:
         kube_executor.start()
 
         with pytest.raises(ApiException):
-            kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
-            mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
+            kube_executor.kube_scheduler.delete_pod(pod_name, namespace)
+            mock_delete_namespace.assert_called_with(pod_name, namespace, body=mock_client.V1DeleteOptions())
 
     @pytest.mark.skipif(
         AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
@@ -204,7 +204,7 @@ class TestAirflowKubernetesScheduler:
     @mock.patch("airflow.executors.kubernetes_executor.client")
     @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
     def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_client):
-        pod_id = "my-pod-1"
+        pod_name = "my-pod-1"
         namespace = "my-namespace-3"
 
         mock_delete_namespace = mock.MagicMock()
@@ -216,8 +216,8 @@ class TestAirflowKubernetesScheduler:
         kube_executor.job_id = 1
         kube_executor.start()
         try:
-            kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
-            mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
+            kube_executor.kube_scheduler.delete_pod(pod_name, namespace)
+            mock_delete_namespace.assert_called_with(pod_name, namespace, body=mock_client.V1DeleteOptions())
         finally:
             kube_executor.end()
 
@@ -531,7 +531,7 @@ class TestKubernetesExecutor:
         try:
             key = ("dag_id", "task_id", "run_id", "try_number1")
             executor.running = {key}
-            executor._change_state(key, State.RUNNING, "pod_id", "default")
+            executor._change_state(key, State.RUNNING, "pod_name", "default")
             assert executor.event_buffer[key][0] == State.RUNNING
             assert executor.running == {key}
         finally:
@@ -546,10 +546,10 @@ class TestKubernetesExecutor:
         try:
             key = ("dag_id", "task_id", "run_id", "try_number2")
             executor.running = {key}
-            executor._change_state(key, State.SUCCESS, "pod_id", "default")
+            executor._change_state(key, State.SUCCESS, "pod_name", "default")
             assert executor.event_buffer[key][0] == State.SUCCESS
             assert executor.running == set()
-            mock_delete_pod.assert_called_once_with("pod_id", "default")
+            mock_delete_pod.assert_called_once_with(pod_name="pod_name", namespace="default")
         finally:
             executor.end()
 
@@ -615,11 +615,11 @@ class TestKubernetesExecutor:
         try:
             key = ("dag_id", "task_id", "run_id", "try_number2")
             executor.running = {key}
-            executor._change_state(key, State.SUCCESS, "pod_id", "test-namespace")
+            executor._change_state(key, State.SUCCESS, "pod_name", "test-namespace")
             assert executor.event_buffer[key][0] == State.SUCCESS
             assert executor.running == set()
             mock_delete_pod.assert_not_called()
-            mock_patch_pod.assert_called_once_with(pod_name="pod_id", namespace="test-namespace")
+            mock_patch_pod.assert_called_once_with(pod_name="pod_name", namespace="test-namespace")
         finally:
             executor.end()
 
@@ -638,10 +638,10 @@ class TestKubernetesExecutor:
         try:
             key = ("dag_id", "task_id", "run_id", "try_number2")
             executor.running = {key}
-            executor._change_state(key, State.FAILED, "pod_id", "test-namespace")
+            executor._change_state(key, State.FAILED, "pod_name", "test-namespace")
             assert executor.event_buffer[key][0] == State.FAILED
             assert executor.running == set()
-            mock_delete_pod.assert_called_once_with("pod_id", "test-namespace")
+            mock_delete_pod.assert_called_once_with(pod_name="pod_name", namespace="test-namespace")
             mock_patch_pod.assert_not_called()
         finally:
             executor.end()
@@ -683,8 +683,10 @@ class TestKubernetesExecutor:
 
         mock_ti.queued_by_job_id = "10"  # scheduler_job would have updated this after the first adoption
         executor.scheduler_job_id = "20"
-        # assume success adopting when checking return, `adopt_launched_task` pops `ti_key` from `pod_ids`
-        mock_adopt_launched_task.side_effect = lambda client, pod, pod_ids: pod_ids.pop(ti_key)
+        # assume success adopting, `adopt_launched_task` pops `ti_key` from `tis_to_flush_by_key`
+        mock_adopt_launched_task.side_effect = (
+            lambda client, pod, tis_to_flush_by_key: tis_to_flush_by_key.pop(ti_key)
+        )
 
         reset_tis = executor.try_adopt_task_instances([mock_ti])
         mock_kube_client.list_namespaced_pod.assert_called_once_with(
@@ -757,15 +759,15 @@ class TestKubernetesExecutor:
         pod = k8s.V1Pod(
             metadata=k8s.V1ObjectMeta(name="foo", labels={"airflow-worker": "bar"}, annotations=annotations)
         )
-        pod_ids = {ti_key: {}}
+        tis_to_flush_by_key = {ti_key: {}}
 
-        executor.adopt_launched_task(mock_kube_client, pod=pod, pod_ids=pod_ids)
+        executor.adopt_launched_task(mock_kube_client, pod=pod, tis_to_flush_by_key=tis_to_flush_by_key)
         mock_kube_client.patch_namespaced_pod.assert_called_once_with(
             body={"metadata": {"labels": {"airflow-worker": "modified"}}},
             name="foo",
             namespace=None,
         )
-        assert pod_ids == {}
+        assert tis_to_flush_by_key == {}
         assert executor.running == {ti_key}
 
     @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
@@ -781,16 +783,16 @@ class TestKubernetesExecutor:
         }
         ti_key = annotations_to_key(annotations)
         pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="foo", annotations=annotations))
-        pod_ids = {ti_key: {}}
+        tis_to_flush_by_key = {ti_key: {}}
 
         mock_kube_client.patch_namespaced_pod.side_effect = ApiException(status=400)
-        executor.adopt_launched_task(mock_kube_client, pod=pod, pod_ids=pod_ids)
+        executor.adopt_launched_task(mock_kube_client, pod=pod, tis_to_flush_by_key=tis_to_flush_by_key)
         mock_kube_client.patch_namespaced_pod.assert_called_once_with(
             body={"metadata": {"labels": {"airflow-worker": "modified"}}},
             name="foo",
             namespace=None,
         )
-        assert pod_ids == {ti_key: {}}
+        assert tis_to_flush_by_key == {ti_key: {}}
         assert executor.running == set()
 
     @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
@@ -852,7 +854,7 @@ class TestKubernetesExecutor:
 
         executor = self.kubernetes_executor
         executor.scheduler_job_id = "modified"
-        pod_ids = {"foobar": {}}
+        tis_to_flush_by_key = {"foobar": {}}
         pod = k8s.V1Pod(
             metadata=k8s.V1ObjectMeta(
                 name="foo",
@@ -865,9 +867,9 @@ class TestKubernetesExecutor:
                 },
             )
         )
-        executor.adopt_launched_task(mock_kube_client, pod=pod, pod_ids=pod_ids)
+        executor.adopt_launched_task(mock_kube_client, pod=pod, tis_to_flush_by_key=tis_to_flush_by_key)
         assert not mock_kube_client.patch_namespaced_pod.called
-        assert pod_ids == {"foobar": {}}
+        assert tis_to_flush_by_key == {"foobar": {}}
 
     @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
     
@mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod")
