This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 29f838d5f16 feat(scheduler): Add --only-idle flag to airflow scheduler
(#62055)
29f838d5f16 is described below
commit 29f838d5f16ff4a6f457bfaa02ecc626d13cf0dc
Author: Juarez Rudsatz <[email protected]>
AuthorDate: Tue Mar 3 19:08:20 2026 -0300
feat(scheduler): Add --only-idle flag to airflow scheduler (#62055)
* Add flag --only-idle to cli command airflow scheduler
* Add test case for flag scheduler --num-runs --only-idle
* Add test case for checking dag runs with the flag --only-idle
* Add newsfragment for the scheduler --only-idle flag
* Fix failures in the CI in the tests for flag --only-idle
* Polish source code in scheduler --only-idle flag
---
airflow-core/newsfragments/62055.significant.rst | 16 +++++++
airflow-core/src/airflow/cli/cli_config.py | 8 ++++
.../src/airflow/cli/commands/scheduler_command.py | 9 +++-
.../src/airflow/config_templates/config.yml | 8 ++++
.../src/airflow/jobs/scheduler_job_runner.py | 21 +++++++--
.../unit/cli/commands/test_scheduler_command.py | 19 ++++++++
airflow-core/tests/unit/jobs/test_scheduler_job.py | 50 ++++++++++++++++++++++
7 files changed, 127 insertions(+), 4 deletions(-)
diff --git a/airflow-core/newsfragments/62055.significant.rst
b/airflow-core/newsfragments/62055.significant.rst
new file mode 100644
index 00000000000..7e4ddaed981
--- /dev/null
+++ b/airflow-core/newsfragments/62055.significant.rst
@@ -0,0 +1,16 @@
+Airflow scheduler CLI command has a new ``--only-idle`` flag to only count
runs when the scheduler is idle.
+
+It will help users to run the scheduler once and process all the triggered
DAGs and all the queued tasks.
+It requires and complements the ``--num-runs`` flag so one can set a small
value to it instead of guessing how many times the scheduler will run.
+
+
+* Types of change
+
+ * [ ] Dag changes
+ * [ ] Config changes
+ * [ ] API changes
+ * [X] CLI changes
+ * [ ] Behaviour changes
+ * [ ] Plugin changes
+ * [ ] Dependency changes
+ * [ ] Code interface changes
diff --git a/airflow-core/src/airflow/cli/cli_config.py
b/airflow-core/src/airflow/cli/cli_config.py
index 16fe3e29afa..7bc5b94f9bc 100644
--- a/airflow-core/src/airflow/cli/cli_config.py
+++ b/airflow-core/src/airflow/cli/cli_config.py
@@ -729,6 +729,13 @@ ARG_NUM_RUNS = Arg(
help="Set the number of runs to execute before exiting",
)
+ARG_ONLY_IDLE = Arg(
+ ("-i", "--only-idle"),
+ default=conf.getboolean("scheduler", "only_idle", fallback=False),
+ help="Only count runs after the scheduler becomes idle.",
+ action="store_true",
+)
+
ARG_WITHOUT_MINGLE = Arg(
("--without-mingle",),
default=False,
@@ -1989,6 +1996,7 @@ core_commands: list[CLICommand] = [
func=lazy_load_command("airflow.cli.commands.scheduler_command.scheduler"),
args=(
ARG_NUM_RUNS,
+ ARG_ONLY_IDLE,
ARG_PID,
ARG_DAEMON,
ARG_STDOUT,
diff --git a/airflow-core/src/airflow/cli/commands/scheduler_command.py
b/airflow-core/src/airflow/cli/commands/scheduler_command.py
index 089d7dd5196..f797d656257 100644
--- a/airflow-core/src/airflow/cli/commands/scheduler_command.py
+++ b/airflow-core/src/airflow/cli/commands/scheduler_command.py
@@ -39,7 +39,11 @@ log = logging.getLogger(__name__)
@enable_memray_trace(component=MemrayTraceComponents.scheduler)
def _run_scheduler_job(args) -> None:
- job_runner = SchedulerJobRunner(job=Job(), num_runs=args.num_runs)
+ job_runner = SchedulerJobRunner(
+ job=Job(),
+ num_runs=args.num_runs,
+ only_idle=args.only_idle,
+ )
enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
with _serve_logs(args.skip_serve_logs),
_serve_health_check(enable_health_check):
run_job(job=job_runner.job, execute_callable=job_runner._execute)
@@ -51,6 +55,9 @@ def scheduler(args: Namespace):
"""Start Airflow Scheduler."""
print(settings.HEADER)
+ if args.only_idle and args.num_runs <= 0:
+ raise SystemExit("The --only-idle flag requires --num-runs to be set
to a positive number.")
+
if cli_utils.should_enable_hot_reload(args):
from airflow.cli.hot_reload import run_with_reloader
diff --git a/airflow-core/src/airflow/config_templates/config.yml
b/airflow-core/src/airflow/config_templates/config.yml
index 29ad76c5d93..7c66eb3938f 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -2310,6 +2310,14 @@ scheduler:
type: integer
example: ~
default: "-1"
+ only_idle:
+ description: |
+ Only count scheduler runs where the scheduler was idle (no tasks
queued or finished)
+ toward the run limit set by ``[scheduler] num_runs``. The count resets
whenever a task is processed.
+ version_added: 3.2.0
+ type: boolean
+ example: ~
+ default: "false"
scheduler_idle_sleep_time:
description: |
Controls how long the scheduler will sleep between loops, but if there
was nothing to do
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 2da41877c97..b77f87f6fb1 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -229,6 +229,9 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
:param num_runs: The number of times to run the scheduling loop. If you
have a large number of DAG files this could complete before each file
has been parsed. -1 for unlimited times.
+ :param only_idle: When True, only count runs where the scheduler was
+ idle (no tasks queued or finished). The count resets to zero whenever
+ a task is processed. Requires num_runs > 0.
:param scheduler_idle_sleep_time: The number of seconds to wait between
polls of running processors
:param log: override the default Logger
@@ -248,12 +251,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
self,
job: Job,
num_runs: int = conf.getint("scheduler", "num_runs"),
+ only_idle: bool = conf.getboolean("scheduler", "only_idle",
fallback=False),
scheduler_idle_sleep_time: float = conf.getfloat("scheduler",
"scheduler_idle_sleep_time"),
log: Logger | None = None,
executors: list[BaseExecutor] | None = None,
):
super().__init__(job)
self.num_runs = num_runs
+ self.only_idle = only_idle
self._scheduler_idle_sleep_time = scheduler_idle_sleep_time
# How many seconds do we wait for tasks to heartbeat before timeout.
self._task_instance_heartbeat_timeout_secs = conf.getint(
@@ -1754,6 +1759,8 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
action=bundle_cleanup_mgr.remove_stale_bundle_versions,
)
+ idle_count = 0
+
for loop_count in itertools.count(start=1):
with Stats.timer("scheduler.scheduler_loop_duration") as timer:
with create_session() as session:
@@ -1810,16 +1817,24 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
self.log.debug("Ran scheduling loop in %.2f ms", timer.duration)
- if not is_unit_test and not num_queued_tis and not
num_finished_events:
+ idle_in_this_run = not num_queued_tis and not num_finished_events
+ if not is_unit_test and idle_in_this_run:
# If the scheduler is doing things, don't sleep. This means
when there is work to do, the
# scheduler will run "as quick as possible", but when it's
stopped, it can sleep, dropping CPU
# usage when "idle"
time.sleep(min(self._scheduler_idle_sleep_time, next_event or
0))
- if loop_count >= self.num_runs > 0:
+ if idle_in_this_run:
+ idle_count += 1
+ else:
+ idle_count = 0
+
+ run_count = idle_count if self.only_idle else loop_count
+ if run_count >= self.num_runs > 0:
self.log.info(
- "Exiting scheduler loop as requested number of runs (%d -
got to %d) has been reached",
+ "Exiting scheduler loop as requested number of runs (%d)
has been reached (%d idle, %d total)",
self.num_runs,
+ idle_count,
loop_count,
)
break
diff --git a/airflow-core/tests/unit/cli/commands/test_scheduler_command.py
b/airflow-core/tests/unit/cli/commands/test_scheduler_command.py
index e381f9a754f..2c9e995efab 100644
--- a/airflow-core/tests/unit/cli/commands/test_scheduler_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_scheduler_command.py
@@ -173,3 +173,22 @@ class TestSchedulerCommand:
mock_reloader.assert_called_once()
# The callback function should be callable
assert callable(mock_reloader.call_args[0][0])
+
+ def test_only_idle_requires_positive_num_runs(self):
+ """--only-idle with -n 0 or negative must raise SystemExit."""
+ for n in ("0", "-1"):
+ args = self.parser.parse_args(["scheduler", "--only-idle", "-n",
n])
+ with pytest.raises(SystemExit):
+ scheduler_command.scheduler(args)
+
+ @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
+ @mock.patch("airflow.cli.commands.scheduler_command.Process")
+ def test_only_idle_passes_to_job_runner(self, mock_process,
mock_scheduler_job):
+ """SchedulerJobRunner must be called with only_idle=True when
--only-idle is used."""
+ mock_scheduler_job.return_value.job_type = "SchedulerJob"
+ args = self.parser.parse_args(["scheduler", "--only-idle", "-n", "5"])
+ scheduler_command.scheduler(args)
+ mock_scheduler_job.assert_called_once()
+ call_kwargs = mock_scheduler_job.call_args[1]
+ assert call_kwargs["only_idle"] is True
+ assert call_kwargs["num_runs"] == 5
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index ec4ba77e7d8..7de30013ec5 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -21,6 +21,7 @@ import contextlib
import datetime
import logging
import os
+import re
from collections import Counter, deque
from collections.abc import Callable, Generator, Iterator
from contextlib import ExitStack
@@ -378,6 +379,55 @@ class TestSchedulerJob:
current_children = set(current_process.children(recursive=True)) -
set(old_children)
assert not current_children
+ def test_only_idle_no_dags_exits_after_n_idle_runs(self, caplog,
configure_testing_dag_bundle):
+ num_runs = 5
+ with caplog.at_level(logging.INFO,
logger="airflow.jobs.scheduler_job_runner"):
+ with configure_testing_dag_bundle(os.devnull):
+ executor = MockExecutor(do_update=False)
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(
+ job=scheduler_job,
+ num_runs=num_runs,
+ only_idle=True,
+ executors=[executor],
+ )
+ run_job(scheduler_job,
execute_callable=self.job_runner._execute)
+
+ match = re.search(r"\((\d+) idle, (\d+) total\)", caplog.text)
+ assert match, f"Expected exit log '(N idle, M total)' in:
{caplog.text}"
+ idle_runs_val, total_runs_val = int(match.group(1)),
int(match.group(2))
+ assert total_runs_val == num_runs, "With no DAGs, total loop count
should equal num_runs"
+ assert idle_runs_val == num_runs, "With no DAGs, all runs are idle;
idle count should equal num_runs"
+
+ @pytest.mark.usefixtures("testing_dag_bundle")
+ def test_only_idle_with_dag_exits_after_n_idle_runs(self, caplog,
dag_maker, session):
+ num_runs = 5
+ with dag_maker(dag_id="test_only_idle_one_task",
fileloc="test_only_idle_one_task.py"):
+ EmptyOperator(task_id="dummy")
+ dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED,
state=State.RUNNING)
+ ti = dr.get_task_instance("dummy", session)
+ ti.state = State.SCHEDULED
+ session.merge(ti)
+ session.commit()
+
+ executor = MockExecutor(do_update=False)
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(
+ job=scheduler_job,
+ num_runs=num_runs,
+ only_idle=True,
+ executors=[executor],
+ )
+ with caplog.at_level(logging.INFO,
logger="airflow.jobs.scheduler_job_runner"):
+ run_job(scheduler_job, execute_callable=self.job_runner._execute)
+
+ match = re.search(r"\((\d+) idle, (\d+) total\)", caplog.text)
+ assert match, f"Expected exit log '(N idle, M total)' in:
{caplog.text}"
+ idle_runs_val, total_runs_val = int(match.group(1)),
int(match.group(2))
+ assert total_runs_val >= num_runs, "Total loop count should be at
least num_runs"
+ assert idle_runs_val == num_runs, "Scheduler exits when idle run count
reaches num_runs"
+ assert total_runs_val > idle_runs_val, "Some runs should not be idle"
+
@mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
@mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
def test_process_executor_events(self, mock_stats_incr,
mock_task_callback, dag_maker):