This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fc4d148d9a7 Nuking DebugExecutors from the in-tree executors (#48579)
fc4d148d9a7 is described below
commit fc4d148d9a7564e5197a19e57313c8d66b1476ac
Author: Prajwal7842 <[email protected]>
AuthorDate: Tue Apr 1 01:08:54 2025 +0530
Nuking DebugExecutors from the in-tree executors (#48579)
---
airflow-core/docs/core-concepts/debug.rst | 28 ----
.../src/airflow/config_templates/config.yml | 11 --
.../src/airflow/executors/debug_executor.py | 155 ---------------------
.../src/airflow/executors/executor_constants.py | 2 -
.../src/airflow/executors/executor_loader.py | 2 -
.../unit/cli/commands/test_standalone_command.py | 3 -
.../tests/unit/executors/test_debug_executor.py | 147 -------------------
.../tests/unit/executors/test_executor_loader.py | 8 +-
contributing-docs/09_testing.rst | 2 +-
.../contributors_quick_start_vscode.rst | 4 +-
10 files changed, 6 insertions(+), 356 deletions(-)
diff --git a/airflow-core/docs/core-concepts/debug.rst b/airflow-core/docs/core-concepts/debug.rst
index aa57cd4648f..1e713229f21 100644
--- a/airflow-core/docs/core-concepts/debug.rst
+++ b/airflow-core/docs/core-concepts/debug.rst
@@ -69,15 +69,6 @@ is manually ingested. The cleanup step is also skipped, making the intermediate
run = dag.test(mark_success_pattern="wait_for_.*|cleanup")
print(f"Intermediate csv: {run.get_task_instance('collect_stats').xcom_pull(task_id='collect_stats')}")
-Comparison with DebugExecutor
------------------------------
-
-The ``dag.test`` command has the following benefits over the :class:`~airflow.executors.debug_executor.DebugExecutor`
-class, which is now deprecated:
-
-1. It does not require running an executor at all. Tasks are run one at a time with no executor or scheduler logs.
-2. It is faster than running code with a DebugExecutor as it does not need to go through a scheduler loop.
-
Debugging Airflow dags on the command line
******************************************
@@ -98,25 +89,6 @@ Run ``python -m pdb <path to dag file>.py`` for an interactive debugging experie
(Pdb) run_this_last
<Task(EmptyOperator): run_this_last>
-.. _executor:DebugExecutor:
-
-Debug Executor (deprecated)
-***************************
-
-The :class:`~airflow.executors.debug_executor.DebugExecutor` is meant as
-a debug tool and can be used from IDE. It is a single process executor that
-queues :class:`~airflow.models.taskinstance.TaskInstance` and executes them by running
-``_run_raw_task`` method.
-
-Due to its nature the executor can be used with SQLite database. When used
-with sensors the executor will change sensor mode to ``reschedule`` to avoid
-blocking the execution of DAG.
-
-Additionally ``DebugExecutor`` can be used in a fail-fast mode that will make
-all other running or scheduled tasks fail immediately. To enable this option set
-``AIRFLOW__DEBUG__FAIL_FAST=True`` or adjust ``fail_fast`` option in your ``airflow.cfg``.
-For more information on setting the configuration, see :doc:`../../howto/set-config`.
-
**IDE setup steps:**
1. Add ``main`` block at the end of your DAG file to make it runnable.
diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index b35b1796495..74a4d7eb742 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1290,17 +1290,6 @@ secrets:
type: integer
example: ~
default: "900"
-debug:
- description: ~
- options:
- fail_fast:
- description: |
- Used only with ``DebugExecutor``. If set to ``True`` DAG will fail with first
- failed task. Helpful for debugging purposes.
- version_added: 1.10.8
- type: string
- example: ~
- default: "False"
api:
description: ~
options:
diff --git a/airflow-core/src/airflow/executors/debug_executor.py b/airflow-core/src/airflow/executors/debug_executor.py
deleted file mode 100644
index 9d86f5e7270..00000000000
--- a/airflow-core/src/airflow/executors/debug_executor.py
+++ /dev/null
@@ -1,155 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-DebugExecutor.
-
-.. seealso::
- For more information on how the DebugExecutor works, take a look at the guide:
- :ref:`executor:DebugExecutor`
-"""
-
-from __future__ import annotations
-
-import threading
-import time
-from typing import TYPE_CHECKING, Any
-
-from airflow.executors.base_executor import BaseExecutor
-from airflow.utils.state import TaskInstanceState
-
-if TYPE_CHECKING:
- from airflow.models.taskinstance import TaskInstance
- from airflow.models.taskinstancekey import TaskInstanceKey
-
-
-class DebugExecutor(BaseExecutor):
- """
- This executor is meant for debugging purposes. It can be used with SQLite.
-
- It executes one task instance at time. Additionally to support working
- with sensors, all sensors ``mode`` will be automatically set to "reschedule".
- """
-
- _terminated = threading.Event()
-
- is_production: bool = False
-
- change_sensor_mode_to_reschedule: bool = True
-
- def __init__(self):
- super().__init__()
- self.tasks_to_run: list[TaskInstance] = []
- # Place where we keep information for task instance raw run
- self.tasks_params: dict[TaskInstanceKey, dict[str, Any]] = {}
- from airflow.configuration import conf
-
- self.fail_fast = conf.getboolean("debug", "fail_fast")
-
- def execute_async(self, *args, **kwargs) -> None:
- """Replace the method with a custom trigger_task implementation."""
-
- def sync(self) -> None:
- task_succeeded = True
- while self.tasks_to_run:
- ti = self.tasks_to_run.pop(0)
- if self.fail_fast and not task_succeeded:
- self.log.info("Setting %s to %s", ti.key, TaskInstanceState.UPSTREAM_FAILED)
- ti.set_state(TaskInstanceState.UPSTREAM_FAILED)
- self.change_state(ti.key, TaskInstanceState.UPSTREAM_FAILED)
- elif self._terminated.is_set():
- self.log.info("Executor is terminated! Stopping %s to %s", ti.key, TaskInstanceState.FAILED)
- ti.set_state(TaskInstanceState.FAILED)
- self.fail(ti.key)
- else:
- task_succeeded = self._run_task(ti)
-
- def _run_task(self, ti: TaskInstance) -> bool:
- self.log.debug("Executing task: %s", ti)
- key = ti.key
- try:
- params = self.tasks_params.pop(ti.key, {})
- ti.run(**params)
- self.success(key)
- return True
- except Exception as e:
- ti.set_state(TaskInstanceState.FAILED)
- self.fail(key)
- self.log.exception("Failed to execute task: %s.", e)
- return False
-
- def queue_task_instance(
- self,
- task_instance: TaskInstance,
- mark_success: bool = False,
- ignore_all_deps: bool = False,
- ignore_depends_on_past: bool = False,
- wait_for_past_depends_before_skipping: bool = False,
- ignore_task_deps: bool = False,
- ignore_ti_state: bool = False,
- pool: str | None = None,
- cfg_path: str | None = None,
- ) -> None:
- """Queues task instance with empty command because we do not need it."""
- if TYPE_CHECKING:
- assert task_instance.task
-
- self.queue_command(
- task_instance,
- [str(task_instance)], # Just for better logging, it's not used anywhere
- priority=task_instance.priority_weight,
- queue=task_instance.task.queue,
- )
- # Save params for TaskInstance._run_raw_task
- self.tasks_params[task_instance.key] = {
- "mark_success": mark_success,
- "pool": pool,
- }
-
- def trigger_tasks(self, open_slots: int) -> None:
- """
- Triggers tasks.
-
- Instead of calling exec_async we just add task instance to tasks_to_run queue.
-
- :param open_slots: Number of open slots
- """
- if not self.queued_tasks:
- # wait a bit if there are no tasks ready to be executed to avoid spinning too fast in the void
- time.sleep(0.5)
- return
-
- sorted_queue = sorted(
- self.queued_tasks.items(),
- key=lambda x: x[1][1],
- reverse=True,
- )
- for _ in range(min((open_slots, len(self.queued_tasks)))):
- key, (_, _, _, ti) = sorted_queue.pop(0)
- self.queued_tasks.pop(key)
- self.running.add(key)
- self.tasks_to_run.append(ti) # type: ignore
-
- def end(self) -> None:
- """Set states of queued tasks to UPSTREAM_FAILED marking them as not executed."""
- for ti in self.tasks_to_run:
- self.log.info("Setting %s to %s", ti.key, TaskInstanceState.UPSTREAM_FAILED)
- ti.set_state(TaskInstanceState.UPSTREAM_FAILED)
- self.change_state(ti.key, TaskInstanceState.UPSTREAM_FAILED)
-
- def terminate(self) -> None:
- self._terminated.set()
diff --git a/airflow-core/src/airflow/executors/executor_constants.py b/airflow-core/src/airflow/executors/executor_constants.py
index 5d752e23233..852db43eea3 100644
--- a/airflow-core/src/airflow/executors/executor_constants.py
+++ b/airflow-core/src/airflow/executors/executor_constants.py
@@ -31,13 +31,11 @@ LOCAL_EXECUTOR = "LocalExecutor"
SEQUENTIAL_EXECUTOR = "SequentialExecutor"
CELERY_EXECUTOR = "CeleryExecutor"
KUBERNETES_EXECUTOR = "KubernetesExecutor"
-DEBUG_EXECUTOR = "DebugExecutor"
MOCK_EXECUTOR = "MockExecutor"
CORE_EXECUTOR_NAMES = {
LOCAL_EXECUTOR,
SEQUENTIAL_EXECUTOR,
CELERY_EXECUTOR,
KUBERNETES_EXECUTOR,
- DEBUG_EXECUTOR,
MOCK_EXECUTOR,
}
diff --git a/airflow-core/src/airflow/executors/executor_loader.py b/airflow-core/src/airflow/executors/executor_loader.py
index 80fed5c7278..6624d14e23f 100644
--- a/airflow-core/src/airflow/executors/executor_loader.py
+++ b/airflow-core/src/airflow/executors/executor_loader.py
@@ -26,7 +26,6 @@ from airflow.exceptions import AirflowConfigException, UnknownExecutorException
from airflow.executors.executor_constants import (
CELERY_EXECUTOR,
CORE_EXECUTOR_NAMES,
- DEBUG_EXECUTOR,
KUBERNETES_EXECUTOR,
LOCAL_EXECUTOR,
SEQUENTIAL_EXECUTOR,
@@ -61,7 +60,6 @@ class ExecutorLoader:
CELERY_EXECUTOR: "airflow.providers.celery.executors.celery_executor.CeleryExecutor",
KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
"executors.kubernetes_executor.KubernetesExecutor",
- DEBUG_EXECUTOR: "airflow.executors.debug_executor.DebugExecutor",
}
@classmethod
diff --git a/airflow-core/tests/unit/cli/commands/test_standalone_command.py b/airflow-core/tests/unit/cli/commands/test_standalone_command.py
index 6151493ee6e..e90da7e369b 100644
--- a/airflow-core/tests/unit/cli/commands/test_standalone_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_standalone_command.py
@@ -26,7 +26,6 @@ from airflow.cli.commands.standalone_command import StandaloneCommand
from airflow.executors import executor_loader
from airflow.executors.executor_constants import (
CELERY_EXECUTOR,
- DEBUG_EXECUTOR,
KUBERNETES_EXECUTOR,
LOCAL_EXECUTOR,
SEQUENTIAL_EXECUTOR,
@@ -41,12 +40,10 @@ class TestStandaloneCommand:
(SEQUENTIAL_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
(CELERY_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
(KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
- (DEBUG_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
(LOCAL_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
(SEQUENTIAL_EXECUTOR, "other_db_conn_string", SEQUENTIAL_EXECUTOR),
(CELERY_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
(KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
- (DEBUG_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
],
)
def test_calculate_env(self, conf_executor_name, conf_sql_alchemy_conn, expected_standalone_executor):
diff --git a/airflow-core/tests/unit/executors/test_debug_executor.py b/airflow-core/tests/unit/executors/test_debug_executor.py
deleted file mode 100644
index fdafa68ad5a..00000000000
--- a/airflow-core/tests/unit/executors/test_debug_executor.py
+++ /dev/null
@@ -1,147 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from unittest import mock
-from unittest.mock import MagicMock
-
-from airflow.executors.debug_executor import DebugExecutor
-from airflow.utils.state import State
-
-
-class TestDebugExecutor:
- @mock.patch("airflow.executors.debug_executor.DebugExecutor._run_task")
- def test_sync(self, run_task_mock):
- run_task_mock.return_value = True
-
- executor = DebugExecutor()
-
- ti1 = MagicMock(key="t1")
- ti2 = MagicMock(key="t2")
- executor.tasks_to_run = [ti1, ti2]
-
- executor.sync()
- assert not executor.tasks_to_run
- run_task_mock.assert_has_calls([mock.call(ti1), mock.call(ti2)])
-
- @mock.patch("airflow.models.taskinstance.TaskInstance")
- def test_run_task(self, task_instance_mock):
- ti_key = "key"
- job_id = " job_id"
- task_instance_mock.key = ti_key
- task_instance_mock.job_id = job_id
-
- executor = DebugExecutor()
- executor.running = {ti_key}
- succeeded = executor._run_task(task_instance_mock)
-
- assert succeeded
- task_instance_mock.run.assert_called()
-
- def test_queue_task_instance(self):
- key = "ti_key"
- ti = MagicMock(key=key)
-
- executor = DebugExecutor()
- executor.queue_task_instance(task_instance=ti, mark_success=True, pool="pool")
-
- assert key in executor.queued_tasks
- assert key in executor.tasks_params
- assert executor.tasks_params[key] == {
- "mark_success": True,
- "pool": "pool",
- }
-
- def test_trigger_tasks(self):
- execute_mock = MagicMock()
- executor = DebugExecutor()
- executor.execute_async = execute_mock
-
- executor.queued_tasks = {
- "t1": (None, 1, None, MagicMock(key="t1")),
- "t2": (None, 2, None, MagicMock(key="t2")),
- }
-
- executor.trigger_tasks(open_slots=4)
- assert not executor.queued_tasks
- assert len(executor.running) == 2
- assert len(executor.tasks_to_run) == 2
- assert not execute_mock.called
-
- def test_end(self):
- ti = MagicMock(key="ti_key")
-
- executor = DebugExecutor()
- executor.tasks_to_run = [ti]
- executor.running = {ti.key}
- executor.end()
-
- ti.set_state.assert_called_once_with(State.UPSTREAM_FAILED)
- assert not executor.running
-
- @mock.patch("airflow.executors.debug_executor.DebugExecutor.change_state")
- def test_fail_fast(self, change_state_mock):
- with mock.patch.dict("os.environ", {"AIRFLOW__DEBUG__FAIL_FAST": "True"}):
- executor = DebugExecutor()
-
- ti1 = MagicMock(key="t1")
- ti2 = MagicMock(key="t2")
-
- ti1.run.side_effect = Exception
-
- executor.tasks_to_run = [ti1, ti2]
-
- executor.sync()
-
- assert executor.fail_fast
- assert not executor.tasks_to_run
- change_state_mock.assert_has_calls(
- [
- mock.call(ti1.key, State.FAILED, None),
- mock.call(ti2.key, State.UPSTREAM_FAILED),
- ]
- )
-
- def test_reschedule_mode(self):
- assert DebugExecutor.change_sensor_mode_to_reschedule
-
- def test_is_production_default_value(self):
- assert not DebugExecutor.is_production
-
- @mock.patch("time.sleep", autospec=True)
- def test_trigger_sleep_when_no_task(self, mock_sleep):
- execute_mock = MagicMock()
- executor = DebugExecutor()
- executor.execute_async = execute_mock
- executor.queued_tasks = {}
- executor.trigger_tasks(open_slots=5)
- mock_sleep.assert_called()
-
- @mock.patch("airflow.executors.debug_executor.DebugExecutor.change_state")
- def test_sync_after_terminate(self, change_state_mock):
- executor = DebugExecutor()
-
- ti1 = MagicMock(key="t1")
- executor.tasks_to_run = [ti1]
- executor.terminate()
- executor.sync()
-
- change_state_mock.assert_has_calls(
- [
- mock.call(ti1.key, State.FAILED, None),
- ]
- )
diff --git a/airflow-core/tests/unit/executors/test_executor_loader.py b/airflow-core/tests/unit/executors/test_executor_loader.py
index 57428440c8d..1ef270d11e9 100644
--- a/airflow-core/tests/unit/executors/test_executor_loader.py
+++ b/airflow-core/tests/unit/executors/test_executor_loader.py
@@ -45,7 +45,6 @@ class TestExecutorLoader:
"executor_name",
[
"CeleryExecutor",
- "DebugExecutor",
"KubernetesExecutor",
"LocalExecutor",
],
@@ -266,8 +265,8 @@ class TestExecutorLoader:
"executor_config",
[
"Celery::Executor, LocalExecutor",
- "LocalExecutor, Ce:ler:yExecutor, DebugExecutor",
- "LocalExecutor, CeleryExecutor:, DebugExecutor",
+ "LocalExecutor, Ce:ler:yExecutor",
+ "LocalExecutor, CeleryExecutor:",
"LocalExecutor, my_cool_alias:",
"LocalExecutor, my_cool_alias:CeleryExecutor",
"LocalExecutor, module.path.first:alias_second",
@@ -282,11 +281,10 @@ class TestExecutorLoader:
("executor_config", "expected_value"),
[
("CeleryExecutor", "CeleryExecutor"),
- ("DebugExecutor", "DebugExecutor"),
("KubernetesExecutor", "KubernetesExecutor"),
("LocalExecutor", "LocalExecutor"),
("CeleryExecutor, LocalExecutor", "CeleryExecutor"),
- ("LocalExecutor, CeleryExecutor, DebugExecutor", "LocalExecutor"),
+ ("LocalExecutor, CeleryExecutor", "LocalExecutor"),
],
)
def test_should_support_import_executor_from_core(self, executor_config, expected_value):
diff --git a/contributing-docs/09_testing.rst b/contributing-docs/09_testing.rst
index 6538b9e185d..06d1d6a880a 100644
--- a/contributing-docs/09_testing.rst
+++ b/contributing-docs/09_testing.rst
@@ -53,7 +53,7 @@ You can also run other kinds of tests when you are developing airflow packages:
client works correctly.
* `DAG testing <testing/dag_testing.rst>`__ is a document that describes how to test DAGs in a local environment
- with ``DebugExecutor``. Note, that this is a legacy method - you can now use dag.test() method to test DAGs.
+ with ``dag.test()``.
------
diff --git a/contributing-docs/quick-start-ide/contributors_quick_start_vscode.rst b/contributing-docs/quick-start-ide/contributors_quick_start_vscode.rst
index d58942b1d1e..34991d02715 100644
--- a/contributing-docs/quick-start-ide/contributors_quick_start_vscode.rst
+++ b/contributing-docs/quick-start-ide/contributors_quick_start_vscode.rst
@@ -109,7 +109,7 @@ Setting up debugging
if __name__ == "__main__":
dag.test()
-- Add ``"AIRFLOW__CORE__EXECUTOR": "DebugExecutor"`` to the ``"env"`` field of Debug configuration.
+- Add ``"AIRFLOW__CORE__EXECUTOR": "LocalExecutor"`` to the ``"env"`` field of Debug configuration.
- Using the ``Run`` view click on ``Create a launch.json file``
@@ -133,7 +133,7 @@ Setting up debugging
"program":
"${workspaceFolder}/files/dags/example_bash_operator.py",
"env": {
"PYTHONUNBUFFERED": "1",
- "AIRFLOW__CORE__EXECUTOR": "DebugExecutor"
+ "AIRFLOW__CORE__EXECUTOR": "LocalExecutor"
},
"python": "${env:HOME}/.pyenv/versions/airflow/bin/python"
]