This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f871e015ce9 Allow internal retries when pending k8s pod is deleted
(#45184)
f871e015ce9 is described below
commit f871e015ce97423838fc17faca68ff1fc1fbed17
Author: Karen Braganza <[email protected]>
AuthorDate: Fri Jan 24 12:15:09 2025 -0500
Allow internal retries when pending k8s pod is deleted (#45184)
* Remove code that fails task upon pending pod deletion
* Remove clear_not_launched_queued_tasks
* Remove unit tests for clear_not_launched_queued_tasks
* Remove worker_pods_queued_check_interval configuration
* Remove worker_pods_queued_check_interval from provider.yaml
---------
Co-authored-by: Ryan Hatter <[email protected]>
Co-authored-by: Daniel Imberman <[email protected]>
---
.../kubernetes/executors/kubernetes_executor.py | 109 +----
.../executors/kubernetes_executor_utils.py | 4 -
.../providers/cncf/kubernetes/kube_config.py | 4 -
.../providers/cncf/kubernetes/provider.yaml | 7 -
.../executors/test_kubernetes_executor.py | 446 +--------------------
5 files changed, 2 insertions(+), 568 deletions(-)
diff --git
a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 482f99725b5..0f98f2f7dcb 100644
---
a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++
b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -38,7 +38,7 @@ from typing import TYPE_CHECKING, Any
from deprecated import deprecated
from kubernetes.dynamic import DynamicClient
-from sqlalchemy import or_, select, update
+from sqlalchemy import select
try:
from airflow.cli.cli_config import ARG_LOGICAL_DATE
@@ -60,7 +60,6 @@ from airflow.cli.cli_config import (
from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.executors.base_executor import BaseExecutor
-from airflow.executors.executor_constants import KUBERNETES_EXECUTOR
from airflow.providers.cncf.kubernetes.exceptions import
PodMutationHookException, PodReconciliationError
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types
import (
ADOPTED,
@@ -69,7 +68,6 @@ from
airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types impor
from airflow.providers.cncf.kubernetes.kube_config import KubeConfig
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import
annotations_to_key
from airflow.stats import Stats
-from airflow.utils.event_scheduler import EventScheduler
from airflow.utils.log.logging_mixin import remove_escape_codes
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import TaskInstanceState
@@ -145,7 +143,6 @@ class KubernetesExecutor(BaseExecutor):
self.kube_scheduler: AirflowKubernetesScheduler | None = None
self.kube_client: client.CoreV1Api | None = None
self.scheduler_job_id: str | None = None
- self.event_scheduler: EventScheduler | None = None
self.last_handled: dict[TaskInstanceKey, float] = {}
self.kubernetes_queue: str | None = None
self.task_publish_retries: Counter[TaskInstanceKey] = Counter()
@@ -218,96 +215,6 @@ class KubernetesExecutor(BaseExecutor):
pod_combined_search_str_to_pod_map[search_str] = pod
return pod_combined_search_str_to_pod_map
- @provide_session
- def clear_not_launched_queued_tasks(self, session: Session = NEW_SESSION)
-> None:
- """
- Clear tasks that were not yet launched, but were previously queued.
-
- Tasks can end up in a "Queued" state when a rescheduled/deferred
operator
- comes back up for execution (with the same try_number) before the
- pod of its previous incarnation has been fully removed (we think).
-
- It's also possible when an executor abruptly shuts down (leaving a
non-empty
- task_queue on that executor), but that scenario is handled via normal
adoption.
-
- This method checks each of our queued tasks to see if the
corresponding pod
- is around, and if not, and there's no matching entry in our own
- task_queue, marks it for re-execution.
- """
- if TYPE_CHECKING:
- assert self.kube_client
- from airflow.models.taskinstance import TaskInstance
-
- hybrid_executor_enabled = hasattr(TaskInstance, "executor")
- default_executor_alias = None
- if hybrid_executor_enabled:
- from airflow.executors.executor_loader import ExecutorLoader
-
- default_executor_name = ExecutorLoader.get_default_executor_name()
- default_executor_alias = default_executor_name.alias
-
- with
Stats.timer("kubernetes_executor.clear_not_launched_queued_tasks.duration"):
- self.log.debug("Clearing tasks that have not been launched")
- query = select(TaskInstance).where(
- TaskInstance.state == TaskInstanceState.QUEUED,
- TaskInstance.queued_by_job_id == self.job_id,
- )
- if self.kubernetes_queue:
- query = query.where(TaskInstance.queue ==
self.kubernetes_queue)
- # KUBERNETES_EXECUTOR is the string name/alias of the "core"
executor represented by this
- # module. The ExecutorName for "core" executors always contains an
alias and cannot be modified
- # to be different from the constant (in this case
KUBERNETES_EXECUTOR).
- elif hybrid_executor_enabled and default_executor_alias ==
KUBERNETES_EXECUTOR:
- query = query.where(
- or_(
- TaskInstance.executor == KUBERNETES_EXECUTOR,
- TaskInstance.executor.is_(None),
- ),
- )
- elif hybrid_executor_enabled:
- query = query.where(TaskInstance.executor ==
KUBERNETES_EXECUTOR)
- queued_tis: list[TaskInstance] = session.scalars(query).all()
- self.log.info("Found %s queued task instances", len(queued_tis))
-
- # Go through the "last seen" dictionary and clean out old entries
- allowed_age = self.kube_config.worker_pods_queued_check_interval *
3
- for key, timestamp in list(self.last_handled.items()):
- if time.time() - timestamp > allowed_age:
- del self.last_handled[key]
-
- if not queued_tis:
- return
-
- pod_combined_search_str_to_pod_map =
self.get_pod_combined_search_str_to_pod_map()
-
- for ti in queued_tis:
- self.log.debug("Checking task instance %s", ti)
-
- # Check to see if we've handled it ourselves recently
- if ti.key in self.last_handled:
- continue
-
- # Build the pod selector
- base_selector = f"dag_id={ti.dag_id},task_id={ti.task_id}"
- if ti.map_index >= 0:
- # Old tasks _couldn't_ be mapped, so we don't have to
worry about compat
- base_selector += f",map_index={ti.map_index}"
-
- search_str = f"{base_selector},run_id={ti.run_id}"
- if search_str in pod_combined_search_str_to_pod_map:
- continue
- self.log.info("TaskInstance: %s found in queued state but was
not launched, rescheduling", ti)
- session.execute(
- update(TaskInstance)
- .where(
- TaskInstance.dag_id == ti.dag_id,
- TaskInstance.task_id == ti.task_id,
- TaskInstance.run_id == ti.run_id,
- TaskInstance.map_index == ti.map_index,
- )
- .values(state=TaskInstanceState.SCHEDULED)
- )
-
def start(self) -> None:
"""Start the executor."""
self.log.info("Start Kubernetes executor")
@@ -325,15 +232,6 @@ class KubernetesExecutor(BaseExecutor):
kube_client=self.kube_client,
scheduler_job_id=self.scheduler_job_id,
)
- self.event_scheduler = EventScheduler()
-
- self.event_scheduler.call_regular_interval(
- self.kube_config.worker_pods_queued_check_interval,
- self.clear_not_launched_queued_tasks,
- )
- # We also call this at startup as that's the most likely time to see
- # stuck queued tasks
- self.clear_not_launched_queued_tasks()
def execute_async(
self,
@@ -378,7 +276,6 @@ class KubernetesExecutor(BaseExecutor):
assert self.kube_config
assert self.result_queue
assert self.task_queue
- assert self.event_scheduler
if self.running:
self.log.debug("self.running: %s", self.running)
@@ -466,10 +363,6 @@ class KubernetesExecutor(BaseExecutor):
finally:
self.task_queue.task_done()
- # Run any pending timed events
- next_event = self.event_scheduler.run(blocking=False)
- self.log.debug("Next timed event is in %f", next_event)
-
@provide_session
def _change_state(
self,
diff --git
a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index f21c2866e83..702703b2142 100644
---
a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++
b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -231,12 +231,8 @@ class KubernetesJobWatcher(multiprocessing.Process,
LoggingMixin):
)
elif status == "Pending":
# deletion_timestamp is set by kube server when a graceful
deletion is requested.
- # since kube server have received request to delete pod set TI
state failed
if event["type"] == "DELETED" and pod.metadata.deletion_timestamp:
self.log.info("Event: Failed to start pod %s, annotations:
%s", pod_name, annotations_string)
- self.watcher_queue.put(
- (pod_name, namespace, TaskInstanceState.FAILED,
annotations, resource_version)
- )
elif (
self.kube_config.worker_pod_pending_fatal_container_state_reasons
and "status" in event["raw_object"]
diff --git a/providers/src/airflow/providers/cncf/kubernetes/kube_config.py
b/providers/src/airflow/providers/cncf/kubernetes/kube_config.py
index 7a1de52928a..3f7ecee3277 100644
--- a/providers/src/airflow/providers/cncf/kubernetes/kube_config.py
+++ b/providers/src/airflow/providers/cncf/kubernetes/kube_config.py
@@ -76,10 +76,6 @@ class KubeConfig:
# interact with cluster components.
self.executor_namespace = conf.get(self.kubernetes_section,
"namespace")
- self.worker_pods_queued_check_interval = conf.getint(
- self.kubernetes_section, "worker_pods_queued_check_interval"
- )
-
self.kube_client_request_args = conf.getjson(
self.kubernetes_section, "kube_client_request_args", fallback={}
)
diff --git a/providers/src/airflow/providers/cncf/kubernetes/provider.yaml
b/providers/src/airflow/providers/cncf/kubernetes/provider.yaml
index 631e502c2e3..9d38a70aa14 100644
--- a/providers/src/airflow/providers/cncf/kubernetes/provider.yaml
+++ b/providers/src/airflow/providers/cncf/kubernetes/provider.yaml
@@ -368,13 +368,6 @@ config:
type: boolean
example: ~
default: "True"
- worker_pods_queued_check_interval:
- description: |
- How often in seconds to check for task instances stuck in "queued"
status without a pod
- version_added: ~
- type: integer
- example: ~
- default: "60"
ssl_ca_cert:
description: |
Path to a CA certificate to be used by the Kubernetes client to
verify the server's SSL certificate.
diff --git
a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py
b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py
index cbca9f6e30b..2a600d6fc57 100644
--- a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -30,12 +30,6 @@ from urllib3 import HTTPResponse
from airflow import __version__
from airflow.exceptions import AirflowException
-from airflow.executors.executor_constants import (
- CELERY_EXECUTOR,
- CELERY_KUBERNETES_EXECUTOR,
- KUBERNETES_EXECUTOR,
-)
-from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.operators.empty import EmptyOperator
from airflow.providers.cncf.kubernetes import pod_generator
@@ -62,7 +56,6 @@ from
airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
from airflow.utils import timezone
from airflow.utils.state import State, TaskInstanceState
-from tests_common.test_utils.compat import BashOperator
from tests_common.test_utils.config import conf_vars
if __version__.startswith("2."):
@@ -1347,443 +1340,6 @@ class TestKubernetesExecutor:
assert executor.kube_config.multi_namespace_mode_namespace_list ==
expected_value_in_kube_config
- @pytest.mark.db_test
-
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
- @conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
- def test_clear_not_launched_queued_tasks_not_launched(
- self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
- ):
- """If a pod isn't found for a TI, reset the state to scheduled"""
- mock_kube_client = mock.MagicMock()
- mock_kube_dynamic_client.return_value = mock.MagicMock()
- mock_pod_resource = mock.MagicMock()
- mock_kube_dynamic_client.return_value.resources.get.return_value =
mock_pod_resource
- mock_kube_dynamic_client.return_value.get.return_value.items = []
-
- # This is hack to use overridden conf vars as it seems executors
loaded before conf override.
- if hasattr(TaskInstance, "executor"):
- import importlib
-
- from airflow.executors import executor_loader
-
- importlib.reload(executor_loader)
- create_dummy_dag(dag_id="test_clear", task_id="task1",
with_dagrun_type=None)
- dag_run = dag_maker.create_dagrun()
-
- ti = dag_run.task_instances[0]
- ti.state = State.QUEUED
- ti.queued_by_job_id = 1
- session.flush()
-
- executor = self.kubernetes_executor
- executor.job_id = 1
- executor.kube_client = mock_kube_client
- executor.clear_not_launched_queued_tasks(session=session)
-
- ti.refresh_from_db()
- assert ti.state == State.SCHEDULED
- assert mock_kube_dynamic_client.return_value.get.call_count == 1
- mock_kube_dynamic_client.return_value.get.assert_called_with(
- resource=mock_pod_resource,
- namespace="default",
- label_selector="airflow-worker=1",
- header_params={"Accept":
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
- )
-
- @pytest.mark.db_test
- @pytest.mark.parametrize(
- "task_queue, kubernetes_queue",
- [
- pytest.param("default", None),
- pytest.param("kubernetes", None),
- pytest.param("kubernetes", "kubernetes"),
- ],
- )
-
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
- @conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
- def test_clear_not_launched_queued_tasks_launched(
- self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session,
task_queue, kubernetes_queue
- ):
- """Leave the state alone if a pod already exists"""
- mock_kube_client = mock.MagicMock()
- mock_kube_dynamic_client.return_value = mock.MagicMock()
- mock_pod_resource = mock.MagicMock()
- mock_kube_dynamic_client.return_value.resources.get.return_value =
mock_pod_resource
- mock_kube_dynamic_client.return_value.get.return_value = k8s.V1PodList(
- items=[
- k8s.V1Pod(
- metadata=k8s.V1ObjectMeta(
- annotations={
- "dag_id": "test_clear",
- "task_id": "task1",
- "run_id": "test",
- },
- labels={
- "role": "airflow-worker",
- "dag_id": "test_clear",
- "task_id": "task1",
- "airflow-worker": 1,
- "run_id": "test",
- },
- ),
- status=k8s.V1PodStatus(phase="Pending"),
- )
- ]
- )
-
- # This is hack to use overridden conf vars as it seems executors
loaded before conf override.
- if hasattr(TaskInstance, "executor"):
- import importlib
-
- from airflow.executors import executor_loader
-
- importlib.reload(executor_loader)
- create_dummy_dag(dag_id="test_clear", task_id="task1",
with_dagrun_type=None)
- dag_run = dag_maker.create_dagrun()
-
- ti = dag_run.task_instances[0]
- ti.state = State.QUEUED
- ti.queued_by_job_id = 1
- ti.queue = task_queue
- session.flush()
-
- executor = self.kubernetes_executor
- executor.job_id = 1
- executor.kubernetes_queue = kubernetes_queue
- executor.kube_client = mock_kube_client
- executor.clear_not_launched_queued_tasks(session=session)
-
- ti.refresh_from_db()
- assert ti.state == State.QUEUED
- mock_kube_dynamic_client.return_value.get.assert_called_once_with(
- resource=mock_pod_resource,
- namespace="default",
- label_selector="airflow-worker=1",
- header_params={"Accept":
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
- )
-
- @pytest.mark.db_test
-
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
- @conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
- def test_clear_not_launched_queued_tasks_mapped_task(self,
mock_kube_dynamic_client, dag_maker, session):
- """One mapped task has a launched pod - other does not."""
-
- def get(*args, **kwargs):
- return k8s.V1PodList(
- items=[
- k8s.V1Pod(
- metadata=k8s.V1ObjectMeta(
- annotations={
- "dag_id": "test_clear",
- "task_id": "bash",
- "run_id": "test",
- "map_index": 0,
- },
- labels={
- "role": "airflow-worker",
- "dag_id": "test_clear",
- "task_id": "bash",
- "airflow-worker": 1,
- "map_index": 0,
- "run_id": "test",
- },
- ),
- status=k8s.V1PodStatus(phase="Pending"),
- )
- ]
- )
-
- mock_kube_client = mock.MagicMock()
- mock_kube_dynamic_client.return_value = mock.MagicMock()
- mock_pod_resource = mock.MagicMock()
- mock_kube_dynamic_client.return_value.resources.get.return_value =
mock_pod_resource
- mock_kube_dynamic_client.return_value.get.side_effect = get
-
- # This is hack to use overridden conf vars as it seems executors
loaded before conf override.
- if hasattr(TaskInstance, "executor"):
- import importlib
-
- from airflow.executors import executor_loader
-
- importlib.reload(executor_loader)
- with dag_maker(dag_id="test_clear"):
- op =
BashOperator.partial(task_id="bash").expand(bash_command=["echo 0", "echo 1"])
-
- dag_run = dag_maker.create_dagrun()
- ti0 = dag_run.get_task_instance(op.task_id, session, map_index=0)
- ti0.state = State.QUEUED
- ti0.queued_by_job_id = 1
-
- ti1 = dag_run.get_task_instance(op.task_id, session, map_index=1)
- ti1.state = State.QUEUED
- ti1.queued_by_job_id = 1
-
- session.flush()
-
- executor = self.kubernetes_executor
- executor.job_id = 1
- executor.kube_client = mock_kube_client
- executor.clear_not_launched_queued_tasks(session=session)
-
- ti0.refresh_from_db()
- ti1.refresh_from_db()
- assert ti0.state == State.QUEUED
- assert ti1.state == State.SCHEDULED
-
- assert mock_kube_dynamic_client.return_value.get.call_count == 1
- mock_kube_dynamic_client.return_value.get.assert_called_with(
- resource=mock_pod_resource,
- namespace="default",
- label_selector="airflow-worker=1",
- header_params={"Accept":
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
- )
-
- @pytest.mark.db_test
- @conf_vars({("core", "executor"): CELERY_KUBERNETES_EXECUTOR})
- def test_clear_not_launched_queued_tasks_not_launched_other_queue(
- self, dag_maker, create_dummy_dag, session
- ):
- """Queued TI has no pod, but it is not queued for the k8s executor"""
- mock_kube_client = mock.MagicMock()
- mock_kube_client.list_namespaced_pod.return_value =
k8s.V1PodList(items=[])
-
- # This is hack to use overridden conf vars as it seems executors
loaded before conf override.
- if hasattr(TaskInstance, "executor"):
- import importlib
-
- from airflow.executors import executor_loader
-
- importlib.reload(executor_loader)
- create_dummy_dag(dag_id="test_clear", task_id="task1",
with_dagrun_type=None)
- dag_run = dag_maker.create_dagrun()
-
- ti = dag_run.task_instances[0]
- ti.state = State.QUEUED
- ti.queued_by_job_id = 1
- session.flush()
-
- executor = self.kubernetes_executor
- executor.job_id = 1
- executor.kubernetes_queue = "kubernetes"
-
- executor.kube_client = mock_kube_client
- executor.clear_not_launched_queued_tasks(session=session)
-
- ti.refresh_from_db()
- assert ti.state == State.QUEUED
- assert mock_kube_client.list_namespaced_pod.call_count == 0
-
- @pytest.mark.db_test
- @pytest.mark.skipif(
- not hasattr(TaskInstance, "executor"), reason="Hybrid executor added
in later version"
- )
-
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
- @conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
- def test_clear_not_launched_queued_tasks_not_launched_other_executor(
- self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
- ):
- """Queued TI has no pod, but it is not queued for the k8s executor"""
- mock_kube_client = mock.MagicMock()
- mock_kube_dynamic_client.return_value = mock.MagicMock()
- mock_pod_resource = mock.MagicMock()
- mock_kube_dynamic_client.return_value.resources.get.return_value =
mock_pod_resource
- mock_kube_dynamic_client.return_value.get.return_value.items = []
-
- # This is hack to use overridden conf vars as it seems executors
loaded before conf override.
- if hasattr(TaskInstance, "executor"):
- import importlib
-
- from airflow.executors import executor_loader
-
- importlib.reload(executor_loader)
- create_dummy_dag(dag_id="test_clear", task_id="task1",
with_dagrun_type=None)
- dag_run = dag_maker.create_dagrun()
-
- ti = dag_run.task_instances[0]
- ti.state = State.QUEUED
- ti.queued_by_job_id = 1
- ti.executor = "CeleryExecutor"
- session.flush()
-
- executor = self.kubernetes_executor
- executor.job_id = 1
-
- executor.kube_client = mock_kube_client
- executor.clear_not_launched_queued_tasks(session=session)
-
- ti.refresh_from_db()
- assert ti.executor == "CeleryExecutor"
- assert ti.state == State.QUEUED
- assert mock_kube_client.list_namespaced_pod.call_count == 0
-
- @pytest.mark.db_test
- @pytest.mark.skipif(
- not hasattr(TaskInstance, "executor"), reason="Hybrid executor added
in later version"
- )
-
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
- @conf_vars({("core", "executor"): CELERY_EXECUTOR})
- def
test_clear_not_launched_queued_tasks_not_launched_other_default_executor(
- self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
- ):
- """Queued TI has no pod, but it is not queued for the k8s executor"""
- mock_kube_client = mock.MagicMock()
- mock_kube_dynamic_client.return_value = mock.MagicMock()
- mock_pod_resource = mock.MagicMock()
- mock_kube_dynamic_client.return_value.resources.get.return_value =
mock_pod_resource
- mock_kube_dynamic_client.return_value.get.return_value.items = []
-
- # This is hack to use overridden conf vars as it seems executors
loaded before conf override.
- if hasattr(TaskInstance, "executor"):
- import importlib
-
- from airflow.executors import executor_loader
-
- importlib.reload(executor_loader)
- create_dummy_dag(dag_id="test_clear", task_id="task1",
with_dagrun_type=None)
- dag_run = dag_maker.create_dagrun()
-
- ti = dag_run.task_instances[0]
- ti.state = State.QUEUED
- ti.queued_by_job_id = 1
- session.flush()
-
- executor = self.kubernetes_executor
- executor.job_id = 1
-
- executor.kube_client = mock_kube_client
- executor.clear_not_launched_queued_tasks(session=session)
-
- ti.refresh_from_db()
- assert ti.state == State.QUEUED
- assert mock_kube_client.list_namespaced_pod.call_count == 0
-
- @pytest.mark.db_test
- @pytest.mark.skipif(
- not hasattr(TaskInstance, "executor"), reason="Hybrid executor added
in later version"
- )
-
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
- @conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
- def test_clear_not_launched_queued_tasks_launched_none_executor(
- self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
- ):
- """Queued TI has no pod, but it is not queued for the k8s executor"""
- mock_kube_client = mock.MagicMock()
- mock_kube_dynamic_client.return_value = mock.MagicMock()
- mock_pod_resource = mock.MagicMock()
- mock_kube_dynamic_client.return_value.resources.get.return_value =
mock_pod_resource
- mock_kube_dynamic_client.return_value.get.return_value.items = []
-
- # This is hack to use overridden conf vars as it seems executors
loaded before conf override.
- if hasattr(TaskInstance, "executor"):
- import importlib
-
- from airflow.executors import executor_loader
-
- importlib.reload(executor_loader)
- create_dummy_dag(dag_id="test_clear", task_id="task1",
with_dagrun_type=None)
- dag_run = dag_maker.create_dagrun()
-
- ti = dag_run.task_instances[0]
- ti.state = State.QUEUED
- ti.queued_by_job_id = 1
- session.flush()
-
- executor = self.kubernetes_executor
- executor.job_id = 1
-
- executor.kube_client = mock_kube_client
- executor.clear_not_launched_queued_tasks(session=session)
-
- ti.refresh_from_db()
- assert ti.state == State.SCHEDULED
- assert mock_kube_dynamic_client.return_value.get.call_count == 1
-
- @pytest.mark.db_test
- @pytest.mark.skipif(
- not hasattr(TaskInstance, "executor"), reason="Hybrid executor added
in later version"
- )
-
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
- @conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
- def test_clear_not_launched_queued_tasks_launched_kubernetes_executor(
- self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
- ):
- """Queued TI has no pod, but it is not queued for the k8s executor"""
- mock_kube_client = mock.MagicMock()
- mock_kube_dynamic_client.return_value = mock.MagicMock()
- mock_pod_resource = mock.MagicMock()
- mock_kube_dynamic_client.return_value.resources.get.return_value =
mock_pod_resource
- mock_kube_dynamic_client.return_value.get.return_value.items = []
-
- # This is hack to use overridden conf vars as it seems executors
loaded before conf override.
- if hasattr(TaskInstance, "executor"):
- import importlib
-
- from airflow.executors import executor_loader
-
- importlib.reload(executor_loader)
- create_dummy_dag(dag_id="test_clear", task_id="task1",
with_dagrun_type=None)
- dag_run = dag_maker.create_dagrun()
-
- ti = dag_run.task_instances[0]
- ti.state = State.QUEUED
- ti.queued_by_job_id = 1
- ti.executor = KUBERNETES_EXECUTOR
- session.flush()
-
- executor = self.kubernetes_executor
- executor.job_id = 1
-
- executor.kube_client = mock_kube_client
- executor.clear_not_launched_queued_tasks(session=session)
-
- ti.refresh_from_db()
- assert ti.state == State.SCHEDULED
- assert mock_kube_dynamic_client.return_value.get.call_count == 1
-
- @pytest.mark.db_test
-
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
- @conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
- def test_clear_not_launched_queued_tasks_clear_only_by_job_id(
- self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
- ):
- """clear only not launched queued tasks which are queued by the same
executor job"""
- mock_kube_client = mock.MagicMock()
- mock_kube_dynamic_client.return_value = mock.MagicMock()
- mock_kube_dynamic_client.return_value.get.return_value =
k8s.V1PodList(items=[])
-
- # This is hack to use overridden conf vars as it seems executors
loaded before conf override.
- if hasattr(TaskInstance, "executor"):
- import importlib
-
- from airflow.executors import executor_loader
-
- importlib.reload(executor_loader)
- create_dummy_dag(dag_id="test_clear_0", task_id="task0",
with_dagrun_type=None)
- dag_run = dag_maker.create_dagrun()
-
- ti0 = dag_run.task_instances[0]
- ti0.state = State.QUEUED
- ti0.queued_by_job_id = 1
- session.flush()
-
- create_dummy_dag(dag_id="test_clear_1", task_id="task1",
with_dagrun_type=None)
- dag_run = dag_maker.create_dagrun()
-
- ti1 = dag_run.task_instances[0]
- ti1.state = State.QUEUED
- ti1.queued_by_job_id = 2
- session.flush()
-
- executor = self.kubernetes_executor
- executor.job_id = 1
- executor.kube_client = mock_kube_client
- executor.clear_not_launched_queued_tasks(session=session)
-
- ti0.refresh_from_db()
- ti1.refresh_from_db()
- assert ti0.state == State.SCHEDULED
- assert ti1.state == State.QUEUED
-
@pytest.mark.db_test
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
def test_get_task_log(self, mock_get_kube_client,
create_task_instance_of_operator):
@@ -2125,7 +1681,7 @@ class TestKubernetesJobWatcher:
self.pod.metadata.deletion_timestamp = timezone.utcnow()
self._run()
- self.assert_watcher_queue_called_once_with_state(State.FAILED)
+ self.watcher.watcher_queue.put.assert_not_called()
def test_process_status_failed(self):
self.pod.status.phase = "Failed"