This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ba20d06b5e Clean up typing with max_execution_date query builder (#36958)
ba20d06b5e is described below
commit ba20d06b5e8ae47cf684306c55dbef14c4e791f1
Author: Daniel Standish <[email protected]>
AuthorDate: Mon Jan 22 13:41:45 2024 -0800
Clean up typing with max_execution_date query builder (#36958)
---
airflow/models/dag.py | 11 +++++------
tests/models/test_dag.py | 4 ++--
2 files changed, 7 insertions(+), 8 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index dac7be010a..92e1a8945f 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -3049,8 +3049,8 @@ class DAG(LoggingMixin):
)
query = with_row_locks(query, of=DagModel, session=session)
orm_dags: list[DagModel] = session.scalars(query).unique().all()
- existing_dags = {orm_dag.dag_id: orm_dag for orm_dag in orm_dags}
- missing_dag_ids = dag_ids.difference(existing_dags)
+ existing_dags: dict[str, DagModel] = {x.dag_id: x for x in orm_dags}
+ missing_dag_ids = dag_ids.difference(existing_dags.keys())
for missing_dag_id in missing_dag_ids:
orm_dag = DagModel(dag_id=missing_dag_id)
@@ -3067,7 +3067,7 @@ class DAG(LoggingMixin):
# Skip these queries entirely if no DAGs can be scheduled to save time.
if any(dag.timetable.can_be_scheduled for dag in dags):
# Get the latest automated dag run for each existing dag as a single query (avoid n+1 query)
- query = cls._get_latest_runs_query(existing_dags, session)
+ query = cls._get_latest_runs_query(dags=list(existing_dags.keys()))
latest_runs = {run.dag_id: run for run in session.scalars(query)}
# Get number of active dagruns for all dags we are processing as a single query.
@@ -3240,16 +3240,15 @@ class DAG(LoggingMixin):
cls.bulk_write_to_db(dag.subdags, processor_subdir=processor_subdir, session=session)
@classmethod
- def _get_latest_runs_query(cls, dags, session) -> Query:
+ def _get_latest_runs_query(cls, dags: list[str]) -> Query:
"""
Query the database to retrieve the last automated run for each dag.
:param dags: dags to query
- :param session: sqlalchemy session object
"""
if len(dags) == 1:
# Index optimized fast path to avoid more complicated & slower groupby queryplan
- existing_dag_id = list(dags)[0].dag_id
+ existing_dag_id = dags[0]
last_automated_runs_subq = (
select(func.max(DagRun.execution_date).label("max_execution_date"))
.where(
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 7c337ed965..1f70ba051a 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -4140,7 +4140,7 @@ class TestTaskClearingSetupTeardownBehavior:
def test_get_latest_runs_query_one_dag(dag_maker, session):
with dag_maker(dag_id="dag1") as dag1:
...
- query = DAG._get_latest_runs_query(dags=[dag1], session=session)
+ query = DAG._get_latest_runs_query(dags=[dag1.dag_id])
actual = [x.strip() for x in str(query.compile()).splitlines()]
expected = [
"SELECT dag_run.id, dag_run.dag_id, dag_run.execution_date, dag_run.data_interval_start, dag_run.data_interval_end",
@@ -4157,7 +4157,7 @@ def test_get_latest_runs_query_two_dags(dag_maker, session):
...
with dag_maker(dag_id="dag2") as dag2:
...
- query = DAG._get_latest_runs_query(dags=[dag1, dag2], session=session)
+ query = DAG._get_latest_runs_query(dags=[dag1.dag_id, dag2.dag_id])
actual = [x.strip() for x in str(query.compile()).splitlines()]
print("\n".join(actual))
expected = [