This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new a94c585ee99 [v3-1-test] Remove leftovers from unlimited parallelism in
local executor (#57579) (#57644)
a94c585ee99 is described below
commit a94c585ee99ada956e91978d87506febfd5ff930
Author: Jens Scheffler <[email protected]>
AuthorDate: Fri Oct 31 20:16:44 2025 +0100
[v3-1-test] Remove leftovers from unlimited parallelism in local executor
(#57579) (#57644)
* Remove leftovers from unlimited execution in LocalExecutor
* Remove leftovers from unlimited execution in LocalExecutor
* Remove orphan unit test
(cherry picked from commit 663d2c686580dfe402d4c2fe46781dccab1b5889)
---
airflow-core/docs/core-concepts/executor/local.rst | 39 ++++++++++------------
.../src/airflow/executors/local_executor.py | 11 ++----
.../tests/unit/executors/test_local_executor.py | 15 +++------
.../executors/test_local_executor_check_workers.py | 10 ------
4 files changed, 25 insertions(+), 50 deletions(-)
diff --git a/airflow-core/docs/core-concepts/executor/local.rst
b/airflow-core/docs/core-concepts/executor/local.rst
index 65dd1ba471d..a533ae3a5b8 100644
--- a/airflow-core/docs/core-concepts/executor/local.rst
+++ b/airflow-core/docs/core-concepts/executor/local.rst
@@ -21,32 +21,29 @@
Local Executor
==============
-:class:`~airflow.executors.local_executor.LocalExecutor` runs tasks by
spawning processes in a controlled fashion in different modes.
+:class:`~airflow.executors.local_executor.LocalExecutor` runs tasks by
spawning processes in a controlled fashion on the scheduler node.
-Given that BaseExecutor has the option to receive a ``parallelism`` parameter
to limit the number of process spawned,
-when this parameter is ``0`` the number of processes that LocalExecutor can
spawn is unlimited.
+The parameter ``parallelism`` limits the number of processes spawned so as not to
overwhelm the node.
+This parameter must be greater than ``0``.
-The following strategies are implemented:
-
-- | **Unlimited Parallelism** (``self.parallelism == 0``): In this strategy,
LocalExecutor will
- | spawn a process every time ``execute_async`` is called, that is, every
task submitted to the
- | :class:`~airflow.executors.local_executor.LocalExecutor` will be executed
in its own process. Once the task is executed and the
- | result stored in the ``result_queue``, the process terminates. There is no
need for a
- | ``task_queue`` in this approach, since as soon as a task is received a new
process will be
- | allocated to the task. Processes used in this strategy are of class
:class:`~airflow.executors.local_executor.LocalWorker`.
-
-- | **Limited Parallelism** (``self.parallelism > 0``): In this strategy, the
:class:`~airflow.executors.local_executor.LocalExecutor` spawns
- | the number of processes equal to the value of ``self.parallelism`` at
``start`` time,
- | using a ``task_queue`` to coordinate the ingestion of tasks and the work
distribution among
- | the workers, which will take a task as soon as they are ready. During the
lifecycle of
- | the LocalExecutor, the worker processes are running waiting for tasks,
once the
- | LocalExecutor receives the call to shutdown the executor a poison token is
sent to the
- | workers to terminate them. Processes used in this strategy are of class
:class:`~airflow.executors.local_executor.QueuedLocalWorker`.
+The :class:`~airflow.executors.local_executor.LocalExecutor` spawns the number
of processes equal to the value of ``self.parallelism`` at
+``start`` time, using a ``task_queue`` to coordinate the ingestion of tasks
and the work distribution among the workers, which will take
+a task as soon as they are ready. During the lifecycle of the LocalExecutor,
the worker processes are running, waiting for tasks; once the
+LocalExecutor receives the call to shutdown the executor a poison token is
sent to the workers to terminate them. Processes used in this
+strategy are of class
:class:`~airflow.executors.local_executor.QueuedLocalWorker`.
.. note::
- When multiple Schedulers are configured with ``executor = LocalExecutor``
in the ``[core]`` section of your ``airflow.cfg``, each Scheduler will run a
LocalExecutor. This means tasks would be processed in a distributed fashion
across the machines running the Schedulers.
+ When multiple Schedulers are configured with ``executor=LocalExecutor`` in
the ``[core]`` section of your ``airflow.cfg``, each
+ Scheduler will run a LocalExecutor. This means tasks would be processed in
a distributed fashion across the machines running the
+ Schedulers.
One consideration should be taken into account:
- - Restarting a Scheduler: If a Scheduler is restarted, it may take some
time for other Schedulers to recognize the orphaned tasks and restart or fail
them.
+ - Restarting a Scheduler: If a Scheduler is restarted, it may take some
time for other Schedulers to recognize the orphaned tasks
+ and restart or fail them.
+
+.. note::
+
+ Previous versions of Airflow had the option to configure the LocalExecutor
with unlimited parallelism
+ (``self.parallelism = 0``). This option has been removed in Airflow 3.0.0
to avoid overwhelming the scheduler node.
diff --git a/airflow-core/src/airflow/executors/local_executor.py
b/airflow-core/src/airflow/executors/local_executor.py
index 1c15fca7d9c..1df402213d3 100644
--- a/airflow-core/src/airflow/executors/local_executor.py
+++ b/airflow-core/src/airflow/executors/local_executor.py
@@ -35,7 +35,7 @@ from multiprocessing import Queue, SimpleQueue
from typing import TYPE_CHECKING
from airflow.executors import workloads
-from airflow.executors.base_executor import PARALLELISM, BaseExecutor
+from airflow.executors.base_executor import BaseExecutor
from airflow.utils.state import TaskInstanceState
# add logger to parameter of setproctitle to support logging
@@ -138,7 +138,7 @@ class LocalExecutor(BaseExecutor):
It uses the multiprocessing Python library and queues to parallelize the
execution of tasks.
- :param parallelism: how many parallel processes are run in the executor
+ :param parallelism: how many parallel processes are run in the executor,
must be > 0
"""
is_local: bool = True
@@ -150,11 +150,6 @@ class LocalExecutor(BaseExecutor):
workers: dict[int, multiprocessing.Process]
_unread_messages: multiprocessing.sharedctypes.Synchronized[int]
- def __init__(self, parallelism: int = PARALLELISM):
- super().__init__(parallelism=parallelism)
- if self.parallelism < 0:
- raise ValueError("parallelism must be greater than or equal to 0")
-
def start(self) -> None:
"""Start the executor."""
# We delay opening these queues until the start method mostly for unit
tests. ExecutorLoader caches
@@ -190,7 +185,7 @@ class LocalExecutor(BaseExecutor):
# If we're using spawn in multiprocessing (default on macOS now) to
start tasks, this can get called a
# via `sync()` a few times before the spawned process actually starts
picking up messages. Try not to
# create too much
- if num_outstanding and (self.parallelism == 0 or len(self.workers) <
self.parallelism):
+ if num_outstanding and len(self.workers) < self.parallelism:
# This only creates one worker, which is fine as we call this
directly after putting a message on
# activity_queue in execute_async
self._spawn_worker()
diff --git a/airflow-core/tests/unit/executors/test_local_executor.py
b/airflow-core/tests/unit/executors/test_local_executor.py
index 6489746f220..12aaef6e5de 100644
--- a/airflow-core/tests/unit/executors/test_local_executor.py
+++ b/airflow-core/tests/unit/executors/test_local_executor.py
@@ -55,8 +55,9 @@ class TestLocalExecutor:
def test_serve_logs_default_value(self):
assert LocalExecutor.serve_logs
+ @skip_spawn_mp_start
@mock.patch("airflow.sdk.execution_time.supervisor.supervise")
- def _test_execute(self, mock_supervise, parallelism=1):
+ def test_execution(self, mock_supervise):
success_tis = [
workloads.TaskInstance(
id=uuid7(),
@@ -84,7 +85,7 @@ class TestLocalExecutor:
mock_supervise.side_effect = fake_supervise
- executor = LocalExecutor(parallelism=parallelism)
+ executor = LocalExecutor(parallelism=2)
executor.start()
assert executor.result_queue.empty()
@@ -118,7 +119,7 @@ class TestLocalExecutor:
executor.end()
- expected = self.TEST_SUCCESS_COMMANDS + 1 if parallelism == 0 else
parallelism
+ expected = 2
# Depending on how quickly the tasks run, we might not need to
create all the workers we could
assert 1 <= len(spawn_worker.calls) <= expected
@@ -130,14 +131,6 @@ class TestLocalExecutor:
assert executor.event_buffer[ti.key][0] == State.SUCCESS
assert executor.event_buffer[fail_ti.key][0] == State.FAILED
- @skip_spawn_mp_start
- @pytest.mark.parametrize(
- ("parallelism",),
- [pytest.param(2, id="limited")],
- )
- def test_execution(self, parallelism: int):
- self._test_execute(parallelism=parallelism)
-
@mock.patch("airflow.executors.local_executor.LocalExecutor.sync")
@mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks")
@mock.patch("airflow.executors.base_executor.Stats.gauge")
diff --git
a/airflow-core/tests/unit/executors/test_local_executor_check_workers.py
b/airflow-core/tests/unit/executors/test_local_executor_check_workers.py
index 094ce230596..b0adfe5b9e5 100644
--- a/airflow-core/tests/unit/executors/test_local_executor_check_workers.py
+++ b/airflow-core/tests/unit/executors/test_local_executor_check_workers.py
@@ -117,16 +117,6 @@ def test_no_spawn_if_parallelism_reached(setup_executor):
executor._spawn_worker.assert_not_called()
-def test_parallelism_zero_spawns_worker(setup_executor):
- executor = setup_executor
- executor.parallelism = 0
- executor._unread_messages.value = 1
- executor.activity_queue.empty.return_value = False
- executor.workers = {}
- executor._check_workers()
- executor._spawn_worker.assert_called_once()
-
-
def test_spawn_worker_when_we_have_parallelism_left(setup_executor):
executor = setup_executor
# Simulate 4 running workers