This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e5a474bdd1 kubernetes executor cleanup_stuck_queued_tasks optimization 
(#41220)
e5a474bdd1 is described below

commit e5a474bdd173263b628d9a8a2efa3860cb88a1c8
Author: Gopal Dirisala <[email protected]>
AuthorDate: Mon Oct 7 20:52:41 2024 +0530

    kubernetes executor cleanup_stuck_queued_tasks optimization (#41220)
    
    * kubernetes executor cleanup_stuck_queued_tasks optimization
    
    * kubernetes executor cleanup_stuck_queued_tasks optimization
    
    * kubernetes executor cleanup_stuck_queued_tasks optimization
    
    * kubernetes executor cleanup_stuck_queued_tasks optimization
    
    * Updated comment
    
    * Provider change log and version updated
    
    * Update the worker pod and task comparison from labels to annotations
---
 airflow/providers/cncf/kubernetes/CHANGELOG.rst    |   6 ++
 .../kubernetes/executors/kubernetes_executor.py    | 111 +++++++++------------
 airflow/providers/cncf/kubernetes/provider.yaml    |   1 +
 .../executors/test_kubernetes_executor.py          |  67 ++++++++++---
 4 files changed, 107 insertions(+), 78 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst 
b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
index 1614786802..3a8101943e 100644
--- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst
+++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
@@ -27,6 +27,12 @@
 Changelog
 ---------
 
+main
+.....
+
+.. warning::
+  Support for identifying pods by execution_date during the upgrade from 
Airflow 1 to 2 has been removed. This may result in duplicate pods being 
launched for tasks originally started by Airflow 1, but only one of the task 
pods will succeed.
+
 8.4.2
 .....
 
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py 
b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 0b2de8085c..7c6e0d8852 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -179,6 +179,36 @@ class KubernetesExecutor(BaseExecutor):
             return pod_generator.datetime_to_label_safe_datestring(input_value)
         return pod_generator.make_safe_label_value(input_value)
 
+    def get_pod_combined_search_str_to_pod_map(self) -> dict[str, k8s.V1Pod]:
+        """
+        List the worker pods owned by this scheduler and create a map 
containing pod combined search str -> pod.
+
+        For every pod that carries dag_id, task_id and run_id annotations, it creates one entry in the map:
+        dag_id={dag_id},task_id={task_id}<,map_index={map_index}>,run_id={run_id}
+        """
+        # batch-list the worker pods owned by this scheduler, selected by the airflow-worker label
+        kwargs = {"label_selector": 
f"airflow-worker={self._make_safe_label_value(str(self.job_id))}"}
+        if self.kube_config.kube_client_request_args:
+            kwargs.update(self.kube_config.kube_client_request_args)
+        pod_list = self._list_pods(kwargs)
+
+        # map each pod's combined annotation search string to the pod object
+        pod_combined_search_str_to_pod_map = {}
+        for pod in pod_list:
+            dag_id = pod.metadata.annotations.get("dag_id", None)
+            task_id = pod.metadata.annotations.get("task_id", None)
+            map_index = pod.metadata.annotations.get("map_index", None)
+            run_id = pod.metadata.annotations.get("run_id", None)
+            if dag_id is None or task_id is None:
+                continue
+            search_base_str = f"dag_id={dag_id},task_id={task_id}"
+            if map_index is not None:
+                search_base_str += f",map_index={map_index}"
+            if run_id is not None:
+                search_str = f"{search_base_str},run_id={run_id}"
+                pod_combined_search_str_to_pod_map[search_str] = pod
+        return pod_combined_search_str_to_pod_map
+
     @provide_session
     def clear_not_launched_queued_tasks(self, session: Session = NEW_SESSION) 
-> None:
         """
@@ -218,32 +248,7 @@ class KubernetesExecutor(BaseExecutor):
             if not queued_tis:
                 return
 
-            # airflow worker label selector batch call
-            kwargs = {"label_selector": 
f"airflow-worker={self._make_safe_label_value(str(self.job_id))}"}
-            if self.kube_config.kube_client_request_args:
-                kwargs.update(self.kube_config.kube_client_request_args)
-            pod_list = self._list_pods(kwargs)
-
-            # create a set against pod query label fields
-            label_search_set = set()
-            for pod in pod_list:
-                dag_id = pod.metadata.labels.get("dag_id", None)
-                task_id = pod.metadata.labels.get("task_id", None)
-                airflow_worker = pod.metadata.labels.get("airflow-worker", 
None)
-                map_index = pod.metadata.labels.get("map_index", None)
-                run_id = pod.metadata.labels.get("run_id", None)
-                execution_date = pod.metadata.labels.get("execution_date", 
None)
-                if dag_id is None or task_id is None or airflow_worker is None:
-                    continue
-                label_search_base_str = 
f"dag_id={dag_id},task_id={task_id},airflow-worker={airflow_worker}"
-                if map_index is not None:
-                    label_search_base_str += f",map_index={map_index}"
-                if run_id is not None:
-                    label_search_str = 
f"{label_search_base_str},run_id={run_id}"
-                    label_search_set.add(label_search_str)
-                if execution_date is not None:
-                    label_search_str = 
f"{label_search_base_str},execution_date={execution_date}"
-                    label_search_set.add(label_search_str)
+            pod_combined_search_str_to_pod_map = 
self.get_pod_combined_search_str_to_pod_map()
 
             for ti in queued_tis:
                 self.log.debug("Checking task instance %s", ti)
@@ -253,24 +258,13 @@ class KubernetesExecutor(BaseExecutor):
                     continue
 
                 # Build the pod selector
-                base_label_selector = (
-                    f"dag_id={self._make_safe_label_value(ti.dag_id)},"
-                    f"task_id={self._make_safe_label_value(ti.task_id)},"
-                    
f"airflow-worker={self._make_safe_label_value(str(ti.queued_by_job_id))}"
-                )
+                base_selector = f"dag_id={ti.dag_id},task_id={ti.task_id}"
                 if ti.map_index >= 0:
                     # Old tasks _couldn't_ be mapped, so we don't have to 
worry about compat
-                    base_label_selector += f",map_index={ti.map_index}"
+                    base_selector += f",map_index={ti.map_index}"
 
-                # Try run_id first
-                label_search_str = 
f"{base_label_selector},run_id={self._make_safe_label_value(ti.run_id)}"
-                if label_search_str in label_search_set:
-                    continue
-                # Fallback to old style of using execution_date
-                label_search_str = (
-                    
f"{base_label_selector},execution_date={self._make_safe_label_value(ti.execution_date)}"
-                )
-                if label_search_str in label_search_set:
+                search_str = f"{base_selector},run_id={ti.run_id}"
+                if search_str in pod_combined_search_str_to_pod_map:
                     continue
                 self.log.info("TaskInstance: %s found in queued state but was 
not launched, rescheduling", ti)
                 session.execute(
@@ -603,34 +597,27 @@ class KubernetesExecutor(BaseExecutor):
         :param tis: List of Task Instances to clean up
         :return: List of readable task instances for a warning message
         """
-        from airflow.providers.cncf.kubernetes.pod_generator import 
PodGenerator
-
         if TYPE_CHECKING:
             assert self.kube_client
             assert self.kube_scheduler
-        readable_tis = []
+        readable_tis: list[str] = []
+        if not tis:
+            return readable_tis
+        pod_combined_search_str_to_pod_map = 
self.get_pod_combined_search_str_to_pod_map()
         for ti in tis:
-            selector = PodGenerator.build_selector_for_k8s_executor_pod(
-                dag_id=ti.dag_id,
-                task_id=ti.task_id,
-                try_number=ti.try_number,
-                map_index=ti.map_index,
-                run_id=ti.run_id,
-                airflow_worker=ti.queued_by_job_id,
-            )
-            namespace = self._get_pod_namespace(ti)
-            pod_list = self.kube_client.list_namespaced_pod(
-                namespace=namespace,
-                label_selector=selector,
-            ).items
-            if not pod_list:
+            # Build the pod selector
+            base_label_selector = f"dag_id={ti.dag_id},task_id={ti.task_id}"
+            if ti.map_index >= 0:
+                # Old tasks _couldn't_ be mapped, so we don't have to worry 
about compat
+                base_label_selector += f",map_index={ti.map_index}"
+
+            search_str = f"{base_label_selector},run_id={ti.run_id}"
+            pod = pod_combined_search_str_to_pod_map.get(search_str, None)
+            if not pod:
                 self.log.warning("Cannot find pod for ti %s", ti)
                 continue
-            elif len(pod_list) > 1:
-                self.log.warning("Found multiple pods for ti %s: %s", ti, 
pod_list)
-                continue
             readable_tis.append(repr(ti))
-            self.kube_scheduler.delete_pod(pod_name=pod_list[0].metadata.name, 
namespace=namespace)
+            self.kube_scheduler.delete_pod(pod_name=pod.metadata.name, 
namespace=pod.metadata.namespace)
         return readable_tis
 
     def adopt_launched_task(
diff --git a/airflow/providers/cncf/kubernetes/provider.yaml 
b/airflow/providers/cncf/kubernetes/provider.yaml
index 570cef67f8..0849609b8f 100644
--- a/airflow/providers/cncf/kubernetes/provider.yaml
+++ b/airflow/providers/cncf/kubernetes/provider.yaml
@@ -25,6 +25,7 @@ state: ready
 source-date-epoch: 1726860352
 # note that those versions are maintained by release manager - do not update 
them manually
 versions:
+  - 9.0.0
   - 8.4.2
   - 8.4.1
   - 8.4.0
diff --git 
a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py 
b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
index 8cc46c3dba..4622d31b57 100644
--- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 import random
 import re
 import string
-from datetime import datetime, timedelta
+from datetime import datetime
 from unittest import mock
 
 import pytest
@@ -1191,28 +1191,52 @@ class TestKubernetesExecutor:
         assert tis_to_flush_by_key == {"foobar": {}}
 
     @pytest.mark.db_test
-    
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
-    @mock.patch(
-        
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler.delete_pod"
-    )
-    def test_cleanup_stuck_queued_tasks(self, mock_delete_pod, 
mock_kube_client, dag_maker, session):
+    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
+    def test_cleanup_stuck_queued_tasks(self, mock_kube_dynamic_client, 
dag_maker, create_dummy_dag, session):
         """Delete any pods associated with a task stuck in queued."""
-        executor = KubernetesExecutor()
-        executor.start()
-        executor.scheduler_job_id = "123"
-        with dag_maker(dag_id="test_cleanup_stuck_queued_tasks"):
-            op = BashOperator(task_id="bash", bash_command=["echo 0", "echo 
1"])
+        mock_kube_client = mock.MagicMock()
+        mock_kube_dynamic_client.return_value = mock.MagicMock()
+        mock_pod_resource = mock.MagicMock()
+        mock_kube_dynamic_client.return_value.resources.get.return_value = 
mock_pod_resource
+        mock_kube_dynamic_client.return_value.get.return_value = k8s.V1PodList(
+            items=[
+                k8s.V1Pod(
+                    metadata=k8s.V1ObjectMeta(
+                        annotations={
+                            "dag_id": "test_cleanup_stuck_queued_tasks",
+                            "task_id": "bash",
+                            "run_id": "test",
+                            "try_number": 0,
+                        },
+                        labels={
+                            "role": "airflow-worker",
+                            "dag_id": "test_cleanup_stuck_queued_tasks",
+                            "task_id": "bash",
+                            "airflow-worker": 123,
+                            "run_id": "test",
+                            "try_number": 0,
+                        },
+                    ),
+                    status=k8s.V1PodStatus(phase="Pending"),
+                )
+            ]
+        )
+        create_dummy_dag(dag_id="test_cleanup_stuck_queued_tasks", 
task_id="bash", with_dagrun_type=None)
         dag_run = dag_maker.create_dagrun()
-        ti = dag_run.get_task_instance(op.task_id, session)
-        ti.retries = 1
+        ti = dag_run.task_instances[0]
         ti.state = State.QUEUED
-        ti.queued_dttm = timezone.utcnow() - timedelta(minutes=30)
+        ti.queued_by_job_id = 123
+        session.flush()
+
+        executor = self.kubernetes_executor
+        executor.job_id = 123
+        executor.kube_client = mock_kube_client
+        executor.kube_scheduler = mock.MagicMock()
         ti.refresh_from_db()
         tis = [ti]
         executor.cleanup_stuck_queued_tasks(tis)
-        mock_delete_pod.assert_called_once()
+        executor.kube_scheduler.delete_pod.assert_called_once()
         assert executor.running == set()
-        executor.end()
 
     @pytest.mark.parametrize(
         "raw_multi_namespace_mode, raw_value_namespace_list, 
expected_value_in_kube_config",
@@ -1292,6 +1316,11 @@ class TestKubernetesExecutor:
             items=[
                 k8s.V1Pod(
                     metadata=k8s.V1ObjectMeta(
+                        annotations={
+                            "dag_id": "test_clear",
+                            "task_id": "task1",
+                            "run_id": "test",
+                        },
                         labels={
                             "role": "airflow-worker",
                             "dag_id": "test_clear",
@@ -1339,6 +1368,12 @@ class TestKubernetesExecutor:
                 items=[
                     k8s.V1Pod(
                         metadata=k8s.V1ObjectMeta(
+                            annotations={
+                                "dag_id": "test_clear",
+                                "task_id": "bash",
+                                "run_id": "test",
+                                "map_index": 0,
+                            },
                             labels={
                                 "role": "airflow-worker",
                                 "dag_id": "test_clear",

Reply via email to