This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9ec87539ae Split next_dagruns_to_examine function into two (#42386)
9ec87539ae is described below
commit 9ec87539ae75a5f5e28d8d409eaa224788030c3b
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Sep 24 10:30:05 2024 -0700
Split next_dagruns_to_examine function into two (#42386)
The behavior is different enough to merit two different functions. In fact,
I noticed that we are actually using a bad index hint for the QUEUED case, and
this becomes more apparent with the introduction of backfill handling into the
scheduler, which is forthcoming.
---
airflow/jobs/scheduler_job_runner.py | 10 ++---
airflow/models/dagrun.py | 84 +++++++++++++++++++++++++-----------
tests/jobs/test_scheduler_job.py | 4 +-
tests/models/test_dagrun.py | 8 +++-
4 files changed, 71 insertions(+), 35 deletions(-)
diff --git a/airflow/jobs/scheduler_job_runner.py
b/airflow/jobs/scheduler_job_runner.py
index eb0abbc296..a49c2361ec 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1218,9 +1218,10 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
self._start_queued_dagruns(session)
guard.commit()
- dag_runs = self._get_next_dagruns_to_examine(DagRunState.RUNNING,
session)
+
# Bulk fetch the currently active dag runs for the dags we are
# examining, rather than making one query per DagRun
+ dag_runs = DagRun.get_running_dag_runs_to_examine(session=session)
callback_tuples = self._schedule_all_dag_runs(guard, dag_runs,
session)
@@ -1274,11 +1275,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
return num_queued_tis
- @retry_db_transaction
- def _get_next_dagruns_to_examine(self, state: DagRunState, session:
Session) -> Query:
- """Get Next DagRuns to Examine with retries."""
- return DagRun.next_dagruns_to_examine(state, session)
-
@retry_db_transaction
def _create_dagruns_for_dags(self, guard: CommitProhibitorGuard, session:
Session) -> None:
"""Find Dag Models needing DagRuns and Create Dag Runs with retries in
case of OperationalError."""
@@ -1512,7 +1508,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
def _start_queued_dagruns(self, session: Session) -> None:
"""Find DagRuns in queued state and decide moving them to running
state."""
# added all() to save runtime, otherwise query is executed more than
once
- dag_runs: Collection[DagRun] =
self._get_next_dagruns_to_examine(DagRunState.QUEUED, session).all()
+ dag_runs: Collection[DagRun] =
DagRun.get_queued_dag_runs_to_set_running(session).all()
active_runs_of_dags = Counter(
DagRun.active_runs_of_dags((dr.dag_id for dr in dag_runs),
only_running=True, session=session),
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index c932958861..3ef1c18f15 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -67,6 +67,7 @@ from airflow.utils import timezone
from airflow.utils.dates import datetime_to_nano
from airflow.utils.helpers import chunks, is_container, prune_dict
from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.retries import retry_db_transaction
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime, nulls_first,
tuple_in_condition, with_row_locks
from airflow.utils.state import DagRunState, State, TaskInstanceState
@@ -388,12 +389,8 @@ class DagRun(Base, LoggingMixin):
return dict(iter(session.execute(query)))
@classmethod
- def next_dagruns_to_examine(
- cls,
- state: DagRunState,
- session: Session,
- max_number: int | None = None,
- ) -> Query:
+ @retry_db_transaction
+ def get_running_dag_runs_to_examine(cls, session: Session) -> Query:
"""
Return the next DagRuns that the scheduler should attempt to schedule.
@@ -401,42 +398,79 @@ class DagRun(Base, LoggingMixin):
query, you should ensure that any scheduling decisions are made in a
single transaction -- as soon as
the transaction is committed it will be unlocked.
+ :meta private:
"""
from airflow.models.dag import DagModel
- if max_number is None:
- max_number = cls.DEFAULT_DAGRUNS_TO_EXAMINE
-
- # TODO: Bake this query, it is run _A lot_
query = (
select(cls)
.with_hint(cls, "USE INDEX (idx_dag_run_running_dags)",
dialect_name="mysql")
- .where(cls.state == state, cls.run_type != DagRunType.BACKFILL_JOB)
+ .where(cls.state == DagRunState.RUNNING, cls.run_type !=
DagRunType.BACKFILL_JOB)
.join(DagModel, DagModel.dag_id == cls.dag_id)
.where(DagModel.is_paused == false(), DagModel.is_active == true())
+ .order_by(
+ nulls_first(cls.last_scheduling_decision, session=session),
+ cls.execution_date,
+ )
)
- if state == DagRunState.QUEUED:
- # For dag runs in the queued state, we check if they have reached
the max_active_runs limit
- # and if so we drop them
- running_drs = (
- select(DagRun.dag_id,
func.count(DagRun.state).label("num_running"))
- .where(DagRun.state == DagRunState.RUNNING)
- .group_by(DagRun.dag_id)
- .subquery()
+
+ if not settings.ALLOW_FUTURE_EXEC_DATES:
+ query = query.where(DagRun.execution_date <= func.now())
+
+ return session.scalars(
+ with_row_locks(
+ query.limit(cls.DEFAULT_DAGRUNS_TO_EXAMINE),
+ of=cls,
+ session=session,
+ skip_locked=True,
)
- query = query.outerjoin(running_drs, running_drs.c.dag_id ==
DagRun.dag_id).where(
- func.coalesce(running_drs.c.num_running, 0) <
DagModel.max_active_runs
+ )
+
+ @classmethod
+ @retry_db_transaction
+ def get_queued_dag_runs_to_set_running(cls, session: Session) -> Query:
+ """
+ Return the next queued DagRuns that the scheduler should attempt to
schedule.
+
+ This will return zero or more DagRun rows that are row-level-locked
with a "SELECT ... FOR UPDATE"
+ query, you should ensure that any scheduling decisions are made in a
single transaction -- as soon as
+ the transaction is committed it will be unlocked.
+
+ :meta private:
+ """
+ from airflow.models.dag import DagModel
+
+ # For dag runs in the queued state, we check if they have reached the
max_active_runs limit
+ # and if so we drop them
+ running_drs = (
+ select(DagRun.dag_id,
func.count(DagRun.state).label("num_running"))
+ .where(DagRun.state == DagRunState.RUNNING)
+ .group_by(DagRun.dag_id)
+ .subquery()
+ )
+ query = (
+ select(cls)
+ .where(cls.state == DagRunState.QUEUED, cls.run_type !=
DagRunType.BACKFILL_JOB)
+ .join(DagModel, DagModel.dag_id == cls.dag_id)
+ .where(DagModel.is_paused == false(), DagModel.is_active == true())
+ .outerjoin(running_drs, running_drs.c.dag_id == DagRun.dag_id)
+ .where(func.coalesce(running_drs.c.num_running, 0) <
DagModel.max_active_runs)
+ .order_by(
+ nulls_first(cls.last_scheduling_decision, session=session),
+ cls.execution_date,
)
- query = query.order_by(
- nulls_first(cls.last_scheduling_decision, session=session),
- cls.execution_date,
)
if not settings.ALLOW_FUTURE_EXEC_DATES:
query = query.where(DagRun.execution_date <= func.now())
return session.scalars(
- with_row_locks(query.limit(max_number), of=cls, session=session,
skip_locked=True)
+ with_row_locks(
+ query.limit(cls.DEFAULT_DAGRUNS_TO_EXAMINE),
+ of=cls,
+ session=session,
+ skip_locked=True,
+ )
)
@classmethod
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 9113a2dee1..4067f4fa17 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -6036,7 +6036,9 @@ class TestSchedulerJobQueriesCount:
self.job_runner.processor_agent = mock_agent
with assert_queries_count(expected_query_count, margin=15):
- with mock.patch.object(DagRun, "next_dagruns_to_examine") as
mock_dagruns:
+ with mock.patch.object(
+ DagRun, DagRun.get_running_dag_runs_to_examine.__name__
+ ) as mock_dagruns:
query = MagicMock()
query.all.return_value = dagruns
mock_dagruns.return_value = query
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index f72f3b2b79..d2f70ce693 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -931,14 +931,18 @@ class TestDagRun:
**triggered_by_kwargs,
)
- runs = DagRun.next_dagruns_to_examine(state, session).all()
+ if state == DagRunState.RUNNING:
+ func = DagRun.get_running_dag_runs_to_examine
+ else:
+ func = DagRun.get_queued_dag_runs_to_set_running
+ runs = func(session).all()
assert runs == [dr]
orm_dag.is_paused = True
session.flush()
- runs = DagRun.next_dagruns_to_examine(state, session).all()
+ runs = func(session).all()
assert runs == []
@mock.patch.object(Stats, "timing")