This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ec18db1707 Optimize scheduler by skipping "non-schedulable" DAGs
(#30706)
ec18db1707 is described below
commit ec18db170745a8b1df0bb75569cd22e69892b3e2
Author: AutomationDev85 <[email protected]>
AuthorDate: Tue May 30 13:37:03 2023 +0200
Optimize scheduler by skipping "non-schedulable" DAGs (#30706)
Co-authored-by: Jens Scheffler <[email protected]>
---
airflow/api/common/mark_tasks.py | 4 ++--
airflow/jobs/backfill_job_runner.py | 2 +-
airflow/jobs/scheduler_job_runner.py | 25 ++++++++++++++++++-------
airflow/models/dag.py | 6 +++---
airflow/models/taskinstance.py | 2 +-
airflow/timetables/base.py | 22 ++++++++++++++++++----
airflow/timetables/simple.py | 12 ++++++++++--
tests/jobs/test_scheduler_job.py | 36 ++++++++++++++++++++++++++++++++++++
8 files changed, 89 insertions(+), 20 deletions(-)
diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py
index df6b87d5c2..7d2dd4a8db 100644
--- a/airflow/api/common/mark_tasks.py
+++ b/airflow/api/common/mark_tasks.py
@@ -305,7 +305,7 @@ def get_execution_dates(
else:
start_date = execution_date
start_date = execution_date if not past else start_date
- if not dag.timetable.can_run:
+ if not dag.timetable.can_be_scheduled:
# If the DAG never schedules, need to look at existing DagRun if the
user wants future or
# past runs.
dag_runs = dag.get_dagruns_between(start_date=start_date,
end_date=end_date)
@@ -337,7 +337,7 @@ def get_run_ids(dag: DAG, run_id: str, future: bool, past:
bool, session: SASess
# determine run_id range of dag runs and tasks to consider
end_date = last_dagrun.logical_date if future else
current_dagrun.logical_date
start_date = current_dagrun.logical_date if not past else
first_dagrun.logical_date
- if not dag.timetable.can_run:
+ if not dag.timetable.can_be_scheduled:
# If the DAG never schedules, need to look at existing DagRun if the
user wants future or
# past runs.
dag_runs = dag.get_dagruns_between(start_date=start_date,
end_date=end_date, session=session)
diff --git a/airflow/jobs/backfill_job_runner.py
b/airflow/jobs/backfill_job_runner.py
index 28986b2bed..a44ebfa9de 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -317,7 +317,7 @@ class BackfillJobRunner(BaseJobRunner[Job], LoggingMixin):
run_date = dagrun_info.logical_date
# consider max_active_runs but ignore when running subdags
- respect_dag_max_active_limit = bool(dag.timetable.can_run and not
dag.is_subdag)
+ respect_dag_max_active_limit = bool(dag.timetable.can_be_scheduled and
not dag.is_subdag)
current_active_dag_count =
dag.get_num_active_runs(external_trigger=False)
diff --git a/airflow/jobs/scheduler_job_runner.py
b/airflow/jobs/scheduler_job_runner.py
index da40d9eea7..e330e99ff5 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1178,7 +1178,9 @@ class SchedulerJobRunner(BaseJobRunner[Job],
LoggingMixin):
creating_job_id=self.job.id,
)
active_runs_of_dags[dag.dag_id] += 1
- if self._should_update_dag_next_dagruns(dag, dag_model,
active_runs_of_dags[dag.dag_id]):
+ if self._should_update_dag_next_dagruns(
+ dag, dag_model, active_runs_of_dags[dag.dag_id],
session=session
+ ):
dag_model.calculate_dagrun_date_fields(dag, data_interval)
# TODO[HA]: Should we do a session.flush() so we don't have to keep
lots of state/object in
# memory for larger dags? or expunge_all()
@@ -1283,9 +1285,19 @@ class SchedulerJobRunner(BaseJobRunner[Job],
LoggingMixin):
DatasetDagRunQueue.target_dag_id == dag_run.dag_id
).delete()
- def _should_update_dag_next_dagruns(self, dag: DAG, dag_model: DagModel,
total_active_runs: int) -> bool:
+ def _should_update_dag_next_dagruns(
+ self, dag: DAG, dag_model: DagModel, total_active_runs: int | None =
None, *, session: Session
+ ) -> bool:
"""Check if the dag's next_dagruns_create_after should be updated."""
- if total_active_runs >= dag.max_active_runs:
+ # If the DAG never schedules, skip to save runtime
+ if not dag.timetable.can_be_scheduled:
+ return False
+
+ # get active dag runs from DB if not available
+ if not total_active_runs:
+ total_active_runs = dag.get_num_active_runs(only_running=False,
session=session)
+
+ if total_active_runs and total_active_runs >= dag.max_active_runs:
self.log.info(
"DAG %s is at (or above) max_active_runs (%d of %d), not
creating any more runs",
dag_model.dag_id,
@@ -1398,9 +1410,8 @@ class SchedulerJobRunner(BaseJobRunner[Job],
LoggingMixin):
session.merge(task_instance)
session.flush()
self.log.info("Run %s of %s has timed-out", dag_run.run_id,
dag_run.dag_id)
- active_runs = dag.get_num_active_runs(only_running=False,
session=session)
# Work out if we should allow creating a new DagRun now?
- if self._should_update_dag_next_dagruns(dag, dag_model,
active_runs):
+ if self._should_update_dag_next_dagruns(dag, dag_model,
session=session):
dag_model.calculate_dagrun_date_fields(dag,
dag.get_run_data_interval(dag_run))
callback_to_execute = DagCallbackRequest(
@@ -1427,10 +1438,10 @@ class SchedulerJobRunner(BaseJobRunner[Job],
LoggingMixin):
return callback
# TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
schedulable_tis, callback_to_run =
dag_run.update_state(session=session, execute_callbacks=False)
+ # Check if DAG not scheduled then skip interval calculation to save
scheduler runtime
if dag_run.state in State.finished:
- active_runs = dag.get_num_active_runs(only_running=False,
session=session)
# Work out if we should allow creating a new DagRun now?
- if self._should_update_dag_next_dagruns(dag, dag_model,
active_runs):
+ if self._should_update_dag_next_dagruns(dag, dag_model,
session=session):
dag_model.calculate_dagrun_date_fields(dag,
dag.get_run_data_interval(dag_run))
# This will do one query per dag run. We "could" build up a complex
# query to update all the TIs across all the execution dates and dag
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 757ef6b6ed..5fabd203b8 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -876,7 +876,7 @@ class DAG(LoggingMixin):
DO NOT use this method is there is a known data interval.
"""
timetable_type = type(self.timetable)
- if issubclass(timetable_type, (NullTimetable, OnceTimetable)):
+ if issubclass(timetable_type, (NullTimetable, OnceTimetable,
DatasetTriggeredTimetable)):
return DataInterval.exact(timezone.coerce_datetime(logical_date))
start = timezone.coerce_datetime(logical_date)
if issubclass(timetable_type, CronDataIntervalTimetable):
@@ -1271,7 +1271,7 @@ class DAG(LoggingMixin):
@property
def allow_future_exec_dates(self) -> bool:
- return settings.ALLOW_FUTURE_EXEC_DATES and not self.timetable.can_run
+ return settings.ALLOW_FUTURE_EXEC_DATES and not
self.timetable.can_be_scheduled
@provide_session
def get_concurrency_reached(self, session=NEW_SESSION) -> bool:
@@ -3216,7 +3216,7 @@ class DAG(LoggingMixin):
Validates & raise exception if there are any Params in the DAG which
neither have a default value nor
have the null in schema['type'] list, but the DAG have a
schedule_interval which is not None.
"""
- if not self.timetable.can_run:
+ if not self.timetable.can_be_scheduled:
return
for k, v in self.params.items():
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index bb7175a4a3..9bbee50d8f 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -993,7 +993,7 @@ class TaskInstance(Base, LoggingMixin):
# or the DAG is never scheduled. For legacy reasons, when
# `catchup=True`, we use `get_previous_scheduled_dagrun` unless
# `ignore_schedule` is `True`.
- ignore_schedule = state is not None or not dag.timetable.can_run
+ ignore_schedule = state is not None or not
dag.timetable.can_be_scheduled
if dag.catchup is True and not ignore_schedule:
last_dagrun = dr.get_previous_scheduled_dagrun(session=session)
else:
diff --git a/airflow/timetables/base.py b/airflow/timetables/base.py
index dff3e6ee9e..c02f700233 100644
--- a/airflow/timetables/base.py
+++ b/airflow/timetables/base.py
@@ -17,6 +17,7 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any, NamedTuple, Sequence
+from warnings import warn
from pendulum import DateTime
@@ -122,11 +123,24 @@ class Timetable(Protocol):
like ``schedule=None`` and ``"@once"`` set it to *False*.
"""
- can_run: bool = True
- """Whether this timetable can actually schedule runs.
+ _can_be_scheduled: bool = True
- This defaults to and should generally be *True*, but ``NullTimetable`` sets
- this to *False*.
+ @property
+ def can_be_scheduled(self):
+ if hasattr(self, "can_run"):
+ warn(
+ 'can_run class variable is deprecated. Use "can_be_scheduled"
instead.',
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ return self.can_run
+ return self._can_be_scheduled
+
+ """Whether this timetable can actually schedule runs in an automated
manner.
+
+ This defaults to and should generally be *True* (including non periodic
+ execution types like *@once* and data triggered tables), but
+ ``NullTimetable`` sets this to *False*.
"""
run_ordering: Sequence[str] = ("data_interval_end", "execution_date")
diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py
index 1cfefe5a0f..53ddf6a7a8 100644
--- a/airflow/timetables/simple.py
+++ b/airflow/timetables/simple.py
@@ -34,7 +34,6 @@ class _TrivialTimetable(Timetable):
"""Some code reuse for "trivial" timetables that has nothing complex."""
periodic = False
- can_run = False
run_ordering = ("execution_date",)
@classmethod
@@ -63,6 +62,7 @@ class NullTimetable(_TrivialTimetable):
This corresponds to ``schedule=None``.
"""
+ can_be_scheduled = False
description: str = "Never, external triggers only"
@property
@@ -144,7 +144,7 @@ class ContinuousTimetable(_TrivialTimetable):
return DagRunInfo.interval(start, end)
-class DatasetTriggeredTimetable(NullTimetable):
+class DatasetTriggeredTimetable(_TrivialTimetable):
"""Timetable that never schedules anything.
This should not be directly used anywhere, but only set if a DAG is
triggered by datasets.
@@ -188,3 +188,11 @@ class DatasetTriggeredTimetable(NullTimetable):
events, key=operator.attrgetter("source_dag_run.data_interval_end")
).source_dag_run.data_interval_end
return DataInterval(start, end)
+
+ def next_dagrun_info(
+ self,
+ *,
+ last_automated_data_interval: DataInterval | None,
+ restriction: TimeRestriction,
+ ) -> DagRunInfo | None:
+ return None
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 8d28e01867..4a1820ba59 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3395,6 +3395,42 @@ class TestSchedulerJob:
self.job_runner._send_sla_callbacks_to_processor(dag)
scheduler_job.executor.callback_sink.send.assert_not_called()
+ @pytest.mark.parametrize(
+ "schedule, number_running, excepted",
+ [
+ (None, None, False),
+ ("*/1 * * * *", None, False),
+ ("*/1 * * * *", 1, True),
+ ],
+ ids=["no_dag_schedule", "dag_schedule_too_many_runs",
"dag_schedule_less_runs"],
+ )
+ def test_should_update_dag_next_dagruns(self, schedule, number_running,
excepted, session, dag_maker):
+ """Test if really required to update next dagrun or possible to save
run time"""
+
+ with dag_maker(
+ dag_id="test_should_update_dag_next_dagruns", schedule=schedule,
max_active_runs=2
+ ) as dag:
+ EmptyOperator(task_id="dummy")
+
+ dag_model = dag_maker.dag_model
+
+ for index in range(2):
+ dag_maker.create_dagrun(
+ run_id=f"run_{index}",
+ execution_date=(DEFAULT_DATE + timedelta(days=index)),
+ start_date=timezone.utcnow(),
+ state=State.RUNNING,
+ session=session,
+ )
+
+ session.flush()
+ scheduler_job = Job(executor=self.null_exec)
+ self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+ assert excepted is self.job_runner._should_update_dag_next_dagruns(
+ dag, dag_model, number_running, session=session
+ )
+
def test_create_dag_runs(self, dag_maker):
"""
Test various invariants of _create_dag_runs.