This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 98d52af707 Use map_index when clearing not launched tasks in k8s (#23224)
98d52af707 is described below
commit 98d52af7074e9a82457515588bdf9cdd6de70f35
Author: Tanel Kiis <[email protected]>
AuthorDate: Mon Apr 25 22:35:17 2022 +0300
Use map_index when clearing not launched tasks in k8s (#23224)
---
airflow/executors/kubernetes_executor.py | 1 +
tests/executors/test_kubernetes_executor.py | 57 +++++++++++++++++++++++++++++
2 files changed, 58 insertions(+)
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index bdac0889f5..19a04216f2 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -505,6 +505,7 @@ class KubernetesExecutor(BaseExecutor):
TaskInstance.dag_id == ti.dag_id,
TaskInstance.task_id == ti.task_id,
TaskInstance.run_id == ti.run_id,
+ TaskInstance.map_index == ti.map_index,
).update({TaskInstance.state: State.SCHEDULED})
def start(self) -> None:
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index af79b1663b..018a94c4d7 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -30,6 +30,7 @@ from urllib3 import HTTPResponse
from airflow import AirflowException
from airflow.models.taskinstance import TaskInstanceKey
+from airflow.operators.bash import BashOperator
from airflow.utils import timezone
from tests.test_utils.config import conf_vars
@@ -743,6 +744,62 @@ class TestKubernetesExecutor:
"default",
label_selector="dag_id=test_clear,task_id=task1,airflow-worker=1,run_id=test"
)
+    def test_clear_not_launched_queued_tasks_mapped_task(self, dag_maker, session):
+        """One mapped task has a launched pod - other does not."""
+
+        def list_namespaced_pod(*args, **kwargs):
+            if 'map_index=0' in kwargs['label_selector']:
+                return k8s.V1PodList(items=["something"])
+            else:
+                return k8s.V1PodList(items=[])
+
+        mock_kube_client = mock.MagicMock()
+        mock_kube_client.list_namespaced_pod.side_effect = list_namespaced_pod
+
+        with dag_maker(dag_id='test_clear'):
+            op = BashOperator.partial(task_id="bash").expand(bash_command=["echo 0", "echo 1"])
+
+        dag_run = dag_maker.create_dagrun()
+        ti0 = dag_run.get_task_instance(op.task_id, session, map_index=0)
+        ti0.state = State.QUEUED
+        ti0.queued_by_job_id = 1
+
+        ti1 = dag_run.get_task_instance(op.task_id, session, map_index=1)
+        ti1.state = State.QUEUED
+        ti1.queued_by_job_id = 1
+
+        session.flush()
+
+        executor = self.kubernetes_executor
+        executor.kube_client = mock_kube_client
+        executor.clear_not_launched_queued_tasks(session=session)
+
+        ti0.refresh_from_db()
+        ti1.refresh_from_db()
+        assert ti0.state == State.QUEUED
+        assert ti1.state == State.SCHEDULED
+
+        assert mock_kube_client.list_namespaced_pod.call_count == 3
+        execution_date_label = pod_generator.datetime_to_label_safe_datestring(dag_run.execution_date)
+        mock_kube_client.list_namespaced_pod.assert_has_calls(
+            [
+                mock.call(
+                    "default",
+                    label_selector="dag_id=test_clear,task_id=bash,airflow-worker=1,map_index=0,run_id=test",
+                ),
+                mock.call(
+                    "default",
+                    label_selector="dag_id=test_clear,task_id=bash,airflow-worker=1,map_index=1,run_id=test",
+                ),
+                mock.call(
+                    "default",
+                    label_selector=f"dag_id=test_clear,task_id=bash,airflow-worker=1,map_index=1,"
+                    f"execution_date={execution_date_label}",
+                ),
+            ],
+            any_order=True,
+        )
+
class TestKubernetesJobWatcher(unittest.TestCase):
def setUp(self):