This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b9c574c61a list pods performance optimization (#36092)
b9c574c61a is described below
commit b9c574c61ae42481b9d2c9ce7c42c93dc44b9507
Author: Gopal Dirisala <[email protected]>
AuthorDate: Sun Dec 10 17:19:39 2023 +0530
list pods performance optimization (#36092)
---
.../kubernetes/executors/kubernetes_executor.py | 31 ++++--
.../executors/test_kubernetes_executor.py | 117 ++++++++++++++++-----
2 files changed, 110 insertions(+), 38 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 16923d63e2..6e32c00473 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -34,6 +34,7 @@ from datetime import datetime
from queue import Empty, Queue
from typing import TYPE_CHECKING, Any, Sequence
+from kubernetes.dynamic import DynamicClient
from sqlalchemy import select, update
from airflow.providers.cncf.kubernetes.pod_generator import
PodMutationHookException, PodReconciliationError
@@ -160,19 +161,31 @@ class KubernetesExecutor(BaseExecutor):
super().__init__(parallelism=self.kube_config.parallelism)
def _list_pods(self, query_kwargs):
+ query_kwargs["header_params"] = {
+ "Accept":
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"
+ }
+ dynamic_client = DynamicClient(self.kube_client.api_client)
+ pod_resource = dynamic_client.resources.get(api_version="v1",
kind="Pod")
if self.kube_config.multi_namespace_mode:
if self.kube_config.multi_namespace_mode_namespace_list:
- pods = []
- for namespace in
self.kube_config.multi_namespace_mode_namespace_list:
- pods.extend(
-
self.kube_client.list_namespaced_pod(namespace=namespace, **query_kwargs).items
- )
+ namespaces =
self.kube_config.multi_namespace_mode_namespace_list
else:
- pods =
self.kube_client.list_pod_for_all_namespaces(**query_kwargs).items
+ namespaces = [None]
else:
- pods = self.kube_client.list_namespaced_pod(
- namespace=self.kube_config.kube_namespace, **query_kwargs
- ).items
+ namespaces = [self.kube_config.kube_namespace]
+
+ pods = []
+ for namespace in namespaces:
+            # DynamicClient's pod listing raises a TypeError when there are no
matching pods to return.
+            # This bug was fixed in PR
https://github.com/kubernetes-client/python/pull/2155
+            # TODO: Remove the try-except clause once we upgrade the K8s Python
client version which
+            # includes the above PR
+ try:
+ pods.extend(
+ dynamic_client.get(resource=pod_resource,
namespace=namespace, **query_kwargs).items
+ )
+ except TypeError:
+ continue
return pods
diff --git
a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
index 35584a4d82..a0b187087a 100644
--- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -721,13 +721,16 @@ class TestKubernetesExecutor:
finally:
executor.end()
+
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@mock.patch(
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task"
)
@mock.patch(
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods"
)
- def test_try_adopt_task_instances(self, mock_adopt_completed_pods,
mock_adopt_launched_task):
+ def test_try_adopt_task_instances(
+ self, mock_adopt_completed_pods, mock_adopt_launched_task,
mock_kube_dynamic_client
+ ):
executor = self.kubernetes_executor
executor.scheduler_job_id = "10"
ti_key = annotations_to_key(
@@ -741,22 +744,27 @@ class TestKubernetesExecutor:
mock_ti = mock.MagicMock(queued_by_job_id="1",
external_executor_id="1", key=ti_key)
pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="foo"))
mock_kube_client = mock.MagicMock()
- mock_kube_client.list_namespaced_pod.return_value.items = [pod]
executor.kube_client = mock_kube_client
+ mock_kube_dynamic_client.return_value = mock.MagicMock()
+ mock_pod_resource = mock.MagicMock()
+ mock_kube_dynamic_client.return_value.resources.get.return_value =
mock_pod_resource
+ mock_kube_dynamic_client.return_value.get.return_value.items = [pod]
# First adoption
reset_tis = executor.try_adopt_task_instances([mock_ti])
- mock_kube_client.list_namespaced_pod.assert_called_once_with(
+ mock_kube_dynamic_client.return_value.get.assert_called_once_with(
+ resource=mock_pod_resource,
namespace="default",
field_selector="status.phase!=Succeeded",
label_selector="kubernetes_executor=True,airflow-worker=1,airflow_executor_done!=True",
+ header_params={"Accept":
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
)
mock_adopt_launched_task.assert_called_once_with(mock_kube_client,
pod, {ti_key: mock_ti})
mock_adopt_completed_pods.assert_called_once()
assert reset_tis == [mock_ti] # assume failure adopting when checking
return
# Second adoption (queued_by_job_id and external_executor_id no longer
match)
- mock_kube_client.reset_mock()
+ mock_kube_dynamic_client.return_value.reset_mock()
mock_adopt_launched_task.reset_mock()
mock_adopt_completed_pods.reset_mock()
@@ -768,23 +776,31 @@ class TestKubernetesExecutor:
)
reset_tis = executor.try_adopt_task_instances([mock_ti])
- mock_kube_client.list_namespaced_pod.assert_called_once_with(
+ mock_kube_dynamic_client.return_value.get.assert_called_once_with(
+ resource=mock_pod_resource,
namespace="default",
field_selector="status.phase!=Succeeded",
label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done!=True",
+ header_params={"Accept":
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
)
mock_adopt_launched_task.assert_called_once() # Won't check args this
time around as they get mutated
mock_adopt_completed_pods.assert_called_once()
assert reset_tis == [] # This time our return is empty - no TIs to
reset
+
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@mock.patch(
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods"
)
- def test_try_adopt_task_instances_multiple_scheduler_ids(self,
mock_adopt_completed_pods):
+ def test_try_adopt_task_instances_multiple_scheduler_ids(
+ self, mock_adopt_completed_pods, mock_kube_dynamic_client
+ ):
"""We try to find pods only once per scheduler id"""
executor = self.kubernetes_executor
mock_kube_client = mock.MagicMock()
executor.kube_client = mock_kube_client
+ mock_kube_dynamic_client.return_value = mock.MagicMock()
+ mock_pod_resource = mock.MagicMock()
+ mock_kube_dynamic_client.return_value.resources.get.return_value =
mock_pod_resource
mock_tis = [
mock.MagicMock(queued_by_job_id="10", external_executor_id="1",
dag_id="dag", task_id="task"),
@@ -793,23 +809,32 @@ class TestKubernetesExecutor:
]
executor.try_adopt_task_instances(mock_tis)
- assert mock_kube_client.list_namespaced_pod.call_count == 2
- mock_kube_client.list_namespaced_pod.assert_has_calls(
+ assert mock_kube_dynamic_client.return_value.get.call_count == 2
+ mock_kube_dynamic_client.return_value.get.assert_has_calls(
[
mock.call(
+ resource=mock_pod_resource,
namespace="default",
field_selector="status.phase!=Succeeded",
label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done!=True",
+ header_params={
+ "Accept":
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"
+ },
),
mock.call(
+ resource=mock_pod_resource,
namespace="default",
field_selector="status.phase!=Succeeded",
label_selector="kubernetes_executor=True,airflow-worker=40,airflow_executor_done!=True",
+ header_params={
+ "Accept":
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"
+ },
),
],
any_order=True,
)
+
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@mock.patch(
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task"
)
@@ -817,13 +842,14 @@ class TestKubernetesExecutor:
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods"
)
def test_try_adopt_task_instances_no_matching_pods(
- self, mock_adopt_completed_pods, mock_adopt_launched_task
+ self, mock_adopt_completed_pods, mock_adopt_launched_task,
mock_kube_dynamic_client
):
executor = self.kubernetes_executor
mock_ti = mock.MagicMock(queued_by_job_id="1",
external_executor_id="1", dag_id="dag", task_id="task")
mock_kube_client = mock.MagicMock()
- mock_kube_client.list_namespaced_pod.return_value.items = []
executor.kube_client = mock_kube_client
+ mock_kube_dynamic_client.return_value = mock.MagicMock()
+ mock_kube_dynamic_client.return_value.get.return_value.items = []
tis_to_flush = executor.try_adopt_task_instances([mock_ti])
assert tis_to_flush == [mock_ti]
@@ -880,12 +906,17 @@ class TestKubernetesExecutor:
assert tis_to_flush_by_key == {ti_key: {}}
assert executor.running == set()
+
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
- def test_adopt_completed_pods(self, mock_kube_client):
+ def test_adopt_completed_pods(self, mock_kube_client,
mock_kube_dynamic_client):
"""We should adopt all completed pods from other schedulers"""
executor = self.kubernetes_executor
executor.scheduler_job_id = "modified"
executor.kube_client = mock_kube_client
+ mock_kube_dynamic_client.return_value = mock.MagicMock()
+ mock_pod_resource = mock.MagicMock()
+ mock_kube_dynamic_client.return_value.resources.get.return_value =
mock_pod_resource
+ mock_kube_dynamic_client.return_value.get.return_value.items = []
executor.kube_config.kube_namespace = "somens"
pod_names = ["one", "two"]
@@ -897,7 +928,7 @@ class TestKubernetesExecutor:
"try_number": "1",
}
- mock_kube_client.list_namespaced_pod.return_value.items = [
+ mock_kube_dynamic_client.return_value.get.return_value.items = [
k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
name=pod_name,
@@ -911,10 +942,12 @@ class TestKubernetesExecutor:
expected_running_ti_keys =
{annotations_to_key(get_annotations(pod_name)) for pod_name in pod_names}
executor._adopt_completed_pods(mock_kube_client)
- mock_kube_client.list_namespaced_pod.assert_called_once_with(
+ mock_kube_dynamic_client.return_value.get.assert_called_once_with(
+ resource=mock_pod_resource,
namespace="somens",
field_selector="status.phase=Succeeded",
label_selector="kubernetes_executor=True,airflow-worker!=modified,airflow_executor_done!=True",
+ header_params={"Accept":
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
)
assert len(pod_names) ==
mock_kube_client.patch_namespaced_pod.call_count
mock_kube_client.patch_namespaced_pod.assert_has_calls(
@@ -1002,10 +1035,16 @@ class TestKubernetesExecutor:
assert executor.kube_config.multi_namespace_mode_namespace_list ==
expected_value_in_kube_config
@pytest.mark.db_test
- def test_clear_not_launched_queued_tasks_not_launched(self, dag_maker,
create_dummy_dag, session):
+
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
+ def test_clear_not_launched_queued_tasks_not_launched(
+ self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
+ ):
"""If a pod isn't found for a TI, reset the state to scheduled"""
mock_kube_client = mock.MagicMock()
- mock_kube_client.list_namespaced_pod.return_value =
k8s.V1PodList(items=[])
+ mock_kube_dynamic_client.return_value = mock.MagicMock()
+ mock_pod_resource = mock.MagicMock()
+ mock_kube_dynamic_client.return_value.resources.get.return_value =
mock_pod_resource
+ mock_kube_dynamic_client.return_value.get.return_value.items = []
create_dummy_dag(dag_id="test_clear", task_id="task1",
with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()
@@ -1022,9 +1061,12 @@ class TestKubernetesExecutor:
ti.refresh_from_db()
assert ti.state == State.SCHEDULED
- assert mock_kube_client.list_namespaced_pod.call_count == 1
- mock_kube_client.list_namespaced_pod.assert_called_with(
- namespace="default", label_selector="airflow-worker=1"
+ assert mock_kube_dynamic_client.return_value.get.call_count == 1
+ mock_kube_dynamic_client.return_value.get.assert_called_with(
+ resource=mock_pod_resource,
+ namespace="default",
+ label_selector="airflow-worker=1",
+ header_params={"Accept":
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
)
@pytest.mark.db_test
@@ -1036,12 +1078,16 @@ class TestKubernetesExecutor:
pytest.param("kubernetes", "kubernetes"),
],
)
+
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
def test_clear_not_launched_queued_tasks_launched(
- self, dag_maker, create_dummy_dag, session, task_queue,
kubernetes_queue
+ self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session,
task_queue, kubernetes_queue
):
"""Leave the state alone if a pod already exists"""
mock_kube_client = mock.MagicMock()
- mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(
+ mock_kube_dynamic_client.return_value = mock.MagicMock()
+ mock_pod_resource = mock.MagicMock()
+ mock_kube_dynamic_client.return_value.resources.get.return_value =
mock_pod_resource
+ mock_kube_dynamic_client.return_value.get.return_value = k8s.V1PodList(
items=[
k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
@@ -1075,15 +1121,19 @@ class TestKubernetesExecutor:
ti.refresh_from_db()
assert ti.state == State.QUEUED
- mock_kube_client.list_namespaced_pod.assert_called_once_with(
- namespace="default", label_selector="airflow-worker=1"
+ mock_kube_dynamic_client.return_value.get.assert_called_once_with(
+ resource=mock_pod_resource,
+ namespace="default",
+ label_selector="airflow-worker=1",
+ header_params={"Accept":
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
)
@pytest.mark.db_test
- def test_clear_not_launched_queued_tasks_mapped_task(self, dag_maker,
session):
+
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
+ def test_clear_not_launched_queued_tasks_mapped_task(self,
mock_kube_dynamic_client, dag_maker, session):
"""One mapped task has a launched pod - other does not."""
- def list_namespaced_pod(*args, **kwargs):
+ def get(*args, **kwargs):
return k8s.V1PodList(
items=[
k8s.V1Pod(
@@ -1103,7 +1153,10 @@ class TestKubernetesExecutor:
)
mock_kube_client = mock.MagicMock()
- mock_kube_client.list_namespaced_pod.side_effect = list_namespaced_pod
+ mock_kube_dynamic_client.return_value = mock.MagicMock()
+ mock_pod_resource = mock.MagicMock()
+ mock_kube_dynamic_client.return_value.resources.get.return_value =
mock_pod_resource
+ mock_kube_dynamic_client.return_value.get.side_effect = get
with dag_maker(dag_id="test_clear"):
op =
BashOperator.partial(task_id="bash").expand(bash_command=["echo 0", "echo 1"])
@@ -1129,10 +1182,12 @@ class TestKubernetesExecutor:
assert ti0.state == State.QUEUED
assert ti1.state == State.SCHEDULED
- assert mock_kube_client.list_namespaced_pod.call_count == 1
- mock_kube_client.list_namespaced_pod.assert_called_with(
+ assert mock_kube_dynamic_client.return_value.get.call_count == 1
+ mock_kube_dynamic_client.return_value.get.assert_called_with(
+ resource=mock_pod_resource,
namespace="default",
label_selector="airflow-worker=1",
+ header_params={"Accept":
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
)
@pytest.mark.db_test
@@ -1163,10 +1218,14 @@ class TestKubernetesExecutor:
assert mock_kube_client.list_namespaced_pod.call_count == 0
@pytest.mark.db_test
- def test_clear_not_launched_queued_tasks_clear_only_by_job_id(self,
dag_maker, create_dummy_dag, session):
+
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
+ def test_clear_not_launched_queued_tasks_clear_only_by_job_id(
+ self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
+ ):
"""clear only not launched queued tasks which are queued by the same
executor job"""
mock_kube_client = mock.MagicMock()
- mock_kube_client.list_namespaced_pod.return_value =
k8s.V1PodList(items=[])
+ mock_kube_dynamic_client.return_value = mock.MagicMock()
+ mock_kube_dynamic_client.return_value.get.return_value =
k8s.V1PodList(items=[])
create_dummy_dag(dag_id="test_clear_0", task_id="task0",
with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()