This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch v2-4-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit c642e1a8373173355013700aa614c87202b2e47f Author: Jed Cunningham <[email protected]> AuthorDate: Mon Sep 12 01:05:28 2022 -0700 Fix `dags_needing_dagruns` dataset info timestamp (#26288) (cherry picked from commit 3c9c0f940b67c25285259541478ebb413b94a73a) --- airflow/models/dag.py | 8 +++----- tests/models/test_dag.py | 48 +++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 50 insertions(+), 6 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index e761352c8c..be400f9427 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -3238,15 +3238,13 @@ class DagModel(Base): you should ensure that any scheduling decisions are made in a single transaction -- as soon as the transaction is committed it will be unlocked. """ - from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue as DDRQ - # these dag ids are triggered by datasets, and they are ready to go. dataset_triggered_dag_info_list = { - x.dag_id: (x.first_event_time, x.last_event_time) + x.dag_id: (x.first_queued_time, x.last_queued_time) for x in session.query( DagScheduleDatasetReference.dag_id, - func.max(DDRQ.created_at).label('last_event_time'), - func.max(DDRQ.created_at).label('first_event_time'), + func.max(DDRQ.created_at).label('last_queued_time'), + func.min(DDRQ.created_at).label('first_queued_time'), ) .join(DagScheduleDatasetReference.queue_records, isouter=True) .group_by(DagScheduleDatasetReference.dag_id) diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index f0f53db8fa..0070c54c38 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -45,7 +45,7 @@ from airflow.exceptions import AirflowException, DuplicateTaskIdFound, ParamVali from airflow.models import DAG, DagModel, DagRun, DagTag, TaskFail, TaskInstance as TI from airflow.models.baseoperator import BaseOperator from airflow.models.dag import DagOwnerAttributes, dag as dag_decorator, get_dataset_triggered_next_run_info -from airflow.models.dataset import DatasetDagRunQueue, DatasetModel, TaskOutletDatasetReference +from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel, TaskOutletDatasetReference from airflow.models.param import DagParam, Param, ParamsDict from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator @@ -2220,6 +2220,52 @@ class TestDagModel: assert dag.relative_fileloc == expected_relative + @pytest.mark.need_serialized_dag + def test_dags_needing_dagruns_dataset_triggered_dag_info_queued_times(self, session, dag_maker): + dataset1 = Dataset(uri="ds1") + dataset2 = Dataset(uri="ds2") + + for dag_id, dataset in [("datasets-1", dataset1), ("datasets-2", dataset2)]: + with dag_maker(dag_id=dag_id, start_date=timezone.utcnow(), session=session): + EmptyOperator(task_id="task", outlets=[dataset]) + dr = dag_maker.create_dagrun() + + ds_id = session.query(DatasetModel.id).filter_by(uri=dataset.uri).scalar() + + session.add( + DatasetEvent( + dataset_id=ds_id, + source_task_id="task", + source_dag_id=dr.dag_id, + source_run_id=dr.run_id, + source_map_index=-1, + ) + ) + + ds1_id = session.query(DatasetModel.id).filter_by(uri=dataset1.uri).scalar() + ds2_id = session.query(DatasetModel.id).filter_by(uri=dataset2.uri).scalar() + + with dag_maker(dag_id="datasets-consumer-multiple", schedule=[dataset1, dataset2]) as dag: + pass + + session.flush() + session.add_all( + [ + DatasetDagRunQueue(dataset_id=ds1_id, target_dag_id=dag.dag_id, created_at=DEFAULT_DATE), + DatasetDagRunQueue( + dataset_id=ds2_id, target_dag_id=dag.dag_id, created_at=DEFAULT_DATE + timedelta(hours=1) + ), + ] + ) + session.flush() + + query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session) + assert 1 == len(dataset_triggered_dag_info) + assert dag.dag_id in dataset_triggered_dag_info + first_queued_time, last_queued_time = dataset_triggered_dag_info[dag.dag_id] + assert first_queued_time == DEFAULT_DATE + assert last_queued_time == DEFAULT_DATE + timedelta(hours=1) + class TestQueries: def setup_method(self) -> None:
