This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f871e015ce9 Allow internal retries when pending k8s pod is deleted (#45184)
f871e015ce9 is described below

commit f871e015ce97423838fc17faca68ff1fc1fbed17
Author: Karen Braganza <[email protected]>
AuthorDate: Fri Jan 24 12:15:09 2025 -0500

    Allow internal retries when pending k8s pod is deleted (#45184)
    
    * Remove code that fails task upon pending pod deletion
    
    * Remove clear_not_launched_queued_tasks
    
    * Remove unit tests for clear_not_launched_queued_tasks
    
    * Remove worker_pods_queued_check_interval configuration
    
    * Remove worker_pods_queued_check_interval from provider.yaml
    
    ---------
    
    Co-authored-by: Ryan Hatter <[email protected]>
    Co-authored-by: Daniel Imberman <[email protected]>
---
 .../kubernetes/executors/kubernetes_executor.py    | 109 +----
 .../executors/kubernetes_executor_utils.py         |   4 -
 .../providers/cncf/kubernetes/kube_config.py       |   4 -
 .../providers/cncf/kubernetes/provider.yaml        |   7 -
 .../executors/test_kubernetes_executor.py          | 446 +--------------------
 5 files changed, 2 insertions(+), 568 deletions(-)

diff --git a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 482f99725b5..0f98f2f7dcb 100644
--- a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -38,7 +38,7 @@ from typing import TYPE_CHECKING, Any
 
 from deprecated import deprecated
 from kubernetes.dynamic import DynamicClient
-from sqlalchemy import or_, select, update
+from sqlalchemy import select
 
 try:
     from airflow.cli.cli_config import ARG_LOGICAL_DATE
@@ -60,7 +60,6 @@ from airflow.cli.cli_config import (
 from airflow.configuration import conf
 from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.executors.base_executor import BaseExecutor
-from airflow.executors.executor_constants import KUBERNETES_EXECUTOR
 from airflow.providers.cncf.kubernetes.exceptions import 
PodMutationHookException, PodReconciliationError
 from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types 
import (
     ADOPTED,
@@ -69,7 +68,6 @@ from 
airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types impor
 from airflow.providers.cncf.kubernetes.kube_config import KubeConfig
 from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import 
annotations_to_key
 from airflow.stats import Stats
-from airflow.utils.event_scheduler import EventScheduler
 from airflow.utils.log.logging_mixin import remove_escape_codes
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import TaskInstanceState
@@ -145,7 +143,6 @@ class KubernetesExecutor(BaseExecutor):
         self.kube_scheduler: AirflowKubernetesScheduler | None = None
         self.kube_client: client.CoreV1Api | None = None
         self.scheduler_job_id: str | None = None
-        self.event_scheduler: EventScheduler | None = None
         self.last_handled: dict[TaskInstanceKey, float] = {}
         self.kubernetes_queue: str | None = None
         self.task_publish_retries: Counter[TaskInstanceKey] = Counter()
@@ -218,96 +215,6 @@ class KubernetesExecutor(BaseExecutor):
                 pod_combined_search_str_to_pod_map[search_str] = pod
         return pod_combined_search_str_to_pod_map
 
-    @provide_session
-    def clear_not_launched_queued_tasks(self, session: Session = NEW_SESSION) 
-> None:
-        """
-        Clear tasks that were not yet launched, but were previously queued.
-
-        Tasks can end up in a "Queued" state when a rescheduled/deferred 
operator
-        comes back up for execution (with the same try_number) before the
-        pod of its previous incarnation has been fully removed (we think).
-
-        It's also possible when an executor abruptly shuts down (leaving a 
non-empty
-        task_queue on that executor), but that scenario is handled via normal 
adoption.
-
-        This method checks each of our queued tasks to see if the 
corresponding pod
-        is around, and if not, and there's no matching entry in our own
-        task_queue, marks it for re-execution.
-        """
-        if TYPE_CHECKING:
-            assert self.kube_client
-        from airflow.models.taskinstance import TaskInstance
-
-        hybrid_executor_enabled = hasattr(TaskInstance, "executor")
-        default_executor_alias = None
-        if hybrid_executor_enabled:
-            from airflow.executors.executor_loader import ExecutorLoader
-
-            default_executor_name = ExecutorLoader.get_default_executor_name()
-            default_executor_alias = default_executor_name.alias
-
-        with 
Stats.timer("kubernetes_executor.clear_not_launched_queued_tasks.duration"):
-            self.log.debug("Clearing tasks that have not been launched")
-            query = select(TaskInstance).where(
-                TaskInstance.state == TaskInstanceState.QUEUED,
-                TaskInstance.queued_by_job_id == self.job_id,
-            )
-            if self.kubernetes_queue:
-                query = query.where(TaskInstance.queue == 
self.kubernetes_queue)
-            # KUBERNETES_EXECUTOR is the string name/alias of the "core" 
executor represented by this
-            # module. The ExecutorName for "core" executors always contains an 
alias and cannot be modified
-            # to be different from the constant (in this case 
KUBERNETES_EXECUTOR).
-            elif hybrid_executor_enabled and default_executor_alias == 
KUBERNETES_EXECUTOR:
-                query = query.where(
-                    or_(
-                        TaskInstance.executor == KUBERNETES_EXECUTOR,
-                        TaskInstance.executor.is_(None),
-                    ),
-                )
-            elif hybrid_executor_enabled:
-                query = query.where(TaskInstance.executor == 
KUBERNETES_EXECUTOR)
-            queued_tis: list[TaskInstance] = session.scalars(query).all()
-            self.log.info("Found %s queued task instances", len(queued_tis))
-
-            # Go through the "last seen" dictionary and clean out old entries
-            allowed_age = self.kube_config.worker_pods_queued_check_interval * 
3
-            for key, timestamp in list(self.last_handled.items()):
-                if time.time() - timestamp > allowed_age:
-                    del self.last_handled[key]
-
-            if not queued_tis:
-                return
-
-            pod_combined_search_str_to_pod_map = 
self.get_pod_combined_search_str_to_pod_map()
-
-            for ti in queued_tis:
-                self.log.debug("Checking task instance %s", ti)
-
-                # Check to see if we've handled it ourselves recently
-                if ti.key in self.last_handled:
-                    continue
-
-                # Build the pod selector
-                base_selector = f"dag_id={ti.dag_id},task_id={ti.task_id}"
-                if ti.map_index >= 0:
-                    # Old tasks _couldn't_ be mapped, so we don't have to 
worry about compat
-                    base_selector += f",map_index={ti.map_index}"
-
-                search_str = f"{base_selector},run_id={ti.run_id}"
-                if search_str in pod_combined_search_str_to_pod_map:
-                    continue
-                self.log.info("TaskInstance: %s found in queued state but was 
not launched, rescheduling", ti)
-                session.execute(
-                    update(TaskInstance)
-                    .where(
-                        TaskInstance.dag_id == ti.dag_id,
-                        TaskInstance.task_id == ti.task_id,
-                        TaskInstance.run_id == ti.run_id,
-                        TaskInstance.map_index == ti.map_index,
-                    )
-                    .values(state=TaskInstanceState.SCHEDULED)
-                )
-
     def start(self) -> None:
         """Start the executor."""
         self.log.info("Start Kubernetes executor")
@@ -325,15 +232,6 @@ class KubernetesExecutor(BaseExecutor):
             kube_client=self.kube_client,
             scheduler_job_id=self.scheduler_job_id,
         )
-        self.event_scheduler = EventScheduler()
-
-        self.event_scheduler.call_regular_interval(
-            self.kube_config.worker_pods_queued_check_interval,
-            self.clear_not_launched_queued_tasks,
-        )
-        # We also call this at startup as that's the most likely time to see
-        # stuck queued tasks
-        self.clear_not_launched_queued_tasks()
 
     def execute_async(
         self,
@@ -378,7 +276,6 @@ class KubernetesExecutor(BaseExecutor):
             assert self.kube_config
             assert self.result_queue
             assert self.task_queue
-            assert self.event_scheduler
 
         if self.running:
             self.log.debug("self.running: %s", self.running)
@@ -466,10 +363,6 @@ class KubernetesExecutor(BaseExecutor):
                 finally:
                     self.task_queue.task_done()
 
-        # Run any pending timed events
-        next_event = self.event_scheduler.run(blocking=False)
-        self.log.debug("Next timed event is in %f", next_event)
-
     @provide_session
     def _change_state(
         self,
diff --git a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index f21c2866e83..702703b2142 100644
--- a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++ b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -231,12 +231,8 @@ class KubernetesJobWatcher(multiprocessing.Process, 
LoggingMixin):
             )
         elif status == "Pending":
             # deletion_timestamp is set by kube server when a graceful 
deletion is requested.
-            # since kube server have received request to delete pod set TI 
state failed
             if event["type"] == "DELETED" and pod.metadata.deletion_timestamp:
                 self.log.info("Event: Failed to start pod %s, annotations: 
%s", pod_name, annotations_string)
-                self.watcher_queue.put(
-                    (pod_name, namespace, TaskInstanceState.FAILED, 
annotations, resource_version)
-                )
             elif (
                 
self.kube_config.worker_pod_pending_fatal_container_state_reasons
                 and "status" in event["raw_object"]
diff --git a/providers/src/airflow/providers/cncf/kubernetes/kube_config.py b/providers/src/airflow/providers/cncf/kubernetes/kube_config.py
index 7a1de52928a..3f7ecee3277 100644
--- a/providers/src/airflow/providers/cncf/kubernetes/kube_config.py
+++ b/providers/src/airflow/providers/cncf/kubernetes/kube_config.py
@@ -76,10 +76,6 @@ class KubeConfig:
         # interact with cluster components.
         self.executor_namespace = conf.get(self.kubernetes_section, 
"namespace")
 
-        self.worker_pods_queued_check_interval = conf.getint(
-            self.kubernetes_section, "worker_pods_queued_check_interval"
-        )
-
         self.kube_client_request_args = conf.getjson(
             self.kubernetes_section, "kube_client_request_args", fallback={}
         )
diff --git a/providers/src/airflow/providers/cncf/kubernetes/provider.yaml b/providers/src/airflow/providers/cncf/kubernetes/provider.yaml
index 631e502c2e3..9d38a70aa14 100644
--- a/providers/src/airflow/providers/cncf/kubernetes/provider.yaml
+++ b/providers/src/airflow/providers/cncf/kubernetes/provider.yaml
@@ -368,13 +368,6 @@ config:
         type: boolean
         example: ~
         default: "True"
-      worker_pods_queued_check_interval:
-        description: |
-          How often in seconds to check for task instances stuck in "queued" 
status without a pod
-        version_added: ~
-        type: integer
-        example: ~
-        default: "60"
       ssl_ca_cert:
         description: |
           Path to a CA certificate to be used by the Kubernetes client to 
verify the server's SSL certificate.
diff --git a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py
index cbca9f6e30b..2a600d6fc57 100644
--- a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -30,12 +30,6 @@ from urllib3 import HTTPResponse
 
 from airflow import __version__
 from airflow.exceptions import AirflowException
-from airflow.executors.executor_constants import (
-    CELERY_EXECUTOR,
-    CELERY_KUBERNETES_EXECUTOR,
-    KUBERNETES_EXECUTOR,
-)
-from airflow.models.taskinstance import TaskInstance
 from airflow.models.taskinstancekey import TaskInstanceKey
 from airflow.operators.empty import EmptyOperator
 from airflow.providers.cncf.kubernetes import pod_generator
@@ -62,7 +56,6 @@ from 
airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
 from airflow.utils import timezone
 from airflow.utils.state import State, TaskInstanceState
 
-from tests_common.test_utils.compat import BashOperator
 from tests_common.test_utils.config import conf_vars
 
 if __version__.startswith("2."):
@@ -1347,443 +1340,6 @@ class TestKubernetesExecutor:
 
         assert executor.kube_config.multi_namespace_mode_namespace_list == 
expected_value_in_kube_config
 
-    @pytest.mark.db_test
-    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
-    @conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
-    def test_clear_not_launched_queued_tasks_not_launched(
-        self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
-    ):
-        """If a pod isn't found for a TI, reset the state to scheduled"""
-        mock_kube_client = mock.MagicMock()
-        mock_kube_dynamic_client.return_value = mock.MagicMock()
-        mock_pod_resource = mock.MagicMock()
-        mock_kube_dynamic_client.return_value.resources.get.return_value = 
mock_pod_resource
-        mock_kube_dynamic_client.return_value.get.return_value.items = []
-
-        # This is hack to use overridden conf vars as it seems executors 
loaded before conf override.
-        if hasattr(TaskInstance, "executor"):
-            import importlib
-
-            from airflow.executors import executor_loader
-
-            importlib.reload(executor_loader)
-        create_dummy_dag(dag_id="test_clear", task_id="task1", 
with_dagrun_type=None)
-        dag_run = dag_maker.create_dagrun()
-
-        ti = dag_run.task_instances[0]
-        ti.state = State.QUEUED
-        ti.queued_by_job_id = 1
-        session.flush()
-
-        executor = self.kubernetes_executor
-        executor.job_id = 1
-        executor.kube_client = mock_kube_client
-        executor.clear_not_launched_queued_tasks(session=session)
-
-        ti.refresh_from_db()
-        assert ti.state == State.SCHEDULED
-        assert mock_kube_dynamic_client.return_value.get.call_count == 1
-        mock_kube_dynamic_client.return_value.get.assert_called_with(
-            resource=mock_pod_resource,
-            namespace="default",
-            label_selector="airflow-worker=1",
-            header_params={"Accept": 
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
-        )
-
-    @pytest.mark.db_test
-    @pytest.mark.parametrize(
-        "task_queue, kubernetes_queue",
-        [
-            pytest.param("default", None),
-            pytest.param("kubernetes", None),
-            pytest.param("kubernetes", "kubernetes"),
-        ],
-    )
-    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
-    @conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
-    def test_clear_not_launched_queued_tasks_launched(
-        self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session, 
task_queue, kubernetes_queue
-    ):
-        """Leave the state alone if a pod already exists"""
-        mock_kube_client = mock.MagicMock()
-        mock_kube_dynamic_client.return_value = mock.MagicMock()
-        mock_pod_resource = mock.MagicMock()
-        mock_kube_dynamic_client.return_value.resources.get.return_value = 
mock_pod_resource
-        mock_kube_dynamic_client.return_value.get.return_value = k8s.V1PodList(
-            items=[
-                k8s.V1Pod(
-                    metadata=k8s.V1ObjectMeta(
-                        annotations={
-                            "dag_id": "test_clear",
-                            "task_id": "task1",
-                            "run_id": "test",
-                        },
-                        labels={
-                            "role": "airflow-worker",
-                            "dag_id": "test_clear",
-                            "task_id": "task1",
-                            "airflow-worker": 1,
-                            "run_id": "test",
-                        },
-                    ),
-                    status=k8s.V1PodStatus(phase="Pending"),
-                )
-            ]
-        )
-
-        # This is hack to use overridden conf vars as it seems executors 
loaded before conf override.
-        if hasattr(TaskInstance, "executor"):
-            import importlib
-
-            from airflow.executors import executor_loader
-
-            importlib.reload(executor_loader)
-        create_dummy_dag(dag_id="test_clear", task_id="task1", 
with_dagrun_type=None)
-        dag_run = dag_maker.create_dagrun()
-
-        ti = dag_run.task_instances[0]
-        ti.state = State.QUEUED
-        ti.queued_by_job_id = 1
-        ti.queue = task_queue
-        session.flush()
-
-        executor = self.kubernetes_executor
-        executor.job_id = 1
-        executor.kubernetes_queue = kubernetes_queue
-        executor.kube_client = mock_kube_client
-        executor.clear_not_launched_queued_tasks(session=session)
-
-        ti.refresh_from_db()
-        assert ti.state == State.QUEUED
-        mock_kube_dynamic_client.return_value.get.assert_called_once_with(
-            resource=mock_pod_resource,
-            namespace="default",
-            label_selector="airflow-worker=1",
-            header_params={"Accept": 
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
-        )
-
-    @pytest.mark.db_test
-    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
-    @conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
-    def test_clear_not_launched_queued_tasks_mapped_task(self, 
mock_kube_dynamic_client, dag_maker, session):
-        """One mapped task has a launched pod - other does not."""
-
-        def get(*args, **kwargs):
-            return k8s.V1PodList(
-                items=[
-                    k8s.V1Pod(
-                        metadata=k8s.V1ObjectMeta(
-                            annotations={
-                                "dag_id": "test_clear",
-                                "task_id": "bash",
-                                "run_id": "test",
-                                "map_index": 0,
-                            },
-                            labels={
-                                "role": "airflow-worker",
-                                "dag_id": "test_clear",
-                                "task_id": "bash",
-                                "airflow-worker": 1,
-                                "map_index": 0,
-                                "run_id": "test",
-                            },
-                        ),
-                        status=k8s.V1PodStatus(phase="Pending"),
-                    )
-                ]
-            )
-
-        mock_kube_client = mock.MagicMock()
-        mock_kube_dynamic_client.return_value = mock.MagicMock()
-        mock_pod_resource = mock.MagicMock()
-        mock_kube_dynamic_client.return_value.resources.get.return_value = 
mock_pod_resource
-        mock_kube_dynamic_client.return_value.get.side_effect = get
-
-        # This is hack to use overridden conf vars as it seems executors 
loaded before conf override.
-        if hasattr(TaskInstance, "executor"):
-            import importlib
-
-            from airflow.executors import executor_loader
-
-            importlib.reload(executor_loader)
-        with dag_maker(dag_id="test_clear"):
-            op = 
BashOperator.partial(task_id="bash").expand(bash_command=["echo 0", "echo 1"])
-
-        dag_run = dag_maker.create_dagrun()
-        ti0 = dag_run.get_task_instance(op.task_id, session, map_index=0)
-        ti0.state = State.QUEUED
-        ti0.queued_by_job_id = 1
-
-        ti1 = dag_run.get_task_instance(op.task_id, session, map_index=1)
-        ti1.state = State.QUEUED
-        ti1.queued_by_job_id = 1
-
-        session.flush()
-
-        executor = self.kubernetes_executor
-        executor.job_id = 1
-        executor.kube_client = mock_kube_client
-        executor.clear_not_launched_queued_tasks(session=session)
-
-        ti0.refresh_from_db()
-        ti1.refresh_from_db()
-        assert ti0.state == State.QUEUED
-        assert ti1.state == State.SCHEDULED
-
-        assert mock_kube_dynamic_client.return_value.get.call_count == 1
-        mock_kube_dynamic_client.return_value.get.assert_called_with(
-            resource=mock_pod_resource,
-            namespace="default",
-            label_selector="airflow-worker=1",
-            header_params={"Accept": 
"application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
-        )
-
-    @pytest.mark.db_test
-    @conf_vars({("core", "executor"): CELERY_KUBERNETES_EXECUTOR})
-    def test_clear_not_launched_queued_tasks_not_launched_other_queue(
-        self, dag_maker, create_dummy_dag, session
-    ):
-        """Queued TI has no pod, but it is not queued for the k8s executor"""
-        mock_kube_client = mock.MagicMock()
-        mock_kube_client.list_namespaced_pod.return_value = 
k8s.V1PodList(items=[])
-
-        # This is hack to use overridden conf vars as it seems executors 
loaded before conf override.
-        if hasattr(TaskInstance, "executor"):
-            import importlib
-
-            from airflow.executors import executor_loader
-
-            importlib.reload(executor_loader)
-        create_dummy_dag(dag_id="test_clear", task_id="task1", 
with_dagrun_type=None)
-        dag_run = dag_maker.create_dagrun()
-
-        ti = dag_run.task_instances[0]
-        ti.state = State.QUEUED
-        ti.queued_by_job_id = 1
-        session.flush()
-
-        executor = self.kubernetes_executor
-        executor.job_id = 1
-        executor.kubernetes_queue = "kubernetes"
-
-        executor.kube_client = mock_kube_client
-        executor.clear_not_launched_queued_tasks(session=session)
-
-        ti.refresh_from_db()
-        assert ti.state == State.QUEUED
-        assert mock_kube_client.list_namespaced_pod.call_count == 0
-
-    @pytest.mark.db_test
-    @pytest.mark.skipif(
-        not hasattr(TaskInstance, "executor"), reason="Hybrid executor added 
in later version"
-    )
-    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
-    @conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
-    def test_clear_not_launched_queued_tasks_not_launched_other_executor(
-        self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
-    ):
-        """Queued TI has no pod, but it is not queued for the k8s executor"""
-        mock_kube_client = mock.MagicMock()
-        mock_kube_dynamic_client.return_value = mock.MagicMock()
-        mock_pod_resource = mock.MagicMock()
-        mock_kube_dynamic_client.return_value.resources.get.return_value = 
mock_pod_resource
-        mock_kube_dynamic_client.return_value.get.return_value.items = []
-
-        # This is hack to use overridden conf vars as it seems executors 
loaded before conf override.
-        if hasattr(TaskInstance, "executor"):
-            import importlib
-
-            from airflow.executors import executor_loader
-
-            importlib.reload(executor_loader)
-        create_dummy_dag(dag_id="test_clear", task_id="task1", 
with_dagrun_type=None)
-        dag_run = dag_maker.create_dagrun()
-
-        ti = dag_run.task_instances[0]
-        ti.state = State.QUEUED
-        ti.queued_by_job_id = 1
-        ti.executor = "CeleryExecutor"
-        session.flush()
-
-        executor = self.kubernetes_executor
-        executor.job_id = 1
-
-        executor.kube_client = mock_kube_client
-        executor.clear_not_launched_queued_tasks(session=session)
-
-        ti.refresh_from_db()
-        assert ti.executor == "CeleryExecutor"
-        assert ti.state == State.QUEUED
-        assert mock_kube_client.list_namespaced_pod.call_count == 0
-
-    @pytest.mark.db_test
-    @pytest.mark.skipif(
-        not hasattr(TaskInstance, "executor"), reason="Hybrid executor added 
in later version"
-    )
-    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
-    @conf_vars({("core", "executor"): CELERY_EXECUTOR})
-    def 
test_clear_not_launched_queued_tasks_not_launched_other_default_executor(
-        self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
-    ):
-        """Queued TI has no pod, but it is not queued for the k8s executor"""
-        mock_kube_client = mock.MagicMock()
-        mock_kube_dynamic_client.return_value = mock.MagicMock()
-        mock_pod_resource = mock.MagicMock()
-        mock_kube_dynamic_client.return_value.resources.get.return_value = 
mock_pod_resource
-        mock_kube_dynamic_client.return_value.get.return_value.items = []
-
-        # This is hack to use overridden conf vars as it seems executors 
loaded before conf override.
-        if hasattr(TaskInstance, "executor"):
-            import importlib
-
-            from airflow.executors import executor_loader
-
-            importlib.reload(executor_loader)
-        create_dummy_dag(dag_id="test_clear", task_id="task1", 
with_dagrun_type=None)
-        dag_run = dag_maker.create_dagrun()
-
-        ti = dag_run.task_instances[0]
-        ti.state = State.QUEUED
-        ti.queued_by_job_id = 1
-        session.flush()
-
-        executor = self.kubernetes_executor
-        executor.job_id = 1
-
-        executor.kube_client = mock_kube_client
-        executor.clear_not_launched_queued_tasks(session=session)
-
-        ti.refresh_from_db()
-        assert ti.state == State.QUEUED
-        assert mock_kube_client.list_namespaced_pod.call_count == 0
-
-    @pytest.mark.db_test
-    @pytest.mark.skipif(
-        not hasattr(TaskInstance, "executor"), reason="Hybrid executor added 
in later version"
-    )
-    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
-    @conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
-    def test_clear_not_launched_queued_tasks_launched_none_executor(
-        self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
-    ):
-        """Queued TI has no pod, but it is not queued for the k8s executor"""
-        mock_kube_client = mock.MagicMock()
-        mock_kube_dynamic_client.return_value = mock.MagicMock()
-        mock_pod_resource = mock.MagicMock()
-        mock_kube_dynamic_client.return_value.resources.get.return_value = 
mock_pod_resource
-        mock_kube_dynamic_client.return_value.get.return_value.items = []
-
-        # This is hack to use overridden conf vars as it seems executors 
loaded before conf override.
-        if hasattr(TaskInstance, "executor"):
-            import importlib
-
-            from airflow.executors import executor_loader
-
-            importlib.reload(executor_loader)
-        create_dummy_dag(dag_id="test_clear", task_id="task1", 
with_dagrun_type=None)
-        dag_run = dag_maker.create_dagrun()
-
-        ti = dag_run.task_instances[0]
-        ti.state = State.QUEUED
-        ti.queued_by_job_id = 1
-        session.flush()
-
-        executor = self.kubernetes_executor
-        executor.job_id = 1
-
-        executor.kube_client = mock_kube_client
-        executor.clear_not_launched_queued_tasks(session=session)
-
-        ti.refresh_from_db()
-        assert ti.state == State.SCHEDULED
-        assert mock_kube_dynamic_client.return_value.get.call_count == 1
-
-    @pytest.mark.db_test
-    @pytest.mark.skipif(
-        not hasattr(TaskInstance, "executor"), reason="Hybrid executor added 
in later version"
-    )
-    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
-    @conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
-    def test_clear_not_launched_queued_tasks_launched_kubernetes_executor(
-        self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
-    ):
-        """Queued TI has no pod, but it is not queued for the k8s executor"""
-        mock_kube_client = mock.MagicMock()
-        mock_kube_dynamic_client.return_value = mock.MagicMock()
-        mock_pod_resource = mock.MagicMock()
-        mock_kube_dynamic_client.return_value.resources.get.return_value = 
mock_pod_resource
-        mock_kube_dynamic_client.return_value.get.return_value.items = []
-
-        # This is hack to use overridden conf vars as it seems executors 
loaded before conf override.
-        if hasattr(TaskInstance, "executor"):
-            import importlib
-
-            from airflow.executors import executor_loader
-
-            importlib.reload(executor_loader)
-        create_dummy_dag(dag_id="test_clear", task_id="task1", 
with_dagrun_type=None)
-        dag_run = dag_maker.create_dagrun()
-
-        ti = dag_run.task_instances[0]
-        ti.state = State.QUEUED
-        ti.queued_by_job_id = 1
-        ti.executor = KUBERNETES_EXECUTOR
-        session.flush()
-
-        executor = self.kubernetes_executor
-        executor.job_id = 1
-
-        executor.kube_client = mock_kube_client
-        executor.clear_not_launched_queued_tasks(session=session)
-
-        ti.refresh_from_db()
-        assert ti.state == State.SCHEDULED
-        assert mock_kube_dynamic_client.return_value.get.call_count == 1
-
-    @pytest.mark.db_test
-    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
-    @conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
-    def test_clear_not_launched_queued_tasks_clear_only_by_job_id(
-        self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
-    ):
-        """clear only not launched queued  tasks which are queued by the same 
executor job"""
-        mock_kube_client = mock.MagicMock()
-        mock_kube_dynamic_client.return_value = mock.MagicMock()
-        mock_kube_dynamic_client.return_value.get.return_value = 
k8s.V1PodList(items=[])
-
-        # This is hack to use overridden conf vars as it seems executors 
loaded before conf override.
-        if hasattr(TaskInstance, "executor"):
-            import importlib
-
-            from airflow.executors import executor_loader
-
-            importlib.reload(executor_loader)
-        create_dummy_dag(dag_id="test_clear_0", task_id="task0", 
with_dagrun_type=None)
-        dag_run = dag_maker.create_dagrun()
-
-        ti0 = dag_run.task_instances[0]
-        ti0.state = State.QUEUED
-        ti0.queued_by_job_id = 1
-        session.flush()
-
-        create_dummy_dag(dag_id="test_clear_1", task_id="task1", 
with_dagrun_type=None)
-        dag_run = dag_maker.create_dagrun()
-
-        ti1 = dag_run.task_instances[0]
-        ti1.state = State.QUEUED
-        ti1.queued_by_job_id = 2
-        session.flush()
-
-        executor = self.kubernetes_executor
-        executor.job_id = 1
-        executor.kube_client = mock_kube_client
-        executor.clear_not_launched_queued_tasks(session=session)
-
-        ti0.refresh_from_db()
-        ti1.refresh_from_db()
-        assert ti0.state == State.SCHEDULED
-        assert ti1.state == State.QUEUED
-
     @pytest.mark.db_test
     
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
     def test_get_task_log(self, mock_get_kube_client, 
create_task_instance_of_operator):
@@ -2125,7 +1681,7 @@ class TestKubernetesJobWatcher:
         self.pod.metadata.deletion_timestamp = timezone.utcnow()
 
         self._run()
-        self.assert_watcher_queue_called_once_with_state(State.FAILED)
+        self.watcher.watcher_queue.put.assert_not_called()
 
     def test_process_status_failed(self):
         self.pod.status.phase = "Failed"

Reply via email to