This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ec18db1707 Optimize scheduler by skipping "non-schedulable" DAGs 
(#30706)
ec18db1707 is described below

commit ec18db170745a8b1df0bb75569cd22e69892b3e2
Author: AutomationDev85 <[email protected]>
AuthorDate: Tue May 30 13:37:03 2023 +0200

    Optimize scheduler by skipping "non-schedulable" DAGs (#30706)
    
    Co-authored-by: Jens Scheffler <[email protected]>
---
 airflow/api/common/mark_tasks.py     |  4 ++--
 airflow/jobs/backfill_job_runner.py  |  2 +-
 airflow/jobs/scheduler_job_runner.py | 25 ++++++++++++++++++-------
 airflow/models/dag.py                |  6 +++---
 airflow/models/taskinstance.py       |  2 +-
 airflow/timetables/base.py           | 22 ++++++++++++++++++----
 airflow/timetables/simple.py         | 12 ++++++++++--
 tests/jobs/test_scheduler_job.py     | 36 ++++++++++++++++++++++++++++++++++++
 8 files changed, 89 insertions(+), 20 deletions(-)

diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py
index df6b87d5c2..7d2dd4a8db 100644
--- a/airflow/api/common/mark_tasks.py
+++ b/airflow/api/common/mark_tasks.py
@@ -305,7 +305,7 @@ def get_execution_dates(
     else:
         start_date = execution_date
     start_date = execution_date if not past else start_date
-    if not dag.timetable.can_run:
+    if not dag.timetable.can_be_scheduled:
         # If the DAG never schedules, need to look at existing DagRun if the 
user wants future or
         # past runs.
         dag_runs = dag.get_dagruns_between(start_date=start_date, 
end_date=end_date)
@@ -337,7 +337,7 @@ def get_run_ids(dag: DAG, run_id: str, future: bool, past: 
bool, session: SASess
     # determine run_id range of dag runs and tasks to consider
     end_date = last_dagrun.logical_date if future else 
current_dagrun.logical_date
     start_date = current_dagrun.logical_date if not past else 
first_dagrun.logical_date
-    if not dag.timetable.can_run:
+    if not dag.timetable.can_be_scheduled:
         # If the DAG never schedules, need to look at existing DagRun if the 
user wants future or
         # past runs.
         dag_runs = dag.get_dagruns_between(start_date=start_date, 
end_date=end_date, session=session)
diff --git a/airflow/jobs/backfill_job_runner.py 
b/airflow/jobs/backfill_job_runner.py
index 28986b2bed..a44ebfa9de 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -317,7 +317,7 @@ class BackfillJobRunner(BaseJobRunner[Job], LoggingMixin):
         run_date = dagrun_info.logical_date
 
         # consider max_active_runs but ignore when running subdags
-        respect_dag_max_active_limit = bool(dag.timetable.can_run and not 
dag.is_subdag)
+        respect_dag_max_active_limit = bool(dag.timetable.can_be_scheduled and 
not dag.is_subdag)
 
         current_active_dag_count = 
dag.get_num_active_runs(external_trigger=False)
 
diff --git a/airflow/jobs/scheduler_job_runner.py 
b/airflow/jobs/scheduler_job_runner.py
index da40d9eea7..e330e99ff5 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1178,7 +1178,9 @@ class SchedulerJobRunner(BaseJobRunner[Job], 
LoggingMixin):
                     creating_job_id=self.job.id,
                 )
                 active_runs_of_dags[dag.dag_id] += 1
-            if self._should_update_dag_next_dagruns(dag, dag_model, 
active_runs_of_dags[dag.dag_id]):
+            if self._should_update_dag_next_dagruns(
+                dag, dag_model, active_runs_of_dags[dag.dag_id], 
session=session
+            ):
                 dag_model.calculate_dagrun_date_fields(dag, data_interval)
         # TODO[HA]: Should we do a session.flush() so we don't have to keep 
lots of state/object in
         # memory for larger dags? or expunge_all()
@@ -1283,9 +1285,19 @@ class SchedulerJobRunner(BaseJobRunner[Job], 
LoggingMixin):
                     DatasetDagRunQueue.target_dag_id == dag_run.dag_id
                 ).delete()
 
-    def _should_update_dag_next_dagruns(self, dag: DAG, dag_model: DagModel, 
total_active_runs: int) -> bool:
+    def _should_update_dag_next_dagruns(
+        self, dag: DAG, dag_model: DagModel, total_active_runs: int | None = 
None, *, session: Session
+    ) -> bool:
         """Check if the dag's next_dagruns_create_after should be updated."""
-        if total_active_runs >= dag.max_active_runs:
+        # If the DAG never schedules, skip to save runtime
+        if not dag.timetable.can_be_scheduled:
+            return False
+
+        # get active dag runs from DB if not available
+        if not total_active_runs:
+            total_active_runs = dag.get_num_active_runs(only_running=False, 
session=session)
+
+        if total_active_runs and total_active_runs >= dag.max_active_runs:
             self.log.info(
                 "DAG %s is at (or above) max_active_runs (%d of %d), not 
creating any more runs",
                 dag_model.dag_id,
@@ -1398,9 +1410,8 @@ class SchedulerJobRunner(BaseJobRunner[Job], 
LoggingMixin):
                 session.merge(task_instance)
             session.flush()
             self.log.info("Run %s of %s has timed-out", dag_run.run_id, 
dag_run.dag_id)
-            active_runs = dag.get_num_active_runs(only_running=False, 
session=session)
             # Work out if we should allow creating a new DagRun now?
-            if self._should_update_dag_next_dagruns(dag, dag_model, 
active_runs):
+            if self._should_update_dag_next_dagruns(dag, dag_model, 
session=session):
                 dag_model.calculate_dagrun_date_fields(dag, 
dag.get_run_data_interval(dag_run))
 
             callback_to_execute = DagCallbackRequest(
@@ -1427,10 +1438,10 @@ class SchedulerJobRunner(BaseJobRunner[Job], 
LoggingMixin):
             return callback
         # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
         schedulable_tis, callback_to_run = 
dag_run.update_state(session=session, execute_callbacks=False)
+        # If the DAG run is not finished, skip the interval calculation to save 
scheduler runtime
         if dag_run.state in State.finished:
-            active_runs = dag.get_num_active_runs(only_running=False, 
session=session)
             # Work out if we should allow creating a new DagRun now?
-            if self._should_update_dag_next_dagruns(dag, dag_model, 
active_runs):
+            if self._should_update_dag_next_dagruns(dag, dag_model, 
session=session):
                 dag_model.calculate_dagrun_date_fields(dag, 
dag.get_run_data_interval(dag_run))
         # This will do one query per dag run. We "could" build up a complex
         # query to update all the TIs across all the execution dates and dag
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 757ef6b6ed..5fabd203b8 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -876,7 +876,7 @@ class DAG(LoggingMixin):
         DO NOT use this method is there is a known data interval.
         """
         timetable_type = type(self.timetable)
-        if issubclass(timetable_type, (NullTimetable, OnceTimetable)):
+        if issubclass(timetable_type, (NullTimetable, OnceTimetable, 
DatasetTriggeredTimetable)):
             return DataInterval.exact(timezone.coerce_datetime(logical_date))
         start = timezone.coerce_datetime(logical_date)
         if issubclass(timetable_type, CronDataIntervalTimetable):
@@ -1271,7 +1271,7 @@ class DAG(LoggingMixin):
 
     @property
     def allow_future_exec_dates(self) -> bool:
-        return settings.ALLOW_FUTURE_EXEC_DATES and not self.timetable.can_run
+        return settings.ALLOW_FUTURE_EXEC_DATES and not 
self.timetable.can_be_scheduled
 
     @provide_session
     def get_concurrency_reached(self, session=NEW_SESSION) -> bool:
@@ -3216,7 +3216,7 @@ class DAG(LoggingMixin):
         Validates & raise exception if there are any Params in the DAG which 
neither have a default value nor
         have the null in schema['type'] list, but the DAG have a 
schedule_interval which is not None.
         """
-        if not self.timetable.can_run:
+        if not self.timetable.can_be_scheduled:
             return
 
         for k, v in self.params.items():
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index bb7175a4a3..9bbee50d8f 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -993,7 +993,7 @@ class TaskInstance(Base, LoggingMixin):
         # or the DAG is never scheduled. For legacy reasons, when
         # `catchup=True`, we use `get_previous_scheduled_dagrun` unless
         # `ignore_schedule` is `True`.
-        ignore_schedule = state is not None or not dag.timetable.can_run
+        ignore_schedule = state is not None or not 
dag.timetable.can_be_scheduled
         if dag.catchup is True and not ignore_schedule:
             last_dagrun = dr.get_previous_scheduled_dagrun(session=session)
         else:
diff --git a/airflow/timetables/base.py b/airflow/timetables/base.py
index dff3e6ee9e..c02f700233 100644
--- a/airflow/timetables/base.py
+++ b/airflow/timetables/base.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 from typing import TYPE_CHECKING, Any, NamedTuple, Sequence
+from warnings import warn
 
 from pendulum import DateTime
 
@@ -122,11 +123,24 @@ class Timetable(Protocol):
     like ``schedule=None`` and ``"@once"`` set it to *False*.
     """
 
-    can_run: bool = True
-    """Whether this timetable can actually schedule runs.
+    _can_be_scheduled: bool = True
 
-    This defaults to and should generally be *True*, but ``NullTimetable`` sets
-    this to *False*.
+    @property
+    def can_be_scheduled(self):
+        if hasattr(self, "can_run"):
+            warn(
+                'can_run class variable is deprecated. Use "can_be_scheduled" 
instead.',
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            return self.can_run
+        return self._can_be_scheduled
+
+    """Whether this timetable can actually schedule runs in an automated 
manner.
+
+    This defaults to and should generally be *True* (including non periodic
+    execution types like *@once* and data triggered tables), but
+    ``NullTimetable`` sets this to *False*.
     """
 
     run_ordering: Sequence[str] = ("data_interval_end", "execution_date")
diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py
index 1cfefe5a0f..53ddf6a7a8 100644
--- a/airflow/timetables/simple.py
+++ b/airflow/timetables/simple.py
@@ -34,7 +34,6 @@ class _TrivialTimetable(Timetable):
     """Some code reuse for "trivial" timetables that has nothing complex."""
 
     periodic = False
-    can_run = False
     run_ordering = ("execution_date",)
 
     @classmethod
@@ -63,6 +62,7 @@ class NullTimetable(_TrivialTimetable):
     This corresponds to ``schedule=None``.
     """
 
+    can_be_scheduled = False
     description: str = "Never, external triggers only"
 
     @property
@@ -144,7 +144,7 @@ class ContinuousTimetable(_TrivialTimetable):
         return DagRunInfo.interval(start, end)
 
 
-class DatasetTriggeredTimetable(NullTimetable):
+class DatasetTriggeredTimetable(_TrivialTimetable):
     """Timetable that never schedules anything.
 
     This should not be directly used anywhere, but only set if a DAG is 
triggered by datasets.
@@ -188,3 +188,11 @@ class DatasetTriggeredTimetable(NullTimetable):
             events, key=operator.attrgetter("source_dag_run.data_interval_end")
         ).source_dag_run.data_interval_end
         return DataInterval(start, end)
+
+    def next_dagrun_info(
+        self,
+        *,
+        last_automated_data_interval: DataInterval | None,
+        restriction: TimeRestriction,
+    ) -> DagRunInfo | None:
+        return None
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 8d28e01867..4a1820ba59 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3395,6 +3395,42 @@ class TestSchedulerJob:
             self.job_runner._send_sla_callbacks_to_processor(dag)
             scheduler_job.executor.callback_sink.send.assert_not_called()
 
+    @pytest.mark.parametrize(
+        "schedule, number_running, excepted",
+        [
+            (None, None, False),
+            ("*/1 * * * *", None, False),
+            ("*/1 * * * *", 1, True),
+        ],
+        ids=["no_dag_schedule", "dag_schedule_too_many_runs", 
"dag_schedule_less_runs"],
+    )
+    def test_should_update_dag_next_dagruns(self, schedule, number_running, 
excepted, session, dag_maker):
+        """Test whether updating the next dagrun is really required, or whether 
the run-time saving skip applies."""
+
+        with dag_maker(
+            dag_id="test_should_update_dag_next_dagruns", schedule=schedule, 
max_active_runs=2
+        ) as dag:
+            EmptyOperator(task_id="dummy")
+
+        dag_model = dag_maker.dag_model
+
+        for index in range(2):
+            dag_maker.create_dagrun(
+                run_id=f"run_{index}",
+                execution_date=(DEFAULT_DATE + timedelta(days=index)),
+                start_date=timezone.utcnow(),
+                state=State.RUNNING,
+                session=session,
+            )
+
+        session.flush()
+        scheduler_job = Job(executor=self.null_exec)
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+        assert excepted is self.job_runner._should_update_dag_next_dagruns(
+            dag, dag_model, number_running, session=session
+        )
+
     def test_create_dag_runs(self, dag_maker):
         """
         Test various invariants of _create_dag_runs.

Reply via email to