This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 92c8697f20 Re-add the ability to set parallelism to unlimited (#41107)
92c8697f20 is described below
commit 92c8697f20d97b5bd1eae2142412d18bc972906d
Author: Niko Oliveira <[email protected]>
AuthorDate: Wed Jul 31 11:16:51 2024 -0700
Re-add the ability to set parallelism to unlimited (#41107)
This feature was a bit half-baked and undocumented (although it is
documented better now), and it got dropped during development of the
multiple executor configuration. This PR re-adds that feature, along with
tests and documentation that were not present before.
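
For context, the behaviour being restored is driven by a single setting (a
minimal illustration, assuming the standard airflow.cfg conventions; file
locations vary by deployment):

    [core]
    # 0 removes the per-scheduler cap on concurrently running task instances
    parallelism = 0

or, equivalently, through Airflow's AIRFLOW__{SECTION}__{KEY} environment
variable convention: AIRFLOW__CORE__PARALLELISM=0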
---
airflow/config_templates/config.yml | 2 +-
airflow/executors/base_executor.py | 3 +-
airflow/jobs/scheduler_job_runner.py | 4 ++
tests/jobs/test_scheduler_job.py | 120 +++++++++++++++++++++++++++++++++++
4 files changed, 127 insertions(+), 2 deletions(-)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 7087c42011..53ab612266 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -89,7 +89,7 @@ core:
      This defines the maximum number of task instances that can run concurrently per scheduler in
      Airflow, regardless of the worker count. Generally this value, multiplied by the number of
      schedulers in your cluster, is the maximum number of task instances with the running
-      state in the metadata database.
+      state in the metadata database. Setting this value to zero allows unlimited parallelism.
version_added: ~
type: string
example: ~
diff --git a/airflow/executors/base_executor.py
b/airflow/executors/base_executor.py
index 173b6da8b5..dd0b8a66d2 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -288,7 +288,8 @@ class BaseExecutor(LoggingMixin):
         self.log.debug("%s running task instances for executor %s", num_running_tasks, name)
         self.log.debug("%s in queue for executor %s", num_queued_tasks, name)
         if open_slots == 0:
-            self.log.info("Executor parallelism limit reached. 0 open slots.")
+            if self.parallelism:
+                self.log.info("Executor parallelism limit reached. 0 open slots.")
         else:
             self.log.debug("%s open slots for executor %s", open_slots, name)
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 4e581e9709..9e1f0121ac 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -717,11 +717,15 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         # across all executors.
         num_occupied_slots = sum([executor.slots_occupied for executor in self.job.executors])
         parallelism = conf.getint("core", "parallelism")
+        # Parallelism configured to 0 means infinite currently running tasks
+        if parallelism == 0:
+            parallelism = sys.maxsize
         if self.job.max_tis_per_query == 0:
             max_tis = parallelism - num_occupied_slots
         else:
             max_tis = min(self.job.max_tis_per_query, parallelism - num_occupied_slots)
         if max_tis <= 0:
+            self.log.debug("max_tis query size is less than or equal to zero. No query will be performed!")
             return 0
         queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)
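
To make the arithmetic above concrete, here is a small standalone sketch of
the max_tis computation (simplified from SchedulerJobRunner; the helper name
is hypothetical):

    import sys

    def compute_max_tis(parallelism: int, num_occupied_slots: int, max_tis_per_query: int) -> int:
        # Mirrors the scheduler logic above: 0 means "unlimited" for both knobs.
        if parallelism == 0:
            parallelism = sys.maxsize
        if max_tis_per_query == 0:
            return parallelism - num_occupied_slots
        return min(max_tis_per_query, parallelism - num_occupied_slots)

    # Unlimited parallelism still respects the per-query cap:
    assert compute_max_tis(0, 0, 50) == 50
    # With both set to 0, the query size is effectively unbounded:
    assert compute_max_tis(0, 0, 0) == sys.maxsize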
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index fe6b8d922f..eb6064be04 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -21,6 +21,7 @@ import contextlib
import datetime
import logging
import os
+import sys
from collections import deque
from datetime import timedelta
from importlib import reload
@@ -1961,6 +1962,125 @@ class TestSchedulerJob:
        assert total_enqueued == 31
        session.rollback()
+    @pytest.mark.parametrize(
+        "task1_exec, task2_exec",
+        [
+            ("default_exec", "default_exec"),
+            ("default_exec", "secondary_exec"),
+            ("secondary_exec", "secondary_exec"),
+        ],
+    )
+    def test_execute_task_instances_unlimited_parallelism_multiple_executors(
+        self, task1_exec, task2_exec, dag_maker, mock_executors
+    ):
+        """Test core.parallelism leads to unlimited scheduling, but queries limited by max_tis"""
+
+        dag_id = "SchedulerJobTest.test_execute_task_instances_unlimited"
+        task_id_1 = "dummy_task"
+        task_id_2 = "dummy_task_2"
+        session = settings.Session()
+
+        with dag_maker(dag_id=dag_id, max_active_tasks=1024, session=session):
+            task1 = EmptyOperator(task_id=task_id_1, executor=task1_exec)
+            task2 = EmptyOperator(task_id=task_id_2, executor=task2_exec)
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
+
+        def _create_dagruns():
+            dagrun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.RUNNING)
+            yield dagrun
+            for _ in range(39):
+                dagrun = dag_maker.create_dagrun_after(
+                    dagrun,
+                    run_type=DagRunType.SCHEDULED,
+                    state=State.RUNNING,
+                )
+                yield dagrun
+
+        for dr in _create_dagruns():
+            ti1 = dr.get_task_instance(task1.task_id, session)
+            ti2 = dr.get_task_instance(task2.task_id, session)
+            ti1.state = State.SCHEDULED
+            ti2.state = State.SCHEDULED
+            session.flush()
+
+        scheduler_job.max_tis_per_query = 50
+        for executor in mock_executors:
+            executor.parallelism = 0
+            executor.slots_occupied = 0
+            executor.slots_available = sys.maxsize
+
+        with conf_vars({("core", "parallelism"): "0"}):
+            # 40 dag runs * 2 tasks each = 80.
+            enqueued = self.job_runner._critical_section_enqueue_task_instances(session)
+            # Parallelism is unlimited, but we still only query for max_tis_per_query each time we enqueue
+            assert enqueued == 50
+
+            enqueued = self.job_runner._critical_section_enqueue_task_instances(session)
+            # The remaining 30 are enqueued the next loop
+            assert enqueued == 30
+
+        session.rollback()
+
+    @pytest.mark.parametrize(
+        "task1_exec, task2_exec",
+        [
+            ("default_exec", "default_exec"),
+            ("default_exec", "secondary_exec"),
+            ("secondary_exec", "secondary_exec"),
+        ],
+    )
+    def test_execute_task_instances_unlimited_parallelism_unlimited_max_tis_multiple_executors(
+        self, task1_exec, task2_exec, dag_maker, mock_executors
+    ):
+        """Test core.parallelism leads to unlimited scheduling"""
+
+        dag_id = "SchedulerJobTest.test_execute_task_instances_unlimited"
+        task_id_1 = "dummy_task"
+        task_id_2 = "dummy_task_2"
+        session = settings.Session()
+
+        with dag_maker(dag_id=dag_id, max_active_tasks=1024, session=session):
+            task1 = EmptyOperator(task_id=task_id_1, executor=task1_exec)
+            task2 = EmptyOperator(task_id=task_id_2, executor=task2_exec)
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
+
+        def _create_dagruns():
+            dagrun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.RUNNING)
+            yield dagrun
+            for _ in range(39):
+                dagrun = dag_maker.create_dagrun_after(
+                    dagrun,
+                    run_type=DagRunType.SCHEDULED,
+                    state=State.RUNNING,
+                )
+                yield dagrun
+
+        for dr in _create_dagruns():
+            ti1 = dr.get_task_instance(task1.task_id, session)
+            ti2 = dr.get_task_instance(task2.task_id, session)
+            ti1.state = State.SCHEDULED
+            ti2.state = State.SCHEDULED
+            session.flush()
+
+        scheduler_job.max_tis_per_query = 0
+        for executor in mock_executors:
+            executor.parallelism = 0
+            executor.slots_occupied = 0
+            executor.slots_available = sys.maxsize
+
+        with conf_vars({("core", "parallelism"): "0"}):
+            # 40 dag runs * 2 tasks each = 80. With core.parallelism set to zero the executors
+            # have unlimited slots, and with max_tis_per_query also set to zero the query size
+            # is unlimited as well. Thus, all tasks should be enqueued in one step.
+            enqueued = self.job_runner._critical_section_enqueue_task_instances(session)
+
+            assert enqueued == 80
+        session.rollback()
+
    def test_adopt_or_reset_orphaned_tasks(self, dag_maker):
        session = settings.Session()
        with dag_maker("test_execute_helper_reset_orphaned_tasks") as dag: