This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 29f838d5f16 feat(scheduler): Add --only-idle flag to airflow scheduler 
(#62055)
29f838d5f16 is described below

commit 29f838d5f16ff4a6f457bfaa02ecc626d13cf0dc
Author: Juarez Rudsatz <[email protected]>
AuthorDate: Tue Mar 3 19:08:20 2026 -0300

    feat(scheduler): Add --only-idle flag to airflow scheduler (#62055)
    
    * Add flag --only-idle to cli command airflow scheduler
    
    * Add test case for flag scheduler --num-runs --only-idle
    
    * Add test case for checking dag runs with the flag --only-idle
    
    * Add newsfragment for the scheduler --only-idle flag
    
    * Fix CI failures in the tests for the --only-idle flag
    
    * Polish source code in scheduler --only-idle flag
---
 airflow-core/newsfragments/62055.significant.rst   | 16 +++++++
 airflow-core/src/airflow/cli/cli_config.py         |  8 ++++
 .../src/airflow/cli/commands/scheduler_command.py  |  9 +++-
 .../src/airflow/config_templates/config.yml        |  8 ++++
 .../src/airflow/jobs/scheduler_job_runner.py       | 21 +++++++--
 .../unit/cli/commands/test_scheduler_command.py    | 19 ++++++++
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 50 ++++++++++++++++++++++
 7 files changed, 127 insertions(+), 4 deletions(-)

diff --git a/airflow-core/newsfragments/62055.significant.rst 
b/airflow-core/newsfragments/62055.significant.rst
new file mode 100644
index 00000000000..7e4ddaed981
--- /dev/null
+++ b/airflow-core/newsfragments/62055.significant.rst
@@ -0,0 +1,16 @@
+The Airflow scheduler CLI command has a new ``--only-idle`` flag that counts only consecutive idle scheduler loops toward ``--num-runs``.
+
+It helps users run the scheduler once, processing all triggered DAGs and queued tasks before it exits.
+It requires ``--num-runs`` to be set to a positive number, so users can pass a small value instead of guessing how many loops the scheduler will need.
+
+
+* Types of change
+
+  * [ ] Dag changes
+  * [X] Config changes
+  * [ ] API changes
+  * [X] CLI changes
+  * [ ] Behaviour changes
+  * [ ] Plugin changes
+  * [ ] Dependency changes
+  * [ ] Code interface changes
diff --git a/airflow-core/src/airflow/cli/cli_config.py 
b/airflow-core/src/airflow/cli/cli_config.py
index 16fe3e29afa..7bc5b94f9bc 100644
--- a/airflow-core/src/airflow/cli/cli_config.py
+++ b/airflow-core/src/airflow/cli/cli_config.py
@@ -729,6 +729,13 @@ ARG_NUM_RUNS = Arg(
     help="Set the number of runs to execute before exiting",
 )
 
+ARG_ONLY_IDLE = Arg(
+    ("-i", "--only-idle"),
+    default=conf.getboolean("scheduler", "only_idle", fallback=False),
+    help="Only count runs after the scheduler becomes idle.",
+    action="store_true",
+)
+
 ARG_WITHOUT_MINGLE = Arg(
     ("--without-mingle",),
     default=False,
@@ -1989,6 +1996,7 @@ core_commands: list[CLICommand] = [
         
func=lazy_load_command("airflow.cli.commands.scheduler_command.scheduler"),
         args=(
             ARG_NUM_RUNS,
+            ARG_ONLY_IDLE,
             ARG_PID,
             ARG_DAEMON,
             ARG_STDOUT,
diff --git a/airflow-core/src/airflow/cli/commands/scheduler_command.py 
b/airflow-core/src/airflow/cli/commands/scheduler_command.py
index 089d7dd5196..f797d656257 100644
--- a/airflow-core/src/airflow/cli/commands/scheduler_command.py
+++ b/airflow-core/src/airflow/cli/commands/scheduler_command.py
@@ -39,7 +39,11 @@ log = logging.getLogger(__name__)
 
 @enable_memray_trace(component=MemrayTraceComponents.scheduler)
 def _run_scheduler_job(args) -> None:
-    job_runner = SchedulerJobRunner(job=Job(), num_runs=args.num_runs)
+    job_runner = SchedulerJobRunner(
+        job=Job(),
+        num_runs=args.num_runs,
+        only_idle=args.only_idle,
+    )
     enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
     with _serve_logs(args.skip_serve_logs), 
_serve_health_check(enable_health_check):
         run_job(job=job_runner.job, execute_callable=job_runner._execute)
@@ -51,6 +55,9 @@ def scheduler(args: Namespace):
     """Start Airflow Scheduler."""
     print(settings.HEADER)
 
+    if args.only_idle and args.num_runs <= 0:
+        raise SystemExit("The --only-idle flag requires --num-runs to be set 
to a positive number.")
+
     if cli_utils.should_enable_hot_reload(args):
         from airflow.cli.hot_reload import run_with_reloader
 
diff --git a/airflow-core/src/airflow/config_templates/config.yml 
b/airflow-core/src/airflow/config_templates/config.yml
index 29ad76c5d93..7c66eb3938f 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -2310,6 +2310,14 @@ scheduler:
       type: integer
       example: ~
       default: "-1"
+    only_idle:
+      description: |
+        Only count scheduler runs where the scheduler was idle (no tasks 
queued or finished)
+        toward the run limit set by ``[scheduler] num_runs``. The count resets 
whenever a task is processed.
+      version_added: 3.2.0
+      type: boolean
+      example: ~
+      default: "false"
     scheduler_idle_sleep_time:
       description: |
         Controls how long the scheduler will sleep between loops, but if there 
was nothing to do
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 2da41877c97..b77f87f6fb1 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -229,6 +229,9 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
     :param num_runs: The number of times to run the scheduling loop. If you
         have a large number of DAG files this could complete before each file
         has been parsed. -1 for unlimited times.
+    :param only_idle: When True, only count runs where the scheduler was
+        idle (no tasks queued or finished). The count resets to zero whenever
+        a task is processed. Requires num_runs > 0.
     :param scheduler_idle_sleep_time: The number of seconds to wait between
         polls of running processors
     :param log: override the default Logger
@@ -248,12 +251,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         self,
         job: Job,
         num_runs: int = conf.getint("scheduler", "num_runs"),
+        only_idle: bool = conf.getboolean("scheduler", "only_idle", 
fallback=False),
         scheduler_idle_sleep_time: float = conf.getfloat("scheduler", 
"scheduler_idle_sleep_time"),
         log: Logger | None = None,
         executors: list[BaseExecutor] | None = None,
     ):
         super().__init__(job)
         self.num_runs = num_runs
+        self.only_idle = only_idle
         self._scheduler_idle_sleep_time = scheduler_idle_sleep_time
         # How many seconds do we wait for tasks to heartbeat before timeout.
         self._task_instance_heartbeat_timeout_secs = conf.getint(
@@ -1754,6 +1759,8 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                     action=bundle_cleanup_mgr.remove_stale_bundle_versions,
                 )
 
+        idle_count = 0
+
         for loop_count in itertools.count(start=1):
             with Stats.timer("scheduler.scheduler_loop_duration") as timer:
                 with create_session() as session:
@@ -1810,16 +1817,24 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
             self.log.debug("Ran scheduling loop in %.2f ms", timer.duration)
 
-            if not is_unit_test and not num_queued_tis and not 
num_finished_events:
+            idle_in_this_run = not num_queued_tis and not num_finished_events
+            if not is_unit_test and idle_in_this_run:
                 # If the scheduler is doing things, don't sleep. This means 
when there is work to do, the
                 # scheduler will run "as quick as possible", but when it's 
stopped, it can sleep, dropping CPU
                 # usage when "idle"
                 time.sleep(min(self._scheduler_idle_sleep_time, next_event or 
0))
 
-            if loop_count >= self.num_runs > 0:
+            if idle_in_this_run:
+                idle_count += 1
+            else:
+                idle_count = 0
+
+            run_count = idle_count if self.only_idle else loop_count
+            if run_count >= self.num_runs > 0:
                 self.log.info(
-                    "Exiting scheduler loop as requested number of runs (%d - 
got to %d) has been reached",
+                    "Exiting scheduler loop as requested number of runs (%d) 
has been reached (%d idle, %d total)",
                     self.num_runs,
+                    idle_count,
                     loop_count,
                 )
                 break
diff --git a/airflow-core/tests/unit/cli/commands/test_scheduler_command.py 
b/airflow-core/tests/unit/cli/commands/test_scheduler_command.py
index e381f9a754f..2c9e995efab 100644
--- a/airflow-core/tests/unit/cli/commands/test_scheduler_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_scheduler_command.py
@@ -173,3 +173,22 @@ class TestSchedulerCommand:
         mock_reloader.assert_called_once()
         # The callback function should be callable
         assert callable(mock_reloader.call_args[0][0])
+
+    def test_only_idle_requires_positive_num_runs(self):
+        """--only-idle with -n 0 or negative must raise SystemExit."""
+        for n in ("0", "-1"):
+            args = self.parser.parse_args(["scheduler", "--only-idle", "-n", 
n])
+            with pytest.raises(SystemExit):
+                scheduler_command.scheduler(args)
+
+    @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
+    @mock.patch("airflow.cli.commands.scheduler_command.Process")
+    def test_only_idle_passes_to_job_runner(self, mock_process, 
mock_scheduler_job):
+        """SchedulerJobRunner must be called with only_idle=True when 
--only-idle is used."""
+        mock_scheduler_job.return_value.job_type = "SchedulerJob"
+        args = self.parser.parse_args(["scheduler", "--only-idle", "-n", "5"])
+        scheduler_command.scheduler(args)
+        mock_scheduler_job.assert_called_once()
+        call_kwargs = mock_scheduler_job.call_args[1]
+        assert call_kwargs["only_idle"] is True
+        assert call_kwargs["num_runs"] == 5
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index ec4ba77e7d8..7de30013ec5 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -21,6 +21,7 @@ import contextlib
 import datetime
 import logging
 import os
+import re
 from collections import Counter, deque
 from collections.abc import Callable, Generator, Iterator
 from contextlib import ExitStack
@@ -378,6 +379,55 @@ class TestSchedulerJob:
         current_children = set(current_process.children(recursive=True)) - 
set(old_children)
         assert not current_children
 
+    def test_only_idle_no_dags_exits_after_n_idle_runs(self, caplog, 
configure_testing_dag_bundle):
+        num_runs = 5
+        with caplog.at_level(logging.INFO, 
logger="airflow.jobs.scheduler_job_runner"):
+            with configure_testing_dag_bundle(os.devnull):
+                executor = MockExecutor(do_update=False)
+                scheduler_job = Job()
+                self.job_runner = SchedulerJobRunner(
+                    job=scheduler_job,
+                    num_runs=num_runs,
+                    only_idle=True,
+                    executors=[executor],
+                )
+                run_job(scheduler_job, 
execute_callable=self.job_runner._execute)
+
+        match = re.search(r"\((\d+) idle, (\d+) total\)", caplog.text)
+        assert match, f"Expected exit log '(N idle, M total)' in: 
{caplog.text}"
+        idle_runs_val, total_runs_val = int(match.group(1)), 
int(match.group(2))
+        assert total_runs_val == num_runs, "With no DAGs, total loop count 
should equal num_runs"
+        assert idle_runs_val == num_runs, "With no DAGs, all runs are idle; 
idle count should equal num_runs"
+
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    def test_only_idle_with_dag_exits_after_n_idle_runs(self, caplog, 
dag_maker, session):
+        num_runs = 5
+        with dag_maker(dag_id="test_only_idle_one_task", 
fileloc="test_only_idle_one_task.py"):
+            EmptyOperator(task_id="dummy")
+        dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, 
state=State.RUNNING)
+        ti = dr.get_task_instance("dummy", session)
+        ti.state = State.SCHEDULED
+        session.merge(ti)
+        session.commit()
+
+        executor = MockExecutor(do_update=False)
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(
+            job=scheduler_job,
+            num_runs=num_runs,
+            only_idle=True,
+            executors=[executor],
+        )
+        with caplog.at_level(logging.INFO, 
logger="airflow.jobs.scheduler_job_runner"):
+            run_job(scheduler_job, execute_callable=self.job_runner._execute)
+
+        match = re.search(r"\((\d+) idle, (\d+) total\)", caplog.text)
+        assert match, f"Expected exit log '(N idle, M total)' in: 
{caplog.text}"
+        idle_runs_val, total_runs_val = int(match.group(1)), 
int(match.group(2))
+        assert total_runs_val >= num_runs, "Total loop count should be at 
least num_runs"
+        assert idle_runs_val == num_runs, "Scheduler exits when idle run count 
reaches num_runs"
+        assert total_runs_val > idle_runs_val, "Some runs should not be idle"
+
     @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
     @mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
     def test_process_executor_events(self, mock_stats_incr, 
mock_task_callback, dag_maker):

Reply via email to