This is an automated email from the ASF dual-hosted git repository. uranusjr pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new d81fe093c2 Rename variables for dag runs (#34049) d81fe093c2 is described below commit d81fe093c266ef63d4e3f0189eb8c867bff865f4 Author: Daniel Dyląg <bi...@users.noreply.github.com> AuthorDate: Tue Sep 5 09:21:46 2023 +0200 Rename variables for dag runs (#34049) Co-authored-by: daniel.dylag <danieldylag1...@gmail.com> --- airflow/models/dag.py | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index a45e430d1f..6be920f432 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2939,12 +2939,12 @@ class DAG(LoggingMixin): session.add(orm_dag) orm_dags.append(orm_dag) - most_recent_runs: dict[str, DagRun] = {} + dag_id_to_last_automated_run: dict[str, DagRun] = {} num_active_runs: dict[str, int] = {} # Skip these queries entirely if no DAGs can be scheduled to save time. if any(dag.timetable.can_be_scheduled for dag in dags): - # Get the latest dag run for each existing dag as a single query (avoid n+1 query) - most_recent_subq = ( + # Get the latest automated dag run for each existing dag as a single query (avoid n+1 query) + last_automated_runs_subq = ( select(DagRun.dag_id, func.max(DagRun.execution_date).label("max_execution_date")) .where( DagRun.dag_id.in_(existing_dags), @@ -2953,13 +2953,13 @@ class DAG(LoggingMixin): .group_by(DagRun.dag_id) .subquery() ) - most_recent_runs_iter = session.scalars( + last_automated_runs = session.scalars( select(DagRun).where( - DagRun.dag_id == most_recent_subq.c.dag_id, - DagRun.execution_date == most_recent_subq.c.max_execution_date, + DagRun.dag_id == last_automated_runs_subq.c.dag_id, + DagRun.execution_date == last_automated_runs_subq.c.max_execution_date, ) ) - most_recent_runs = {run.dag_id: run for run in most_recent_runs_iter} + dag_id_to_last_automated_run = {run.dag_id: run for run in last_automated_runs} # Get number of active 
dagruns for all dags we are processing as a single query. num_active_runs = DagRun.active_runs_of_dags(dag_ids=existing_dags, session=session) @@ -2993,15 +2993,15 @@ class DAG(LoggingMixin): orm_dag.timetable_description = dag.timetable.description orm_dag.processor_subdir = processor_subdir - run: DagRun | None = most_recent_runs.get(dag.dag_id) - if run is None: - data_interval = None + last_automated_run: DagRun | None = dag_id_to_last_automated_run.get(dag.dag_id) + if last_automated_run is None: + last_automated_data_interval = None else: - data_interval = dag.get_run_data_interval(run) + last_automated_data_interval = dag.get_run_data_interval(last_automated_run) if num_active_runs.get(dag.dag_id, 0) >= orm_dag.max_active_runs: orm_dag.next_dagrun_create_after = None else: - orm_dag.calculate_dagrun_date_fields(dag, data_interval) + orm_dag.calculate_dagrun_date_fields(dag, last_automated_data_interval) dag_tags = set(dag.tags or {}) orm_dag_tags = list(orm_dag.tags or []) @@ -3667,27 +3667,27 @@ class DagModel(Base): def calculate_dagrun_date_fields( self, dag: DAG, - most_recent_dag_run: None | datetime | DataInterval, + last_automated_dag_run: None | datetime | DataInterval, ) -> None: """ Calculate ``next_dagrun`` and `next_dagrun_create_after``. :param dag: The DAG object - :param most_recent_dag_run: DataInterval (or datetime) of most recent run of this dag, or none + :param last_automated_dag_run: DataInterval (or datetime) of most recent run of this dag, or none if not yet scheduled. """ - most_recent_data_interval: DataInterval | None - if isinstance(most_recent_dag_run, datetime): + last_automated_data_interval: DataInterval | None + if isinstance(last_automated_dag_run, datetime): warnings.warn( "Passing a datetime to `DagModel.calculate_dagrun_date_fields` is deprecated. 
" "Provide a data interval instead.", RemovedInAirflow3Warning, stacklevel=2, ) - most_recent_data_interval = dag.infer_automated_data_interval(most_recent_dag_run) + last_automated_data_interval = dag.infer_automated_data_interval(last_automated_dag_run) else: - most_recent_data_interval = most_recent_dag_run - next_dagrun_info = dag.next_dagrun_info(most_recent_data_interval) + last_automated_data_interval = last_automated_dag_run + next_dagrun_info = dag.next_dagrun_info(last_automated_data_interval) if next_dagrun_info is None: self.next_dagrun_data_interval = self.next_dagrun = self.next_dagrun_create_after = None else: