This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fc4d148d9a7 Nuking DebugExecutors from the in-tree executors (#48579)
fc4d148d9a7 is described below

commit fc4d148d9a7564e5197a19e57313c8d66b1476ac
Author: Prajwal7842 <[email protected]>
AuthorDate: Tue Apr 1 01:08:54 2025 +0530

    Nuking DebugExecutors from the in-tree executors (#48579)
---
 airflow-core/docs/core-concepts/debug.rst          |  28 ----
 .../src/airflow/config_templates/config.yml        |  11 --
 .../src/airflow/executors/debug_executor.py        | 155 ---------------------
 .../src/airflow/executors/executor_constants.py    |   2 -
 .../src/airflow/executors/executor_loader.py       |   2 -
 .../unit/cli/commands/test_standalone_command.py   |   3 -
 .../tests/unit/executors/test_debug_executor.py    | 147 -------------------
 .../tests/unit/executors/test_executor_loader.py   |   8 +-
 contributing-docs/09_testing.rst                   |   2 +-
 .../contributors_quick_start_vscode.rst            |   4 +-
 10 files changed, 6 insertions(+), 356 deletions(-)

diff --git a/airflow-core/docs/core-concepts/debug.rst b/airflow-core/docs/core-concepts/debug.rst
index aa57cd4648f..1e713229f21 100644
--- a/airflow-core/docs/core-concepts/debug.rst
+++ b/airflow-core/docs/core-concepts/debug.rst
@@ -69,15 +69,6 @@ is manually ingested. The cleanup step is also skipped, making the intermediate
       run = dag.test(mark_success_pattern="wait_for_.*|cleanup")
      print(f"Intermediate csv: {run.get_task_instance('collect_stats').xcom_pull(task_id='collect_stats')}")
 
-Comparison with DebugExecutor
------------------------------
-
-The ``dag.test`` command has the following benefits over the :class:`~airflow.executors.debug_executor.DebugExecutor`
-class, which is now deprecated:
-
-1. It does not require running an executor at all. Tasks are run one at a time with no executor or scheduler logs.
-2. It is faster than running code with a DebugExecutor as it does not need to go through a scheduler loop.
-
 
 Debugging Airflow dags on the command line
 ******************************************
@@ -98,25 +89,6 @@ Run ``python -m pdb <path to dag file>.py`` for an interactive debugging experie
   (Pdb) run_this_last
   <Task(EmptyOperator): run_this_last>
 
-.. _executor:DebugExecutor:
-
-Debug Executor (deprecated)
-***************************
-
-The :class:`~airflow.executors.debug_executor.DebugExecutor` is meant as
-a debug tool and can be used from IDE. It is a single process executor that
-queues :class:`~airflow.models.taskinstance.TaskInstance` and executes them by running
-``_run_raw_task`` method.
-
-Due to its nature the executor can be used with SQLite database. When used
-with sensors the executor will change sensor mode to ``reschedule`` to avoid
-blocking the execution of DAG.
-
-Additionally ``DebugExecutor`` can be used in a fail-fast mode that will make
-all other running or scheduled tasks fail immediately. To enable this option set
-``AIRFLOW__DEBUG__FAIL_FAST=True`` or adjust ``fail_fast`` option in your ``airflow.cfg``.
-For more information on setting the configuration, see :doc:`../../howto/set-config`.
-
 **IDE setup steps:**
 
 1. Add ``main`` block at the end of your DAG file to make it runnable.
diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index b35b1796495..74a4d7eb742 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1290,17 +1290,6 @@ secrets:
       type: integer
       example: ~
       default: "900"
-debug:
-  description: ~
-  options:
-    fail_fast:
-      description: |
-        Used only with ``DebugExecutor``. If set to ``True`` DAG will fail with first
-        failed task. Helpful for debugging purposes.
-      version_added: 1.10.8
-      type: string
-      example: ~
-      default: "False"
 api:
   description: ~
   options:
diff --git a/airflow-core/src/airflow/executors/debug_executor.py b/airflow-core/src/airflow/executors/debug_executor.py
deleted file mode 100644
index 9d86f5e7270..00000000000
--- a/airflow-core/src/airflow/executors/debug_executor.py
+++ /dev/null
@@ -1,155 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-DebugExecutor.
-
-.. seealso::
-    For more information on how the DebugExecutor works, take a look at the guide:
-    :ref:`executor:DebugExecutor`
-"""
-
-from __future__ import annotations
-
-import threading
-import time
-from typing import TYPE_CHECKING, Any
-
-from airflow.executors.base_executor import BaseExecutor
-from airflow.utils.state import TaskInstanceState
-
-if TYPE_CHECKING:
-    from airflow.models.taskinstance import TaskInstance
-    from airflow.models.taskinstancekey import TaskInstanceKey
-
-
-class DebugExecutor(BaseExecutor):
-    """
-    This executor is meant for debugging purposes. It can be used with SQLite.
-
-    It executes one task instance at time. Additionally to support working
-    with sensors, all sensors ``mode`` will be automatically set to "reschedule".
-    """
-
-    _terminated = threading.Event()
-
-    is_production: bool = False
-
-    change_sensor_mode_to_reschedule: bool = True
-
-    def __init__(self):
-        super().__init__()
-        self.tasks_to_run: list[TaskInstance] = []
-        # Place where we keep information for task instance raw run
-        self.tasks_params: dict[TaskInstanceKey, dict[str, Any]] = {}
-        from airflow.configuration import conf
-
-        self.fail_fast = conf.getboolean("debug", "fail_fast")
-
-    def execute_async(self, *args, **kwargs) -> None:
-        """Replace the method with a custom trigger_task implementation."""
-
-    def sync(self) -> None:
-        task_succeeded = True
-        while self.tasks_to_run:
-            ti = self.tasks_to_run.pop(0)
-            if self.fail_fast and not task_succeeded:
-                self.log.info("Setting %s to %s", ti.key, TaskInstanceState.UPSTREAM_FAILED)
-                ti.set_state(TaskInstanceState.UPSTREAM_FAILED)
-                self.change_state(ti.key, TaskInstanceState.UPSTREAM_FAILED)
-            elif self._terminated.is_set():
-                self.log.info("Executor is terminated! Stopping %s to %s", ti.key, TaskInstanceState.FAILED)
-                ti.set_state(TaskInstanceState.FAILED)
-                self.fail(ti.key)
-            else:
-                task_succeeded = self._run_task(ti)
-
-    def _run_task(self, ti: TaskInstance) -> bool:
-        self.log.debug("Executing task: %s", ti)
-        key = ti.key
-        try:
-            params = self.tasks_params.pop(ti.key, {})
-            ti.run(**params)
-            self.success(key)
-            return True
-        except Exception as e:
-            ti.set_state(TaskInstanceState.FAILED)
-            self.fail(key)
-            self.log.exception("Failed to execute task: %s.", e)
-            return False
-
-    def queue_task_instance(
-        self,
-        task_instance: TaskInstance,
-        mark_success: bool = False,
-        ignore_all_deps: bool = False,
-        ignore_depends_on_past: bool = False,
-        wait_for_past_depends_before_skipping: bool = False,
-        ignore_task_deps: bool = False,
-        ignore_ti_state: bool = False,
-        pool: str | None = None,
-        cfg_path: str | None = None,
-    ) -> None:
-        """Queues task instance with empty command because we do not need it."""
-        if TYPE_CHECKING:
-            assert task_instance.task
-
-        self.queue_command(
-            task_instance,
-            [str(task_instance)],  # Just for better logging, it's not used anywhere
-            priority=task_instance.priority_weight,
-            queue=task_instance.task.queue,
-        )
-        # Save params for TaskInstance._run_raw_task
-        self.tasks_params[task_instance.key] = {
-            "mark_success": mark_success,
-            "pool": pool,
-        }
-
-    def trigger_tasks(self, open_slots: int) -> None:
-        """
-        Triggers tasks.
-
-        Instead of calling exec_async we just add task instance to tasks_to_run queue.
-
-        :param open_slots: Number of open slots
-        """
-        if not self.queued_tasks:
-            # wait a bit if there are no tasks ready to be executed to avoid spinning too fast in the void
-            time.sleep(0.5)
-            return
-
-        sorted_queue = sorted(
-            self.queued_tasks.items(),
-            key=lambda x: x[1][1],
-            reverse=True,
-        )
-        for _ in range(min((open_slots, len(self.queued_tasks)))):
-            key, (_, _, _, ti) = sorted_queue.pop(0)
-            self.queued_tasks.pop(key)
-            self.running.add(key)
-            self.tasks_to_run.append(ti)  # type: ignore
-
-    def end(self) -> None:
-        """Set states of queued tasks to UPSTREAM_FAILED marking them as not executed."""
-        for ti in self.tasks_to_run:
-            self.log.info("Setting %s to %s", ti.key, TaskInstanceState.UPSTREAM_FAILED)
-            ti.set_state(TaskInstanceState.UPSTREAM_FAILED)
-            self.change_state(ti.key, TaskInstanceState.UPSTREAM_FAILED)
-
-    def terminate(self) -> None:
-        self._terminated.set()
diff --git a/airflow-core/src/airflow/executors/executor_constants.py b/airflow-core/src/airflow/executors/executor_constants.py
index 5d752e23233..852db43eea3 100644
--- a/airflow-core/src/airflow/executors/executor_constants.py
+++ b/airflow-core/src/airflow/executors/executor_constants.py
@@ -31,13 +31,11 @@ LOCAL_EXECUTOR = "LocalExecutor"
 SEQUENTIAL_EXECUTOR = "SequentialExecutor"
 CELERY_EXECUTOR = "CeleryExecutor"
 KUBERNETES_EXECUTOR = "KubernetesExecutor"
-DEBUG_EXECUTOR = "DebugExecutor"
 MOCK_EXECUTOR = "MockExecutor"
 CORE_EXECUTOR_NAMES = {
     LOCAL_EXECUTOR,
     SEQUENTIAL_EXECUTOR,
     CELERY_EXECUTOR,
     KUBERNETES_EXECUTOR,
-    DEBUG_EXECUTOR,
     MOCK_EXECUTOR,
 }
diff --git a/airflow-core/src/airflow/executors/executor_loader.py b/airflow-core/src/airflow/executors/executor_loader.py
index 80fed5c7278..6624d14e23f 100644
--- a/airflow-core/src/airflow/executors/executor_loader.py
+++ b/airflow-core/src/airflow/executors/executor_loader.py
@@ -26,7 +26,6 @@ from airflow.exceptions import AirflowConfigException, UnknownExecutorException
 from airflow.executors.executor_constants import (
     CELERY_EXECUTOR,
     CORE_EXECUTOR_NAMES,
-    DEBUG_EXECUTOR,
     KUBERNETES_EXECUTOR,
     LOCAL_EXECUTOR,
     SEQUENTIAL_EXECUTOR,
@@ -61,7 +60,6 @@ class ExecutorLoader:
         CELERY_EXECUTOR: "airflow.providers.celery.executors.celery_executor.CeleryExecutor",
         KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
         "executors.kubernetes_executor.KubernetesExecutor",
-        DEBUG_EXECUTOR: "airflow.executors.debug_executor.DebugExecutor",
     }
 
     @classmethod
diff --git a/airflow-core/tests/unit/cli/commands/test_standalone_command.py b/airflow-core/tests/unit/cli/commands/test_standalone_command.py
index 6151493ee6e..e90da7e369b 100644
--- a/airflow-core/tests/unit/cli/commands/test_standalone_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_standalone_command.py
@@ -26,7 +26,6 @@ from airflow.cli.commands.standalone_command import StandaloneCommand
 from airflow.executors import executor_loader
 from airflow.executors.executor_constants import (
     CELERY_EXECUTOR,
-    DEBUG_EXECUTOR,
     KUBERNETES_EXECUTOR,
     LOCAL_EXECUTOR,
     SEQUENTIAL_EXECUTOR,
@@ -41,12 +40,10 @@ class TestStandaloneCommand:
             (SEQUENTIAL_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
             (CELERY_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
             (KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
-            (DEBUG_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
             (LOCAL_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
             (SEQUENTIAL_EXECUTOR, "other_db_conn_string", SEQUENTIAL_EXECUTOR),
             (CELERY_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
             (KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
-            (DEBUG_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
         ],
     )
     def test_calculate_env(self, conf_executor_name, conf_sql_alchemy_conn, expected_standalone_executor):
diff --git a/airflow-core/tests/unit/executors/test_debug_executor.py b/airflow-core/tests/unit/executors/test_debug_executor.py
deleted file mode 100644
index fdafa68ad5a..00000000000
--- a/airflow-core/tests/unit/executors/test_debug_executor.py
+++ /dev/null
@@ -1,147 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from unittest import mock
-from unittest.mock import MagicMock
-
-from airflow.executors.debug_executor import DebugExecutor
-from airflow.utils.state import State
-
-
-class TestDebugExecutor:
-    @mock.patch("airflow.executors.debug_executor.DebugExecutor._run_task")
-    def test_sync(self, run_task_mock):
-        run_task_mock.return_value = True
-
-        executor = DebugExecutor()
-
-        ti1 = MagicMock(key="t1")
-        ti2 = MagicMock(key="t2")
-        executor.tasks_to_run = [ti1, ti2]
-
-        executor.sync()
-        assert not executor.tasks_to_run
-        run_task_mock.assert_has_calls([mock.call(ti1), mock.call(ti2)])
-
-    @mock.patch("airflow.models.taskinstance.TaskInstance")
-    def test_run_task(self, task_instance_mock):
-        ti_key = "key"
-        job_id = " job_id"
-        task_instance_mock.key = ti_key
-        task_instance_mock.job_id = job_id
-
-        executor = DebugExecutor()
-        executor.running = {ti_key}
-        succeeded = executor._run_task(task_instance_mock)
-
-        assert succeeded
-        task_instance_mock.run.assert_called()
-
-    def test_queue_task_instance(self):
-        key = "ti_key"
-        ti = MagicMock(key=key)
-
-        executor = DebugExecutor()
-        executor.queue_task_instance(task_instance=ti, mark_success=True, pool="pool")
-
-        assert key in executor.queued_tasks
-        assert key in executor.tasks_params
-        assert executor.tasks_params[key] == {
-            "mark_success": True,
-            "pool": "pool",
-        }
-
-    def test_trigger_tasks(self):
-        execute_mock = MagicMock()
-        executor = DebugExecutor()
-        executor.execute_async = execute_mock
-
-        executor.queued_tasks = {
-            "t1": (None, 1, None, MagicMock(key="t1")),
-            "t2": (None, 2, None, MagicMock(key="t2")),
-        }
-
-        executor.trigger_tasks(open_slots=4)
-        assert not executor.queued_tasks
-        assert len(executor.running) == 2
-        assert len(executor.tasks_to_run) == 2
-        assert not execute_mock.called
-
-    def test_end(self):
-        ti = MagicMock(key="ti_key")
-
-        executor = DebugExecutor()
-        executor.tasks_to_run = [ti]
-        executor.running = {ti.key}
-        executor.end()
-
-        ti.set_state.assert_called_once_with(State.UPSTREAM_FAILED)
-        assert not executor.running
-
-    @mock.patch("airflow.executors.debug_executor.DebugExecutor.change_state")
-    def test_fail_fast(self, change_state_mock):
-        with mock.patch.dict("os.environ", {"AIRFLOW__DEBUG__FAIL_FAST": "True"}):
-            executor = DebugExecutor()
-
-        ti1 = MagicMock(key="t1")
-        ti2 = MagicMock(key="t2")
-
-        ti1.run.side_effect = Exception
-
-        executor.tasks_to_run = [ti1, ti2]
-
-        executor.sync()
-
-        assert executor.fail_fast
-        assert not executor.tasks_to_run
-        change_state_mock.assert_has_calls(
-            [
-                mock.call(ti1.key, State.FAILED, None),
-                mock.call(ti2.key, State.UPSTREAM_FAILED),
-            ]
-        )
-
-    def test_reschedule_mode(self):
-        assert DebugExecutor.change_sensor_mode_to_reschedule
-
-    def test_is_production_default_value(self):
-        assert not DebugExecutor.is_production
-
-    @mock.patch("time.sleep", autospec=True)
-    def test_trigger_sleep_when_no_task(self, mock_sleep):
-        execute_mock = MagicMock()
-        executor = DebugExecutor()
-        executor.execute_async = execute_mock
-        executor.queued_tasks = {}
-        executor.trigger_tasks(open_slots=5)
-        mock_sleep.assert_called()
-
-    @mock.patch("airflow.executors.debug_executor.DebugExecutor.change_state")
-    def test_sync_after_terminate(self, change_state_mock):
-        executor = DebugExecutor()
-
-        ti1 = MagicMock(key="t1")
-        executor.tasks_to_run = [ti1]
-        executor.terminate()
-        executor.sync()
-
-        change_state_mock.assert_has_calls(
-            [
-                mock.call(ti1.key, State.FAILED, None),
-            ]
-        )
diff --git a/airflow-core/tests/unit/executors/test_executor_loader.py b/airflow-core/tests/unit/executors/test_executor_loader.py
index 57428440c8d..1ef270d11e9 100644
--- a/airflow-core/tests/unit/executors/test_executor_loader.py
+++ b/airflow-core/tests/unit/executors/test_executor_loader.py
@@ -45,7 +45,6 @@ class TestExecutorLoader:
         "executor_name",
         [
             "CeleryExecutor",
-            "DebugExecutor",
             "KubernetesExecutor",
             "LocalExecutor",
         ],
@@ -266,8 +265,8 @@ class TestExecutorLoader:
         "executor_config",
         [
             "Celery::Executor, LocalExecutor",
-            "LocalExecutor, Ce:ler:yExecutor, DebugExecutor",
-            "LocalExecutor, CeleryExecutor:, DebugExecutor",
+            "LocalExecutor, Ce:ler:yExecutor",
+            "LocalExecutor, CeleryExecutor:",
             "LocalExecutor, my_cool_alias:",
             "LocalExecutor, my_cool_alias:CeleryExecutor",
             "LocalExecutor, module.path.first:alias_second",
@@ -282,11 +281,10 @@ class TestExecutorLoader:
         ("executor_config", "expected_value"),
         [
             ("CeleryExecutor", "CeleryExecutor"),
-            ("DebugExecutor", "DebugExecutor"),
             ("KubernetesExecutor", "KubernetesExecutor"),
             ("LocalExecutor", "LocalExecutor"),
             ("CeleryExecutor, LocalExecutor", "CeleryExecutor"),
-            ("LocalExecutor, CeleryExecutor, DebugExecutor", "LocalExecutor"),
+            ("LocalExecutor, CeleryExecutor", "LocalExecutor"),
         ],
     )
     def test_should_support_import_executor_from_core(self, executor_config, expected_value):
diff --git a/contributing-docs/09_testing.rst b/contributing-docs/09_testing.rst
index 6538b9e185d..06d1d6a880a 100644
--- a/contributing-docs/09_testing.rst
+++ b/contributing-docs/09_testing.rst
@@ -53,7 +53,7 @@ You can also run other kinds of tests when you are developing airflow packages:
   client works correctly.
 
 * `DAG testing <testing/dag_testing.rst>`__ is a document that describes how to test DAGs in a local environment
-  with ``DebugExecutor``. Note, that this is a legacy method - you can now use dag.test() method to test DAGs.
+  with ``dag.test()``.
 
 ------
 
diff --git a/contributing-docs/quick-start-ide/contributors_quick_start_vscode.rst b/contributing-docs/quick-start-ide/contributors_quick_start_vscode.rst
index d58942b1d1e..34991d02715 100644
--- a/contributing-docs/quick-start-ide/contributors_quick_start_vscode.rst
+++ b/contributing-docs/quick-start-ide/contributors_quick_start_vscode.rst
@@ -109,7 +109,7 @@ Setting up debugging
     if __name__ == "__main__":
         dag.test()
 
-- Add ``"AIRFLOW__CORE__EXECUTOR": "DebugExecutor"`` to the ``"env"`` field of Debug configuration.
+- Add ``"AIRFLOW__CORE__EXECUTOR": "LocalExecutor"`` to the ``"env"`` field of Debug configuration.
 
   - Using the ``Run`` view click on ``Create a launch.json file``
 
@@ -133,7 +133,7 @@ Setting up debugging
             "program": "${workspaceFolder}/files/dags/example_bash_operator.py",
              "env": {
                  "PYTHONUNBUFFERED": "1",
-                 "AIRFLOW__CORE__EXECUTOR": "DebugExecutor"
+                 "AIRFLOW__CORE__EXECUTOR": "LocalExecutor"
               },
               "python": "${env:HOME}/.pyenv/versions/airflow/bin/python"
          ]
