This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 92c8697f20 Re-add the ability to set parallelism to unlimited (#41107)
92c8697f20 is described below

commit 92c8697f20d97b5bd1eae2142412d18bc972906d
Author: Niko Oliveira <[email protected]>
AuthorDate: Wed Jul 31 11:16:51 2024 -0700

    Re-add the ability to set parallelism to unlimited (#41107)
    
    This feature was/is a bit half-baked and undocumented (although
    better now), and it was dropped during development of multiple
    executor configuration. This PR re-adds the feature, along with
    tests and documentation that were not present before.
---
 airflow/config_templates/config.yml  |   2 +-
 airflow/executors/base_executor.py   |   3 +-
 airflow/jobs/scheduler_job_runner.py |   4 ++
 tests/jobs/test_scheduler_job.py     | 120 +++++++++++++++++++++++++++++++++++
 4 files changed, 127 insertions(+), 2 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 7087c42011..53ab612266 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -89,7 +89,7 @@ core:
         This defines the maximum number of task instances that can run concurrently per scheduler in
         Airflow, regardless of the worker count. Generally this value, multiplied by the number of
         schedulers in your cluster, is the maximum number of task instances with the running
-        state in the metadata database.
+        state in the metadata database. Setting this value to zero allows unlimited parallelism.
       version_added: ~
       type: string
       example: ~
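
The end-user switch is just this config value: setting [core] parallelism to 0 now means "no scheduler-wide cap". A minimal sketch of exercising it, assuming a working Airflow install; the AIRFLOW__CORE__PARALLELISM environment override and conf.getint are standard Airflow mechanisms, the assertion is only illustrative:

    import os

    # 0 is the new sentinel for "unlimited parallelism"
    os.environ["AIRFLOW__CORE__PARALLELISM"] = "0"

    from airflow.configuration import conf

    # This is the same lookup the scheduler performs (see the
    # scheduler_job_runner.py hunk below)
    assert conf.getint("core", "parallelism") == 0
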
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 173b6da8b5..dd0b8a66d2 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -288,7 +288,8 @@ class BaseExecutor(LoggingMixin):
         self.log.debug("%s running task instances for executor %s", 
num_running_tasks, name)
         self.log.debug("%s in queue for executor %s", num_queued_tasks, name)
         if open_slots == 0:
-            self.log.info("Executor parallelism limit reached. 0 open slots.")
+            if self.parallelism:
+                self.log.info("Executor parallelism limit reached. 0 open 
slots.")
         else:
             self.log.debug("%s open slots for executor %s", open_slots, name)
 
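
The executor-side change only makes the "limit reached" log line conditional: with parallelism set to 0 an executor is unbounded, so reporting 0 open slots as a hit limit would be misleading. A standalone sketch of that conditional, assuming a plain logging.Logger in place of Airflow's LoggingMixin (names follow the diff, but this is not the full BaseExecutor):

    import logging

    log = logging.getLogger("executor")

    def report_slots(open_slots: int, parallelism: int, name: str) -> None:
        if open_slots == 0:
            # parallelism == 0 means "unlimited", so 0 open slots is not a
            # real limit and the info message is suppressed
            if parallelism:
                log.info("Executor parallelism limit reached. 0 open slots.")
        else:
            log.debug("%s open slots for executor %s", open_slots, name)
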
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 4e581e9709..9e1f0121ac 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -717,11 +717,15 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         # across all executors.
         num_occupied_slots = sum([executor.slots_occupied for executor in self.job.executors])
         parallelism = conf.getint("core", "parallelism")
+        # Parallelism configured to 0 means infinite currently running tasks
+        if parallelism == 0:
+            parallelism = sys.maxsize
         if self.job.max_tis_per_query == 0:
             max_tis = parallelism - num_occupied_slots
         else:
             max_tis = min(self.job.max_tis_per_query, parallelism - num_occupied_slots)
         if max_tis <= 0:
+            self.log.debug("max_tis query size is less than or equal to zero. 
No query will be performed!")
             return 0
 
         queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)
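
On the scheduler side, the 0 sentinel is mapped to sys.maxsize before the query size is computed, so the existing min/subtraction arithmetic keeps working unchanged. A self-contained restatement of that logic; compute_max_tis is a hypothetical helper for illustration, not a function in the codebase:

    import sys

    def compute_max_tis(parallelism: int, occupied: int, per_query: int) -> int:
        if parallelism == 0:
            # Parallelism configured to 0 means unlimited running tasks
            parallelism = sys.maxsize
        if per_query == 0:
            return parallelism - occupied
        return min(per_query, parallelism - occupied)

    assert compute_max_tis(0, 100, 50) == 50        # query still capped by max_tis_per_query
    assert compute_max_tis(0, 0, 0) == sys.maxsize  # fully unlimited
    assert compute_max_tis(32, 32, 50) == 0         # no free slots, so no query is made
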
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index fe6b8d922f..eb6064be04 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -21,6 +21,7 @@ import contextlib
 import datetime
 import logging
 import os
+import sys
 from collections import deque
 from datetime import timedelta
 from importlib import reload
@@ -1961,6 +1962,125 @@ class TestSchedulerJob:
             assert total_enqueued == 31
         session.rollback()
 
+    @pytest.mark.parametrize(
+        "task1_exec, task2_exec",
+        [
+            ("default_exec", "default_exec"),
+            ("default_exec", "secondary_exec"),
+            ("secondary_exec", "secondary_exec"),
+        ],
+    )
+    def test_execute_task_instances_unlimited_parallelism_multiple_executors(
+        self, task1_exec, task2_exec, dag_maker, mock_executors
+    ):
+        """Test core.parallelism leads to unlimited scheduling, but queries 
limited by max_tis"""
+
+        dag_id = "SchedulerJobTest.test_execute_task_instances_unlimited"
+        task_id_1 = "dummy_task"
+        task_id_2 = "dummy_task_2"
+        session = settings.Session()
+
+        with dag_maker(dag_id=dag_id, max_active_tasks=1024, session=session):
+            task1 = EmptyOperator(task_id=task_id_1, executor=task1_exec)
+            task2 = EmptyOperator(task_id=task_id_2, executor=task2_exec)
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
+
+        def _create_dagruns():
+            dagrun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.RUNNING)
+            yield dagrun
+            for _ in range(39):
+                dagrun = dag_maker.create_dagrun_after(
+                    dagrun,
+                    run_type=DagRunType.SCHEDULED,
+                    state=State.RUNNING,
+                )
+                yield dagrun
+
+        for dr in _create_dagruns():
+            ti1 = dr.get_task_instance(task1.task_id, session)
+            ti2 = dr.get_task_instance(task2.task_id, session)
+            ti1.state = State.SCHEDULED
+            ti2.state = State.SCHEDULED
+            session.flush()
+
+        scheduler_job.max_tis_per_query = 50
+        for executor in mock_executors:
+            executor.parallelism = 0
+            executor.slots_occupied = 0
+            executor.slots_available = sys.maxsize
+
+        with conf_vars({("core", "parallelism"): "0"}):
+            # 40 dag runs * 2 tasks each = 80.
+            enqueued = self.job_runner._critical_section_enqueue_task_instances(session)
+            # Parallelism is unlimited, but we still only query for max_tis_per_query each time we enqueue
+            assert enqueued == 50
+
+            enqueued = self.job_runner._critical_section_enqueue_task_instances(session)
+            # The remaining 30 are enqueued on the next scheduler loop
+            assert enqueued == 30
+
+        session.rollback()
+
+    @pytest.mark.parametrize(
+        "task1_exec, task2_exec",
+        [
+            ("default_exec", "default_exec"),
+            ("default_exec", "secondary_exec"),
+            ("secondary_exec", "secondary_exec"),
+        ],
+    )
+    def test_execute_task_instances_unlimited_parallelism_unlimited_max_tis_multiple_executors(
+        self, task1_exec, task2_exec, dag_maker, mock_executors
+    ):
+        """Test core.parallelism leads to unlimited scheduling"""
+
+        dag_id = "SchedulerJobTest.test_execute_task_instances_unlimited"
+        task_id_1 = "dummy_task"
+        task_id_2 = "dummy_task_2"
+        session = settings.Session()
+
+        with dag_maker(dag_id=dag_id, max_active_tasks=1024, session=session):
+            task1 = EmptyOperator(task_id=task_id_1, executor=task1_exec)
+            task2 = EmptyOperator(task_id=task_id_2, executor=task2_exec)
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
+
+        def _create_dagruns():
+            dagrun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.RUNNING)
+            yield dagrun
+            for _ in range(39):
+                dagrun = dag_maker.create_dagrun_after(
+                    dagrun,
+                    run_type=DagRunType.SCHEDULED,
+                    state=State.RUNNING,
+                )
+                yield dagrun
+
+        for dr in _create_dagruns():
+            ti1 = dr.get_task_instance(task1.task_id, session)
+            ti2 = dr.get_task_instance(task2.task_id, session)
+            ti1.state = State.SCHEDULED
+            ti2.state = State.SCHEDULED
+            session.flush()
+
+        scheduler_job.max_tis_per_query = 0
+        for executor in mock_executors:
+            executor.parallelism = 0
+            executor.slots_occupied = 0
+            executor.slots_available = sys.maxsize
+
+        with conf_vars({("core", "parallelism"): "0"}):
+            # 40 dag runs * 2 tasks each = 80. With core.parallelism set to zero the executors have
+            # unlimited slots, and with max_tis_per_query also set to zero the query size is
+            # unlimited as well. Thus, all tasks should be enqueued in one step.
+            enqueued = self.job_runner._critical_section_enqueue_task_instances(session)
+
+            assert enqueued == 80
+        session.rollback()
+
     def test_adopt_or_reset_orphaned_tasks(self, dag_maker):
         session = settings.Session()
         with dag_maker("test_execute_helper_reset_orphaned_tasks") as dag:
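
Both new tests drive 40 dag runs with 2 tasks each (80 task instances) through _critical_section_enqueue_task_instances: the first keeps the max_tis_per_query=50 cap, the second removes that cap too. To try the new tests in isolation, a standard pytest keyword filter should select both of them (shown as a sketch; the project's usual test tooling works as well):

    pytest tests/jobs/test_scheduler_job.py -k "unlimited_parallelism"
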
