This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7630158ecebf18c110d514257d2e98543692a1ee
Author: Jens Scheffler <[email protected]>
AuthorDate: Fri Oct 31 20:16:44 2025 +0100

    [v3-1-test] Remove leftovers from unlimited parallelism in local executor 
(#57579) (#57644)
    
    * Remove leftovers from unlimited execution in LocalExecutor
    
    * Remove leftovers from unlimited execution in LocalExecutor
    
    * Remove orphan unit test
    (cherry picked from commit 663d2c686580dfe402d4c2fe46781dccab1b5889)
---
 airflow-core/docs/core-concepts/executor/local.rst | 39 ++++++++++------------
 .../src/airflow/executors/local_executor.py        | 11 ++----
 .../tests/unit/executors/test_local_executor.py    | 15 +++------
 .../executors/test_local_executor_check_workers.py | 10 ------
 4 files changed, 25 insertions(+), 50 deletions(-)

diff --git a/airflow-core/docs/core-concepts/executor/local.rst 
b/airflow-core/docs/core-concepts/executor/local.rst
index 65dd1ba471d..a533ae3a5b8 100644
--- a/airflow-core/docs/core-concepts/executor/local.rst
+++ b/airflow-core/docs/core-concepts/executor/local.rst
@@ -21,32 +21,29 @@
 Local Executor
 ==============
 
-:class:`~airflow.executors.local_executor.LocalExecutor` runs tasks by 
spawning processes in a controlled fashion in different modes.
+:class:`~airflow.executors.local_executor.LocalExecutor` runs tasks by 
spawning processes in a controlled fashion on the scheduler node.
 
-Given that BaseExecutor has the option to receive a ``parallelism`` parameter 
to limit the number of process spawned,
-when this parameter is ``0`` the number of processes that LocalExecutor can 
spawn is unlimited.
+The parameter ``parallelism`` limits the number of process spawned not to 
overwhelm the node.
+This parameter must be greater than ``0``.
 
-The following strategies are implemented:
-
-- | **Unlimited Parallelism** (``self.parallelism == 0``): In this strategy, 
LocalExecutor will
-  | spawn a process every time ``execute_async`` is called, that is, every 
task submitted to the
-  | :class:`~airflow.executors.local_executor.LocalExecutor` will be executed 
in its own process. Once the task is executed and the
-  | result stored in the ``result_queue``, the process terminates. There is no 
need for a
-  | ``task_queue`` in this approach, since as soon as a task is received a new 
process will be
-  | allocated to the task. Processes used in this strategy are of class 
:class:`~airflow.executors.local_executor.LocalWorker`.
-
-- | **Limited Parallelism** (``self.parallelism > 0``): In this strategy, the 
:class:`~airflow.executors.local_executor.LocalExecutor` spawns
-  | the number of processes equal to the value of ``self.parallelism`` at 
``start`` time,
-  | using a ``task_queue`` to coordinate the ingestion of tasks and the work 
distribution among
-  | the workers, which will take a task as soon as they are ready. During the 
lifecycle of
-  | the LocalExecutor, the worker processes are running waiting for tasks, 
once the
-  | LocalExecutor receives the call to shutdown the executor a poison token is 
sent to the
-  | workers to terminate them. Processes used in this strategy are of class 
:class:`~airflow.executors.local_executor.QueuedLocalWorker`.
+The :class:`~airflow.executors.local_executor.LocalExecutor` spawns the number 
of processes equal to the value of ``self.parallelism`` at
+``start`` time, using a ``task_queue`` to coordinate the ingestion of tasks 
and the work distribution among the workers, which will take
+a task as soon as they are ready. During the lifecycle of the LocalExecutor, 
the worker processes are running waiting for tasks, once the
+LocalExecutor receives the call to shutdown the executor a poison token is 
sent to the workers to terminate them. Processes used in this
+strategy are of class 
:class:`~airflow.executors.local_executor.QueuedLocalWorker`.
 
 .. note::
 
-   When multiple Schedulers are configured with ``executor = LocalExecutor`` 
in the ``[core]`` section of your ``airflow.cfg``, each Scheduler will run a 
LocalExecutor. This means tasks would be processed in a distributed fashion 
across the machines running the Schedulers.
+   When multiple Schedulers are configured with ``executor=LocalExecutor`` in 
the ``[core]`` section of your ``airflow.cfg``, each
+   Scheduler will run a LocalExecutor. This means tasks would be processed in 
a distributed fashion across the machines running the
+   Schedulers.
 
    One consideration should be taken into account:
 
-   - Restarting a Scheduler: If a Scheduler is restarted, it may take some 
time for other Schedulers to recognize the orphaned tasks and restart or fail 
them.
+   - Restarting a Scheduler: If a Scheduler is restarted, it may take some 
time for other Schedulers to recognize the orphaned tasks
+     and restart or fail them.
+
+.. note::
+
+   Previous versions of Airflow had the option to configure the LocalExecutor 
with unlimited parallelism
+   (``self.parallelism = 0``). This option has been removed in Airflow 3.0.0 
to avoid overwhelming the scheduler node.
diff --git a/airflow-core/src/airflow/executors/local_executor.py 
b/airflow-core/src/airflow/executors/local_executor.py
index 1c15fca7d9c..1df402213d3 100644
--- a/airflow-core/src/airflow/executors/local_executor.py
+++ b/airflow-core/src/airflow/executors/local_executor.py
@@ -35,7 +35,7 @@ from multiprocessing import Queue, SimpleQueue
 from typing import TYPE_CHECKING
 
 from airflow.executors import workloads
-from airflow.executors.base_executor import PARALLELISM, BaseExecutor
+from airflow.executors.base_executor import BaseExecutor
 from airflow.utils.state import TaskInstanceState
 
 # add logger to parameter of setproctitle to support logging
@@ -138,7 +138,7 @@ class LocalExecutor(BaseExecutor):
 
     It uses the multiprocessing Python library and queues to parallelize the 
execution of tasks.
 
-    :param parallelism: how many parallel processes are run in the executor
+    :param parallelism: how many parallel processes are run in the executor, 
must be > 0
     """
 
     is_local: bool = True
@@ -150,11 +150,6 @@ class LocalExecutor(BaseExecutor):
     workers: dict[int, multiprocessing.Process]
     _unread_messages: multiprocessing.sharedctypes.Synchronized[int]
 
-    def __init__(self, parallelism: int = PARALLELISM):
-        super().__init__(parallelism=parallelism)
-        if self.parallelism < 0:
-            raise ValueError("parallelism must be greater than or equal to 0")
-
     def start(self) -> None:
         """Start the executor."""
         # We delay opening these queues until the start method mostly for unit 
tests. ExecutorLoader caches
@@ -190,7 +185,7 @@ class LocalExecutor(BaseExecutor):
         # If we're using spawn in multiprocessing (default on macOS now) to 
start tasks, this can get called a
         # via `sync()` a few times before the spawned process actually starts 
picking up messages. Try not to
         # create too much
-        if num_outstanding and (self.parallelism == 0 or len(self.workers) < 
self.parallelism):
+        if num_outstanding and len(self.workers) < self.parallelism:
             # This only creates one worker, which is fine as we call this 
directly after putting a message on
             # activity_queue in execute_async
             self._spawn_worker()
diff --git a/airflow-core/tests/unit/executors/test_local_executor.py 
b/airflow-core/tests/unit/executors/test_local_executor.py
index 6489746f220..12aaef6e5de 100644
--- a/airflow-core/tests/unit/executors/test_local_executor.py
+++ b/airflow-core/tests/unit/executors/test_local_executor.py
@@ -55,8 +55,9 @@ class TestLocalExecutor:
     def test_serve_logs_default_value(self):
         assert LocalExecutor.serve_logs
 
+    @skip_spawn_mp_start
     @mock.patch("airflow.sdk.execution_time.supervisor.supervise")
-    def _test_execute(self, mock_supervise, parallelism=1):
+    def test_execution(self, mock_supervise):
         success_tis = [
             workloads.TaskInstance(
                 id=uuid7(),
@@ -84,7 +85,7 @@ class TestLocalExecutor:
 
         mock_supervise.side_effect = fake_supervise
 
-        executor = LocalExecutor(parallelism=parallelism)
+        executor = LocalExecutor(parallelism=2)
         executor.start()
 
         assert executor.result_queue.empty()
@@ -118,7 +119,7 @@ class TestLocalExecutor:
 
             executor.end()
 
-            expected = self.TEST_SUCCESS_COMMANDS + 1 if parallelism == 0 else 
parallelism
+            expected = 2
             # Depending on how quickly the tasks run, we might not need to 
create all the workers we could
             assert 1 <= len(spawn_worker.calls) <= expected
 
@@ -130,14 +131,6 @@ class TestLocalExecutor:
             assert executor.event_buffer[ti.key][0] == State.SUCCESS
         assert executor.event_buffer[fail_ti.key][0] == State.FAILED
 
-    @skip_spawn_mp_start
-    @pytest.mark.parametrize(
-        ("parallelism",),
-        [pytest.param(2, id="limited")],
-    )
-    def test_execution(self, parallelism: int):
-        self._test_execute(parallelism=parallelism)
-
     @mock.patch("airflow.executors.local_executor.LocalExecutor.sync")
     @mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks")
     @mock.patch("airflow.executors.base_executor.Stats.gauge")
diff --git 
a/airflow-core/tests/unit/executors/test_local_executor_check_workers.py 
b/airflow-core/tests/unit/executors/test_local_executor_check_workers.py
index 094ce230596..b0adfe5b9e5 100644
--- a/airflow-core/tests/unit/executors/test_local_executor_check_workers.py
+++ b/airflow-core/tests/unit/executors/test_local_executor_check_workers.py
@@ -117,16 +117,6 @@ def test_no_spawn_if_parallelism_reached(setup_executor):
     executor._spawn_worker.assert_not_called()
 
 
-def test_parallelism_zero_spawns_worker(setup_executor):
-    executor = setup_executor
-    executor.parallelism = 0
-    executor._unread_messages.value = 1
-    executor.activity_queue.empty.return_value = False
-    executor.workers = {}
-    executor._check_workers()
-    executor._spawn_worker.assert_called_once()
-
-
 def test_spawn_worker_when_we_have_parallelism_left(setup_executor):
     executor = setup_executor
     # Simulate 4 running workers

Reply via email to