This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 7630158ecebf18c110d514257d2e98543692a1ee Author: Jens Scheffler <[email protected]> AuthorDate: Fri Oct 31 20:16:44 2025 +0100 [v3-1-test] Remove leftovers from unlimited parallelism in local executor (#57579) (#57644) * Remove leftovers from unlimited execution in LocalExecutor * Remove leftovers from unlimited execution in LocalExecutor * Remove orphan unit test (cherry picked from commit 663d2c686580dfe402d4c2fe46781dccab1b5889) --- airflow-core/docs/core-concepts/executor/local.rst | 39 ++++++++++------------ .../src/airflow/executors/local_executor.py | 11 ++---- .../tests/unit/executors/test_local_executor.py | 15 +++------ .../executors/test_local_executor_check_workers.py | 10 ------ 4 files changed, 25 insertions(+), 50 deletions(-) diff --git a/airflow-core/docs/core-concepts/executor/local.rst b/airflow-core/docs/core-concepts/executor/local.rst index 65dd1ba471d..a533ae3a5b8 100644 --- a/airflow-core/docs/core-concepts/executor/local.rst +++ b/airflow-core/docs/core-concepts/executor/local.rst @@ -21,32 +21,29 @@ Local Executor ============== -:class:`~airflow.executors.local_executor.LocalExecutor` runs tasks by spawning processes in a controlled fashion in different modes. +:class:`~airflow.executors.local_executor.LocalExecutor` runs tasks by spawning processes in a controlled fashion on the scheduler node. -Given that BaseExecutor has the option to receive a ``parallelism`` parameter to limit the number of process spawned, -when this parameter is ``0`` the number of processes that LocalExecutor can spawn is unlimited. +The parameter ``parallelism`` limits the number of processes spawned not to overwhelm the node. +This parameter must be greater than ``0``. 
-The following strategies are implemented: - -- | **Unlimited Parallelism** (``self.parallelism == 0``): In this strategy, LocalExecutor will - | spawn a process every time ``execute_async`` is called, that is, every task submitted to the - | :class:`~airflow.executors.local_executor.LocalExecutor` will be executed in its own process. Once the task is executed and the - | result stored in the ``result_queue``, the process terminates. There is no need for a - | ``task_queue`` in this approach, since as soon as a task is received a new process will be - | allocated to the task. Processes used in this strategy are of class :class:`~airflow.executors.local_executor.LocalWorker`. - -- | **Limited Parallelism** (``self.parallelism > 0``): In this strategy, the :class:`~airflow.executors.local_executor.LocalExecutor` spawns - | the number of processes equal to the value of ``self.parallelism`` at ``start`` time, - | using a ``task_queue`` to coordinate the ingestion of tasks and the work distribution among - | the workers, which will take a task as soon as they are ready. During the lifecycle of - | the LocalExecutor, the worker processes are running waiting for tasks, once the - | LocalExecutor receives the call to shutdown the executor a poison token is sent to the - | workers to terminate them. Processes used in this strategy are of class :class:`~airflow.executors.local_executor.QueuedLocalWorker`. +The :class:`~airflow.executors.local_executor.LocalExecutor` spawns the number of processes equal to the value of ``self.parallelism`` at +``start`` time, using a ``task_queue`` to coordinate the ingestion of tasks and the work distribution among the workers, which will take +a task as soon as they are ready. During the lifecycle of the LocalExecutor, the worker processes are running waiting for tasks, once the +LocalExecutor receives the call to shutdown the executor a poison token is sent to the workers to terminate them. 
Processes used in this +strategy are of class :class:`~airflow.executors.local_executor.QueuedLocalWorker`. .. note:: - When multiple Schedulers are configured with ``executor = LocalExecutor`` in the ``[core]`` section of your ``airflow.cfg``, each Scheduler will run a LocalExecutor. This means tasks would be processed in a distributed fashion across the machines running the Schedulers. + When multiple Schedulers are configured with ``executor=LocalExecutor`` in the ``[core]`` section of your ``airflow.cfg``, each + Scheduler will run a LocalExecutor. This means tasks would be processed in a distributed fashion across the machines running the + Schedulers. One consideration should be taken into account: - - Restarting a Scheduler: If a Scheduler is restarted, it may take some time for other Schedulers to recognize the orphaned tasks and restart or fail them. + - Restarting a Scheduler: If a Scheduler is restarted, it may take some time for other Schedulers to recognize the orphaned tasks + and restart or fail them. + +.. note:: + + Previous versions of Airflow had the option to configure the LocalExecutor with unlimited parallelism + (``self.parallelism = 0``). This option has been removed in Airflow 3.0.0 to avoid overwhelming the scheduler node. 
diff --git a/airflow-core/src/airflow/executors/local_executor.py b/airflow-core/src/airflow/executors/local_executor.py index 1c15fca7d9c..1df402213d3 100644 --- a/airflow-core/src/airflow/executors/local_executor.py +++ b/airflow-core/src/airflow/executors/local_executor.py @@ -35,7 +35,7 @@ from multiprocessing import Queue, SimpleQueue from typing import TYPE_CHECKING from airflow.executors import workloads -from airflow.executors.base_executor import PARALLELISM, BaseExecutor +from airflow.executors.base_executor import BaseExecutor from airflow.utils.state import TaskInstanceState # add logger to parameter of setproctitle to support logging @@ -138,7 +138,7 @@ class LocalExecutor(BaseExecutor): It uses the multiprocessing Python library and queues to parallelize the execution of tasks. - :param parallelism: how many parallel processes are run in the executor + :param parallelism: how many parallel processes are run in the executor, must be > 0 """ is_local: bool = True @@ -150,11 +150,6 @@ class LocalExecutor(BaseExecutor): workers: dict[int, multiprocessing.Process] _unread_messages: multiprocessing.sharedctypes.Synchronized[int] - def __init__(self, parallelism: int = PARALLELISM): - super().__init__(parallelism=parallelism) - if self.parallelism < 0: - raise ValueError("parallelism must be greater than or equal to 0") - def start(self) -> None: """Start the executor.""" # We delay opening these queues until the start method mostly for unit tests. ExecutorLoader caches @@ -190,7 +185,7 @@ class LocalExecutor(BaseExecutor): # If we're using spawn in multiprocessing (default on macOS now) to start tasks, this can get called a # via `sync()` a few times before the spawned process actually starts picking up messages. 
Try not to # create too much - if num_outstanding and (self.parallelism == 0 or len(self.workers) < self.parallelism): + if num_outstanding and len(self.workers) < self.parallelism: # This only creates one worker, which is fine as we call this directly after putting a message on # activity_queue in execute_async self._spawn_worker() diff --git a/airflow-core/tests/unit/executors/test_local_executor.py b/airflow-core/tests/unit/executors/test_local_executor.py index 6489746f220..12aaef6e5de 100644 --- a/airflow-core/tests/unit/executors/test_local_executor.py +++ b/airflow-core/tests/unit/executors/test_local_executor.py @@ -55,8 +55,9 @@ class TestLocalExecutor: def test_serve_logs_default_value(self): assert LocalExecutor.serve_logs + @skip_spawn_mp_start @mock.patch("airflow.sdk.execution_time.supervisor.supervise") - def _test_execute(self, mock_supervise, parallelism=1): + def test_execution(self, mock_supervise): success_tis = [ workloads.TaskInstance( id=uuid7(), @@ -84,7 +85,7 @@ class TestLocalExecutor: mock_supervise.side_effect = fake_supervise - executor = LocalExecutor(parallelism=parallelism) + executor = LocalExecutor(parallelism=2) executor.start() assert executor.result_queue.empty() @@ -118,7 +119,7 @@ class TestLocalExecutor: executor.end() - expected = self.TEST_SUCCESS_COMMANDS + 1 if parallelism == 0 else parallelism + expected = 2 # Depending on how quickly the tasks run, we might not need to create all the workers we could assert 1 <= len(spawn_worker.calls) <= expected @@ -130,14 +131,6 @@ class TestLocalExecutor: assert executor.event_buffer[ti.key][0] == State.SUCCESS assert executor.event_buffer[fail_ti.key][0] == State.FAILED - @skip_spawn_mp_start - @pytest.mark.parametrize( - ("parallelism",), - [pytest.param(2, id="limited")], - ) - def test_execution(self, parallelism: int): - self._test_execute(parallelism=parallelism) - @mock.patch("airflow.executors.local_executor.LocalExecutor.sync") 
@mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks") @mock.patch("airflow.executors.base_executor.Stats.gauge") diff --git a/airflow-core/tests/unit/executors/test_local_executor_check_workers.py b/airflow-core/tests/unit/executors/test_local_executor_check_workers.py index 094ce230596..b0adfe5b9e5 100644 --- a/airflow-core/tests/unit/executors/test_local_executor_check_workers.py +++ b/airflow-core/tests/unit/executors/test_local_executor_check_workers.py @@ -117,16 +117,6 @@ def test_no_spawn_if_parallelism_reached(setup_executor): executor._spawn_worker.assert_not_called() -def test_parallelism_zero_spawns_worker(setup_executor): - executor = setup_executor - executor.parallelism = 0 - executor._unread_messages.value = 1 - executor.activity_queue.empty.return_value = False - executor.workers = {} - executor._check_workers() - executor._spawn_worker.assert_called_once() - - def test_spawn_worker_when_we_have_parallelism_left(setup_executor): executor = setup_executor # Simulate 4 running workers
