This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 98d52af707 Use map_index when clearing not launched tasks in k8s (#23224)
98d52af707 is described below

commit 98d52af7074e9a82457515588bdf9cdd6de70f35
Author: Tanel Kiis <[email protected]>
AuthorDate: Mon Apr 25 22:35:17 2022 +0300

    Use map_index when clearing not launched tasks in k8s (#23224)
---
 airflow/executors/kubernetes_executor.py    |  1 +
 tests/executors/test_kubernetes_executor.py | 57 +++++++++++++++++++++++++++++
 2 files changed, 58 insertions(+)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index bdac0889f5..19a04216f2 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -505,6 +505,7 @@ class KubernetesExecutor(BaseExecutor):
                 TaskInstance.dag_id == ti.dag_id,
                 TaskInstance.task_id == ti.task_id,
                 TaskInstance.run_id == ti.run_id,
+                TaskInstance.map_index == ti.map_index,
             ).update({TaskInstance.state: State.SCHEDULED})
 
     def start(self) -> None:
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index af79b1663b..018a94c4d7 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -30,6 +30,7 @@ from urllib3 import HTTPResponse
 
 from airflow import AirflowException
 from airflow.models.taskinstance import TaskInstanceKey
+from airflow.operators.bash import BashOperator
 from airflow.utils import timezone
 from tests.test_utils.config import conf_vars
 
@@ -743,6 +744,62 @@ class TestKubernetesExecutor:
             "default", 
label_selector="dag_id=test_clear,task_id=task1,airflow-worker=1,run_id=test"
         )
 
+    def test_clear_not_launched_queued_tasks_mapped_task(self, dag_maker, session):
+        """One mapped task has a launched pod - other does not."""
+
+        def list_namespaced_pod(*args, **kwargs):
+            if 'map_index=0' in kwargs['label_selector']:
+                return k8s.V1PodList(items=["something"])
+            else:
+                return k8s.V1PodList(items=[])
+
+        mock_kube_client = mock.MagicMock()
+        mock_kube_client.list_namespaced_pod.side_effect = list_namespaced_pod
+
+        with dag_maker(dag_id='test_clear'):
+            op = BashOperator.partial(task_id="bash").expand(bash_command=["echo 0", "echo 1"])
+
+        dag_run = dag_maker.create_dagrun()
+        ti0 = dag_run.get_task_instance(op.task_id, session, map_index=0)
+        ti0.state = State.QUEUED
+        ti0.queued_by_job_id = 1
+
+        ti1 = dag_run.get_task_instance(op.task_id, session, map_index=1)
+        ti1.state = State.QUEUED
+        ti1.queued_by_job_id = 1
+
+        session.flush()
+
+        executor = self.kubernetes_executor
+        executor.kube_client = mock_kube_client
+        executor.clear_not_launched_queued_tasks(session=session)
+
+        ti0.refresh_from_db()
+        ti1.refresh_from_db()
+        assert ti0.state == State.QUEUED
+        assert ti1.state == State.SCHEDULED
+
+        assert mock_kube_client.list_namespaced_pod.call_count == 3
+        execution_date_label = pod_generator.datetime_to_label_safe_datestring(dag_run.execution_date)
+        mock_kube_client.list_namespaced_pod.assert_has_calls(
+            [
+                mock.call(
+                    "default",
+                    label_selector="dag_id=test_clear,task_id=bash,airflow-worker=1,map_index=0,run_id=test",
+                ),
+                mock.call(
+                    "default",
+                    label_selector="dag_id=test_clear,task_id=bash,airflow-worker=1,map_index=1,run_id=test",
+                ),
+                mock.call(
+                    "default",
+                    label_selector=f"dag_id=test_clear,task_id=bash,airflow-worker=1,map_index=1,"
+                    f"execution_date={execution_date_label}",
+                ),
+            ],
+            any_order=True,
+        )
+
 
 class TestKubernetesJobWatcher(unittest.TestCase):
     def setUp(self):

Reply via email to