This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d81fe093c2 Rename variables for dag runs (#34049)
d81fe093c2 is described below
commit d81fe093c266ef63d4e3f0189eb8c867bff865f4
Author: Daniel Dyląg <[email protected]>
AuthorDate: Tue Sep 5 09:21:46 2023 +0200
Rename variables for dag runs (#34049)
Co-authored-by: daniel.dylag <[email protected]>
---
airflow/models/dag.py | 38 +++++++++++++++++++-------------------
1 file changed, 19 insertions(+), 19 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index a45e430d1f..6be920f432 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2939,12 +2939,12 @@ class DAG(LoggingMixin):
session.add(orm_dag)
orm_dags.append(orm_dag)
- most_recent_runs: dict[str, DagRun] = {}
+ dag_id_to_last_automated_run: dict[str, DagRun] = {}
num_active_runs: dict[str, int] = {}
# Skip these queries entirely if no DAGs can be scheduled to save time.
if any(dag.timetable.can_be_scheduled for dag in dags):
- # Get the latest dag run for each existing dag as a single query
(avoid n+1 query)
- most_recent_subq = (
+ # Get the latest automated dag run for each existing dag as a
single query (avoid n+1 query)
+ last_automated_runs_subq = (
select(DagRun.dag_id,
func.max(DagRun.execution_date).label("max_execution_date"))
.where(
DagRun.dag_id.in_(existing_dags),
@@ -2953,13 +2953,13 @@ class DAG(LoggingMixin):
.group_by(DagRun.dag_id)
.subquery()
)
- most_recent_runs_iter = session.scalars(
+ last_automated_runs = session.scalars(
select(DagRun).where(
- DagRun.dag_id == most_recent_subq.c.dag_id,
- DagRun.execution_date ==
most_recent_subq.c.max_execution_date,
+ DagRun.dag_id == last_automated_runs_subq.c.dag_id,
+ DagRun.execution_date ==
last_automated_runs_subq.c.max_execution_date,
)
)
- most_recent_runs = {run.dag_id: run for run in
most_recent_runs_iter}
+ dag_id_to_last_automated_run = {run.dag_id: run for run in
last_automated_runs}
# Get number of active dagruns for all dags we are processing as a
single query.
num_active_runs =
DagRun.active_runs_of_dags(dag_ids=existing_dags, session=session)
@@ -2993,15 +2993,15 @@ class DAG(LoggingMixin):
orm_dag.timetable_description = dag.timetable.description
orm_dag.processor_subdir = processor_subdir
- run: DagRun | None = most_recent_runs.get(dag.dag_id)
- if run is None:
- data_interval = None
+ last_automated_run: DagRun | None =
dag_id_to_last_automated_run.get(dag.dag_id)
+ if last_automated_run is None:
+ last_automated_data_interval = None
else:
- data_interval = dag.get_run_data_interval(run)
+ last_automated_data_interval =
dag.get_run_data_interval(last_automated_run)
if num_active_runs.get(dag.dag_id, 0) >= orm_dag.max_active_runs:
orm_dag.next_dagrun_create_after = None
else:
- orm_dag.calculate_dagrun_date_fields(dag, data_interval)
+ orm_dag.calculate_dagrun_date_fields(dag,
last_automated_data_interval)
dag_tags = set(dag.tags or {})
orm_dag_tags = list(orm_dag.tags or [])
@@ -3667,27 +3667,27 @@ class DagModel(Base):
def calculate_dagrun_date_fields(
self,
dag: DAG,
- most_recent_dag_run: None | datetime | DataInterval,
+ last_automated_dag_run: None | datetime | DataInterval,
) -> None:
"""
Calculate ``next_dagrun`` and `next_dagrun_create_after``.
:param dag: The DAG object
- :param most_recent_dag_run: DataInterval (or datetime) of most recent
run of this dag, or none
+ :param last_automated_dag_run: DataInterval (or datetime) of most
recent run of this dag, or none
if not yet scheduled.
"""
- most_recent_data_interval: DataInterval | None
- if isinstance(most_recent_dag_run, datetime):
+ last_automated_data_interval: DataInterval | None
+ if isinstance(last_automated_dag_run, datetime):
warnings.warn(
"Passing a datetime to `DagModel.calculate_dagrun_date_fields`
is deprecated. "
"Provide a data interval instead.",
RemovedInAirflow3Warning,
stacklevel=2,
)
- most_recent_data_interval =
dag.infer_automated_data_interval(most_recent_dag_run)
+ last_automated_data_interval =
dag.infer_automated_data_interval(last_automated_dag_run)
else:
- most_recent_data_interval = most_recent_dag_run
- next_dagrun_info = dag.next_dagrun_info(most_recent_data_interval)
+ last_automated_data_interval = last_automated_dag_run
+ next_dagrun_info = dag.next_dagrun_info(last_automated_data_interval)
if next_dagrun_info is None:
self.next_dagrun_data_interval = self.next_dagrun =
self.next_dagrun_create_after = None
else: