This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e5a474bdd1 kubernetes executor cleanup_stuck_queued_tasks optimization
(#41220)
e5a474bdd1 is described below
commit e5a474bdd173263b628d9a8a2efa3860cb88a1c8
Author: Gopal Dirisala <[email protected]>
AuthorDate: Mon Oct 7 20:52:41 2024 +0530
kubernetes executor cleanup_stuck_queued_tasks optimization (#41220)
* kubernetes executor cleanup_stuck_queued_tasks optimization
* kubernetes executor cleanup_stuck_queued_tasks optimization
* kubernetes executor cleanup_stuck_queued_tasks optimization
* kubernetes executor cleanup_stuck_queued_tasks optimization
* Updated comment
* Provider change log and version updated
* Update the worker pod and task comparison from labels to annotations
---
airflow/providers/cncf/kubernetes/CHANGELOG.rst | 6 ++
.../kubernetes/executors/kubernetes_executor.py | 111 +++++++++------------
airflow/providers/cncf/kubernetes/provider.yaml | 1 +
.../executors/test_kubernetes_executor.py | 67 ++++++++++---
4 files changed, 107 insertions(+), 78 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst
b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
index 1614786802..3a8101943e 100644
--- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst
+++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
@@ -27,6 +27,12 @@
Changelog
---------
+main
+.....
+
+.. warning::
+ Support for identifying pods by execution_date during the upgrade from
Airflow 1 to 2 has been removed. This may result in duplicate pods being
launched for tasks originally started by Airflow 1, but only one of the task
pods will succeed.
+
8.4.2
.....
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 0b2de8085c..7c6e0d8852 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -179,6 +179,36 @@ class KubernetesExecutor(BaseExecutor):
return pod_generator.datetime_to_label_safe_datestring(input_value)
return pod_generator.make_safe_label_value(input_value)
+ def get_pod_combined_search_str_to_pod_map(self) -> dict[str, k8s.V1Pod]:
+ """
+ List the worker pods owned by this scheduler and create a map
containing pod combined search str -> pod.
+
+    For every pod, it creates one entry in the map, keyed as
+
dag_id={dag_id},task_id={task_id},<map_index={map_index}>,run_id={run_id}
+ """
+ # airflow worker label selector batch call
+ kwargs = {"label_selector":
f"airflow-worker={self._make_safe_label_value(str(self.job_id))}"}
+ if self.kube_config.kube_client_request_args:
+ kwargs.update(self.kube_config.kube_client_request_args)
+ pod_list = self._list_pods(kwargs)
+
+    # build a map keyed by pod annotation fields
+ pod_combined_search_str_to_pod_map = {}
+ for pod in pod_list:
+ dag_id = pod.metadata.annotations.get("dag_id", None)
+ task_id = pod.metadata.annotations.get("task_id", None)
+ map_index = pod.metadata.annotations.get("map_index", None)
+ run_id = pod.metadata.annotations.get("run_id", None)
+ if dag_id is None or task_id is None:
+ continue
+ search_base_str = f"dag_id={dag_id},task_id={task_id}"
+ if map_index is not None:
+ search_base_str += f",map_index={map_index}"
+ if run_id is not None:
+ search_str = f"{search_base_str},run_id={run_id}"
+ pod_combined_search_str_to_pod_map[search_str] = pod
+ return pod_combined_search_str_to_pod_map
+
@provide_session
def clear_not_launched_queued_tasks(self, session: Session = NEW_SESSION)
-> None:
"""
@@ -218,32 +248,7 @@ class KubernetesExecutor(BaseExecutor):
if not queued_tis:
return
- # airflow worker label selector batch call
- kwargs = {"label_selector":
f"airflow-worker={self._make_safe_label_value(str(self.job_id))}"}
- if self.kube_config.kube_client_request_args:
- kwargs.update(self.kube_config.kube_client_request_args)
- pod_list = self._list_pods(kwargs)
-
- # create a set against pod query label fields
- label_search_set = set()
- for pod in pod_list:
- dag_id = pod.metadata.labels.get("dag_id", None)
- task_id = pod.metadata.labels.get("task_id", None)
- airflow_worker = pod.metadata.labels.get("airflow-worker",
None)
- map_index = pod.metadata.labels.get("map_index", None)
- run_id = pod.metadata.labels.get("run_id", None)
- execution_date = pod.metadata.labels.get("execution_date",
None)
- if dag_id is None or task_id is None or airflow_worker is None:
- continue
- label_search_base_str =
f"dag_id={dag_id},task_id={task_id},airflow-worker={airflow_worker}"
- if map_index is not None:
- label_search_base_str += f",map_index={map_index}"
- if run_id is not None:
- label_search_str =
f"{label_search_base_str},run_id={run_id}"
- label_search_set.add(label_search_str)
- if execution_date is not None:
- label_search_str =
f"{label_search_base_str},execution_date={execution_date}"
- label_search_set.add(label_search_str)
+ pod_combined_search_str_to_pod_map =
self.get_pod_combined_search_str_to_pod_map()
for ti in queued_tis:
self.log.debug("Checking task instance %s", ti)
@@ -253,24 +258,13 @@ class KubernetesExecutor(BaseExecutor):
continue
# Build the pod selector
- base_label_selector = (
- f"dag_id={self._make_safe_label_value(ti.dag_id)},"
- f"task_id={self._make_safe_label_value(ti.task_id)},"
-
f"airflow-worker={self._make_safe_label_value(str(ti.queued_by_job_id))}"
- )
+ base_selector = f"dag_id={ti.dag_id},task_id={ti.task_id}"
if ti.map_index >= 0:
# Old tasks _couldn't_ be mapped, so we don't have to
worry about compat
- base_label_selector += f",map_index={ti.map_index}"
+ base_selector += f",map_index={ti.map_index}"
- # Try run_id first
- label_search_str =
f"{base_label_selector},run_id={self._make_safe_label_value(ti.run_id)}"
- if label_search_str in label_search_set:
- continue
- # Fallback to old style of using execution_date
- label_search_str = (
-
f"{base_label_selector},execution_date={self._make_safe_label_value(ti.execution_date)}"
- )
- if label_search_str in label_search_set:
+ search_str = f"{base_selector},run_id={ti.run_id}"
+ if search_str in pod_combined_search_str_to_pod_map:
continue
self.log.info("TaskInstance: %s found in queued state but was
not launched, rescheduling", ti)
session.execute(
@@ -603,34 +597,27 @@ class KubernetesExecutor(BaseExecutor):
:param tis: List of Task Instances to clean up
:return: List of readable task instances for a warning message
"""
- from airflow.providers.cncf.kubernetes.pod_generator import
PodGenerator
-
if TYPE_CHECKING:
assert self.kube_client
assert self.kube_scheduler
- readable_tis = []
+ readable_tis: list[str] = []
+ if not tis:
+ return readable_tis
+ pod_combined_search_str_to_pod_map =
self.get_pod_combined_search_str_to_pod_map()
for ti in tis:
- selector = PodGenerator.build_selector_for_k8s_executor_pod(
- dag_id=ti.dag_id,
- task_id=ti.task_id,
- try_number=ti.try_number,
- map_index=ti.map_index,
- run_id=ti.run_id,
- airflow_worker=ti.queued_by_job_id,
- )
- namespace = self._get_pod_namespace(ti)
- pod_list = self.kube_client.list_namespaced_pod(
- namespace=namespace,
- label_selector=selector,
- ).items
- if not pod_list:
+ # Build the pod selector
+ base_label_selector = f"dag_id={ti.dag_id},task_id={ti.task_id}"
+ if ti.map_index >= 0:
+ # Old tasks _couldn't_ be mapped, so we don't have to worry
about compat
+ base_label_selector += f",map_index={ti.map_index}"
+
+ search_str = f"{base_label_selector},run_id={ti.run_id}"
+ pod = pod_combined_search_str_to_pod_map.get(search_str, None)
+ if not pod:
self.log.warning("Cannot find pod for ti %s", ti)
continue
- elif len(pod_list) > 1:
- self.log.warning("Found multiple pods for ti %s: %s", ti,
pod_list)
- continue
readable_tis.append(repr(ti))
- self.kube_scheduler.delete_pod(pod_name=pod_list[0].metadata.name,
namespace=namespace)
+ self.kube_scheduler.delete_pod(pod_name=pod.metadata.name,
namespace=pod.metadata.namespace)
return readable_tis
def adopt_launched_task(
diff --git a/airflow/providers/cncf/kubernetes/provider.yaml
b/airflow/providers/cncf/kubernetes/provider.yaml
index 570cef67f8..0849609b8f 100644
--- a/airflow/providers/cncf/kubernetes/provider.yaml
+++ b/airflow/providers/cncf/kubernetes/provider.yaml
@@ -25,6 +25,7 @@ state: ready
source-date-epoch: 1726860352
# note that those versions are maintained by release manager - do not update
them manually
versions:
+ - 9.0.0
- 8.4.2
- 8.4.1
- 8.4.0
diff --git
a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
index 8cc46c3dba..4622d31b57 100644
--- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -19,7 +19,7 @@ from __future__ import annotations
import random
import re
import string
-from datetime import datetime, timedelta
+from datetime import datetime
from unittest import mock
import pytest
@@ -1191,28 +1191,52 @@ class TestKubernetesExecutor:
assert tis_to_flush_by_key == {"foobar": {}}
@pytest.mark.db_test
-
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
- @mock.patch(
-
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler.delete_pod"
- )
- def test_cleanup_stuck_queued_tasks(self, mock_delete_pod,
mock_kube_client, dag_maker, session):
+
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
+ def test_cleanup_stuck_queued_tasks(self, mock_kube_dynamic_client,
dag_maker, create_dummy_dag, session):
"""Delete any pods associated with a task stuck in queued."""
- executor = KubernetesExecutor()
- executor.start()
- executor.scheduler_job_id = "123"
- with dag_maker(dag_id="test_cleanup_stuck_queued_tasks"):
- op = BashOperator(task_id="bash", bash_command=["echo 0", "echo
1"])
+ mock_kube_client = mock.MagicMock()
+ mock_kube_dynamic_client.return_value = mock.MagicMock()
+ mock_pod_resource = mock.MagicMock()
+ mock_kube_dynamic_client.return_value.resources.get.return_value =
mock_pod_resource
+ mock_kube_dynamic_client.return_value.get.return_value = k8s.V1PodList(
+ items=[
+ k8s.V1Pod(
+ metadata=k8s.V1ObjectMeta(
+ annotations={
+ "dag_id": "test_cleanup_stuck_queued_tasks",
+ "task_id": "bash",
+ "run_id": "test",
+ "try_number": 0,
+ },
+ labels={
+ "role": "airflow-worker",
+ "dag_id": "test_cleanup_stuck_queued_tasks",
+ "task_id": "bash",
+ "airflow-worker": 123,
+ "run_id": "test",
+ "try_number": 0,
+ },
+ ),
+ status=k8s.V1PodStatus(phase="Pending"),
+ )
+ ]
+ )
+ create_dummy_dag(dag_id="test_cleanup_stuck_queued_tasks",
task_id="bash", with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()
- ti = dag_run.get_task_instance(op.task_id, session)
- ti.retries = 1
+ ti = dag_run.task_instances[0]
ti.state = State.QUEUED
- ti.queued_dttm = timezone.utcnow() - timedelta(minutes=30)
+ ti.queued_by_job_id = 123
+ session.flush()
+
+ executor = self.kubernetes_executor
+ executor.job_id = 123
+ executor.kube_client = mock_kube_client
+ executor.kube_scheduler = mock.MagicMock()
ti.refresh_from_db()
tis = [ti]
executor.cleanup_stuck_queued_tasks(tis)
- mock_delete_pod.assert_called_once()
+ executor.kube_scheduler.delete_pod.assert_called_once()
assert executor.running == set()
- executor.end()
@pytest.mark.parametrize(
"raw_multi_namespace_mode, raw_value_namespace_list,
expected_value_in_kube_config",
@@ -1292,6 +1316,11 @@ class TestKubernetesExecutor:
items=[
k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
+ annotations={
+ "dag_id": "test_clear",
+ "task_id": "task1",
+ "run_id": "test",
+ },
labels={
"role": "airflow-worker",
"dag_id": "test_clear",
@@ -1339,6 +1368,12 @@ class TestKubernetesExecutor:
items=[
k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
+ annotations={
+ "dag_id": "test_clear",
+ "task_id": "bash",
+ "run_id": "test",
+ "map_index": 0,
+ },
labels={
"role": "airflow-worker",
"dag_id": "test_clear",