This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b9c574c61a list pods performance optimization (#36092)
b9c574c61a is described below

commit b9c574c61ae42481b9d2c9ce7c42c93dc44b9507
Author: Gopal Dirisala <[email protected]>
AuthorDate: Sun Dec 10 17:19:39 2023 +0530

    list pods performance optimization (#36092)
---
 .../kubernetes/executors/kubernetes_executor.py    |  31 ++++--
 .../executors/test_kubernetes_executor.py          | 117 ++++++++++++++++-----
 2 files changed, 110 insertions(+), 38 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py 
b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 16923d63e2..6e32c00473 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -34,6 +34,7 @@ from datetime import datetime
 from queue import Empty, Queue
 from typing import TYPE_CHECKING, Any, Sequence
 
+from kubernetes.dynamic import DynamicClient
 from sqlalchemy import select, update
 
 from airflow.providers.cncf.kubernetes.pod_generator import 
PodMutationHookException, PodReconciliationError
@@ -160,19 +161,31 @@ class KubernetesExecutor(BaseExecutor):
         super().__init__(parallelism=self.kube_config.parallelism)
 
     def _list_pods(self, query_kwargs):
+        query_kwargs["header_params"] = {
+            "Accept": 
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"
+        }
+        dynamic_client = DynamicClient(self.kube_client.api_client)
+        pod_resource = dynamic_client.resources.get(api_version="v1", 
kind="Pod")
         if self.kube_config.multi_namespace_mode:
             if self.kube_config.multi_namespace_mode_namespace_list:
-                pods = []
-                for namespace in 
self.kube_config.multi_namespace_mode_namespace_list:
-                    pods.extend(
-                        
self.kube_client.list_namespaced_pod(namespace=namespace, **query_kwargs).items
-                    )
+                namespaces = 
self.kube_config.multi_namespace_mode_namespace_list
             else:
-                pods = 
self.kube_client.list_pod_for_all_namespaces(**query_kwargs).items
+                namespaces = [None]
         else:
-            pods = self.kube_client.list_namespaced_pod(
-                namespace=self.kube_config.kube_namespace, **query_kwargs
-            ).items
+            namespaces = [self.kube_config.kube_namespace]
+
+        pods = []
+        for namespace in namespaces:
+            # Dynamic Client's list pods call throws a TypeError when there are no matching pods to return.
+            # This bug was fixed in PR https://github.com/kubernetes-client/python/pull/2155
+            # TODO: Remove the try-except clause once we upgrade to a K8s Python client version
+            # which includes the above PR.
+            try:
+                pods.extend(
+                    dynamic_client.get(resource=pod_resource, 
namespace=namespace, **query_kwargs).items
+                )
+            except TypeError:
+                continue
 
         return pods
 
diff --git 
a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py 
b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
index 35584a4d82..a0b187087a 100644
--- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -721,13 +721,16 @@ class TestKubernetesExecutor:
         finally:
             executor.end()
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
     @mock.patch(
         
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task"
     )
     @mock.patch(
         
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods"
     )
-    def test_try_adopt_task_instances(self, mock_adopt_completed_pods, 
mock_adopt_launched_task):
+    def test_try_adopt_task_instances(
+        self, mock_adopt_completed_pods, mock_adopt_launched_task, 
mock_kube_dynamic_client
+    ):
         executor = self.kubernetes_executor
         executor.scheduler_job_id = "10"
         ti_key = annotations_to_key(
@@ -741,22 +744,27 @@ class TestKubernetesExecutor:
         mock_ti = mock.MagicMock(queued_by_job_id="1", 
external_executor_id="1", key=ti_key)
         pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="foo"))
         mock_kube_client = mock.MagicMock()
-        mock_kube_client.list_namespaced_pod.return_value.items = [pod]
         executor.kube_client = mock_kube_client
+        mock_kube_dynamic_client.return_value = mock.MagicMock()
+        mock_pod_resource = mock.MagicMock()
+        mock_kube_dynamic_client.return_value.resources.get.return_value = 
mock_pod_resource
+        mock_kube_dynamic_client.return_value.get.return_value.items = [pod]
 
         # First adoption
         reset_tis = executor.try_adopt_task_instances([mock_ti])
-        mock_kube_client.list_namespaced_pod.assert_called_once_with(
+        mock_kube_dynamic_client.return_value.get.assert_called_once_with(
+            resource=mock_pod_resource,
             namespace="default",
             field_selector="status.phase!=Succeeded",
             
label_selector="kubernetes_executor=True,airflow-worker=1,airflow_executor_done!=True",
+            header_params={"Accept": 
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
         )
         mock_adopt_launched_task.assert_called_once_with(mock_kube_client, 
pod, {ti_key: mock_ti})
         mock_adopt_completed_pods.assert_called_once()
         assert reset_tis == [mock_ti]  # assume failure adopting when checking 
return
 
         # Second adoption (queued_by_job_id and external_executor_id no longer 
match)
-        mock_kube_client.reset_mock()
+        mock_kube_dynamic_client.return_value.reset_mock()
         mock_adopt_launched_task.reset_mock()
         mock_adopt_completed_pods.reset_mock()
 
@@ -768,23 +776,31 @@ class TestKubernetesExecutor:
         )
 
         reset_tis = executor.try_adopt_task_instances([mock_ti])
-        mock_kube_client.list_namespaced_pod.assert_called_once_with(
+        mock_kube_dynamic_client.return_value.get.assert_called_once_with(
+            resource=mock_pod_resource,
             namespace="default",
             field_selector="status.phase!=Succeeded",
             
label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done!=True",
+            header_params={"Accept": 
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
         )
         mock_adopt_launched_task.assert_called_once()  # Won't check args this 
time around as they get mutated
         mock_adopt_completed_pods.assert_called_once()
         assert reset_tis == []  # This time our return is empty - no TIs to 
reset
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
     @mock.patch(
         
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods"
     )
-    def test_try_adopt_task_instances_multiple_scheduler_ids(self, 
mock_adopt_completed_pods):
+    def test_try_adopt_task_instances_multiple_scheduler_ids(
+        self, mock_adopt_completed_pods, mock_kube_dynamic_client
+    ):
         """We try to find pods only once per scheduler id"""
         executor = self.kubernetes_executor
         mock_kube_client = mock.MagicMock()
         executor.kube_client = mock_kube_client
+        mock_kube_dynamic_client.return_value = mock.MagicMock()
+        mock_pod_resource = mock.MagicMock()
+        mock_kube_dynamic_client.return_value.resources.get.return_value = 
mock_pod_resource
 
         mock_tis = [
             mock.MagicMock(queued_by_job_id="10", external_executor_id="1", 
dag_id="dag", task_id="task"),
@@ -793,23 +809,32 @@ class TestKubernetesExecutor:
         ]
 
         executor.try_adopt_task_instances(mock_tis)
-        assert mock_kube_client.list_namespaced_pod.call_count == 2
-        mock_kube_client.list_namespaced_pod.assert_has_calls(
+        assert mock_kube_dynamic_client.return_value.get.call_count == 2
+        mock_kube_dynamic_client.return_value.get.assert_has_calls(
             [
                 mock.call(
+                    resource=mock_pod_resource,
                     namespace="default",
                     field_selector="status.phase!=Succeeded",
                     
label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done!=True",
+                    header_params={
+                        "Accept": 
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"
+                    },
                 ),
                 mock.call(
+                    resource=mock_pod_resource,
                     namespace="default",
                     field_selector="status.phase!=Succeeded",
                     
label_selector="kubernetes_executor=True,airflow-worker=40,airflow_executor_done!=True",
+                    header_params={
+                        "Accept": 
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"
+                    },
                 ),
             ],
             any_order=True,
         )
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
     @mock.patch(
         
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task"
     )
@@ -817,13 +842,14 @@ class TestKubernetesExecutor:
         
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods"
     )
     def test_try_adopt_task_instances_no_matching_pods(
-        self, mock_adopt_completed_pods, mock_adopt_launched_task
+        self, mock_adopt_completed_pods, mock_adopt_launched_task, 
mock_kube_dynamic_client
     ):
         executor = self.kubernetes_executor
         mock_ti = mock.MagicMock(queued_by_job_id="1", 
external_executor_id="1", dag_id="dag", task_id="task")
         mock_kube_client = mock.MagicMock()
-        mock_kube_client.list_namespaced_pod.return_value.items = []
         executor.kube_client = mock_kube_client
+        mock_kube_dynamic_client.return_value = mock.MagicMock()
+        mock_kube_dynamic_client.return_value.get.return_value.items = []
 
         tis_to_flush = executor.try_adopt_task_instances([mock_ti])
         assert tis_to_flush == [mock_ti]
@@ -880,12 +906,17 @@ class TestKubernetesExecutor:
         assert tis_to_flush_by_key == {ti_key: {}}
         assert executor.running == set()
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
     
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
-    def test_adopt_completed_pods(self, mock_kube_client):
+    def test_adopt_completed_pods(self, mock_kube_client, 
mock_kube_dynamic_client):
         """We should adopt all completed pods from other schedulers"""
         executor = self.kubernetes_executor
         executor.scheduler_job_id = "modified"
         executor.kube_client = mock_kube_client
+        mock_kube_dynamic_client.return_value = mock.MagicMock()
+        mock_pod_resource = mock.MagicMock()
+        mock_kube_dynamic_client.return_value.resources.get.return_value = 
mock_pod_resource
+        mock_kube_dynamic_client.return_value.get.return_value.items = []
         executor.kube_config.kube_namespace = "somens"
         pod_names = ["one", "two"]
 
@@ -897,7 +928,7 @@ class TestKubernetesExecutor:
                 "try_number": "1",
             }
 
-        mock_kube_client.list_namespaced_pod.return_value.items = [
+        mock_kube_dynamic_client.return_value.get.return_value.items = [
             k8s.V1Pod(
                 metadata=k8s.V1ObjectMeta(
                     name=pod_name,
@@ -911,10 +942,12 @@ class TestKubernetesExecutor:
         expected_running_ti_keys = 
{annotations_to_key(get_annotations(pod_name)) for pod_name in pod_names}
 
         executor._adopt_completed_pods(mock_kube_client)
-        mock_kube_client.list_namespaced_pod.assert_called_once_with(
+        mock_kube_dynamic_client.return_value.get.assert_called_once_with(
+            resource=mock_pod_resource,
             namespace="somens",
             field_selector="status.phase=Succeeded",
             
label_selector="kubernetes_executor=True,airflow-worker!=modified,airflow_executor_done!=True",
+            header_params={"Accept": 
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
         )
         assert len(pod_names) == 
mock_kube_client.patch_namespaced_pod.call_count
         mock_kube_client.patch_namespaced_pod.assert_has_calls(
@@ -1002,10 +1035,16 @@ class TestKubernetesExecutor:
         assert executor.kube_config.multi_namespace_mode_namespace_list == 
expected_value_in_kube_config
 
     @pytest.mark.db_test
-    def test_clear_not_launched_queued_tasks_not_launched(self, dag_maker, 
create_dummy_dag, session):
+    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
+    def test_clear_not_launched_queued_tasks_not_launched(
+        self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
+    ):
         """If a pod isn't found for a TI, reset the state to scheduled"""
         mock_kube_client = mock.MagicMock()
-        mock_kube_client.list_namespaced_pod.return_value = 
k8s.V1PodList(items=[])
+        mock_kube_dynamic_client.return_value = mock.MagicMock()
+        mock_pod_resource = mock.MagicMock()
+        mock_kube_dynamic_client.return_value.resources.get.return_value = 
mock_pod_resource
+        mock_kube_dynamic_client.return_value.get.return_value.items = []
 
         create_dummy_dag(dag_id="test_clear", task_id="task1", 
with_dagrun_type=None)
         dag_run = dag_maker.create_dagrun()
@@ -1022,9 +1061,12 @@ class TestKubernetesExecutor:
 
         ti.refresh_from_db()
         assert ti.state == State.SCHEDULED
-        assert mock_kube_client.list_namespaced_pod.call_count == 1
-        mock_kube_client.list_namespaced_pod.assert_called_with(
-            namespace="default", label_selector="airflow-worker=1"
+        assert mock_kube_dynamic_client.return_value.get.call_count == 1
+        mock_kube_dynamic_client.return_value.get.assert_called_with(
+            resource=mock_pod_resource,
+            namespace="default",
+            label_selector="airflow-worker=1",
+            header_params={"Accept": 
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
         )
 
     @pytest.mark.db_test
@@ -1036,12 +1078,16 @@ class TestKubernetesExecutor:
             pytest.param("kubernetes", "kubernetes"),
         ],
     )
+    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
     def test_clear_not_launched_queued_tasks_launched(
-        self, dag_maker, create_dummy_dag, session, task_queue, 
kubernetes_queue
+        self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session, 
task_queue, kubernetes_queue
     ):
         """Leave the state alone if a pod already exists"""
         mock_kube_client = mock.MagicMock()
-        mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(
+        mock_kube_dynamic_client.return_value = mock.MagicMock()
+        mock_pod_resource = mock.MagicMock()
+        mock_kube_dynamic_client.return_value.resources.get.return_value = 
mock_pod_resource
+        mock_kube_dynamic_client.return_value.get.return_value = k8s.V1PodList(
             items=[
                 k8s.V1Pod(
                     metadata=k8s.V1ObjectMeta(
@@ -1075,15 +1121,19 @@ class TestKubernetesExecutor:
 
         ti.refresh_from_db()
         assert ti.state == State.QUEUED
-        mock_kube_client.list_namespaced_pod.assert_called_once_with(
-            namespace="default", label_selector="airflow-worker=1"
+        mock_kube_dynamic_client.return_value.get.assert_called_once_with(
+            resource=mock_pod_resource,
+            namespace="default",
+            label_selector="airflow-worker=1",
+            header_params={"Accept": 
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
         )
 
     @pytest.mark.db_test
-    def test_clear_not_launched_queued_tasks_mapped_task(self, dag_maker, 
session):
+    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
+    def test_clear_not_launched_queued_tasks_mapped_task(self, 
mock_kube_dynamic_client, dag_maker, session):
         """One mapped task has a launched pod - other does not."""
 
-        def list_namespaced_pod(*args, **kwargs):
+        def get(*args, **kwargs):
             return k8s.V1PodList(
                 items=[
                     k8s.V1Pod(
@@ -1103,7 +1153,10 @@ class TestKubernetesExecutor:
             )
 
         mock_kube_client = mock.MagicMock()
-        mock_kube_client.list_namespaced_pod.side_effect = list_namespaced_pod
+        mock_kube_dynamic_client.return_value = mock.MagicMock()
+        mock_pod_resource = mock.MagicMock()
+        mock_kube_dynamic_client.return_value.resources.get.return_value = 
mock_pod_resource
+        mock_kube_dynamic_client.return_value.get.side_effect = get
 
         with dag_maker(dag_id="test_clear"):
             op = 
BashOperator.partial(task_id="bash").expand(bash_command=["echo 0", "echo 1"])
@@ -1129,10 +1182,12 @@ class TestKubernetesExecutor:
         assert ti0.state == State.QUEUED
         assert ti1.state == State.SCHEDULED
 
-        assert mock_kube_client.list_namespaced_pod.call_count == 1
-        mock_kube_client.list_namespaced_pod.assert_called_with(
+        assert mock_kube_dynamic_client.return_value.get.call_count == 1
+        mock_kube_dynamic_client.return_value.get.assert_called_with(
+            resource=mock_pod_resource,
             namespace="default",
             label_selector="airflow-worker=1",
+            header_params={"Accept": 
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
         )
 
     @pytest.mark.db_test
@@ -1163,10 +1218,14 @@ class TestKubernetesExecutor:
         assert mock_kube_client.list_namespaced_pod.call_count == 0
 
     @pytest.mark.db_test
-    def test_clear_not_launched_queued_tasks_clear_only_by_job_id(self, 
dag_maker, create_dummy_dag, session):
+    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
+    def test_clear_not_launched_queued_tasks_clear_only_by_job_id(
+        self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
+    ):
         """clear only not launched queued  tasks which are queued by the same 
executor job"""
         mock_kube_client = mock.MagicMock()
-        mock_kube_client.list_namespaced_pod.return_value = 
k8s.V1PodList(items=[])
+        mock_kube_dynamic_client.return_value = mock.MagicMock()
+        mock_kube_dynamic_client.return_value.get.return_value = 
k8s.V1PodList(items=[])
 
         create_dummy_dag(dag_id="test_clear_0", task_id="task0", 
with_dagrun_type=None)
         dag_run = dag_maker.create_dagrun()

Reply via email to