tosheer commented on code in PR #39603:
URL: https://github.com/apache/airflow/pull/39603#discussion_r1634613379
##########
tests/jobs/test_scheduler_job.py:
##########
@@ -3869,13 +3870,112 @@ def dict_from_obj(obj):
assert created_run.data_interval_start == DEFAULT_DATE +
timedelta(days=5)
assert created_run.data_interval_end == DEFAULT_DATE +
timedelta(days=11)
# dag2 DDRQ record should still be there since the dag run was *not*
triggered
- assert
session.query(DatasetDagRunQueue).filter_by(target_dag_id=dag2.dag_id).one() is
not None
+ assert
session.query(DatasetDagRunQueue).filter_by(target_dag_id=dag3.dag_id).one() is
not None
# dag2 should not be triggered since it depends on both dataset 1 and
2
- assert session.query(DagRun).filter(DagRun.dag_id ==
dag2.dag_id).one_or_none() is None
+ assert session.query(DagRun).filter(DagRun.dag_id ==
dag3.dag_id).one_or_none() is None
# dag3 DDRQ record should be deleted since the dag run was triggered
- assert
session.query(DatasetDagRunQueue).filter_by(target_dag_id=dag3.dag_id).one_or_none()
is None
+ assert
session.query(DatasetDagRunQueue).filter_by(target_dag_id=dag2.dag_id).one_or_none()
is None
+
+ assert dag2.get_last_dagrun().creating_job_id == scheduler_job.id
+
+ @pytest.mark.need_serialized_dag
+ def test_new_dagrun_ignores_old_dataset_events(self, session, dag_maker):
+ """
+ Test various invariants of _create_dag_runs.
+
+        - That the new DAG should not get dataset events whose timestamp is
before the DAG creation date.
+        - That the run created is in the QUEUED state
+ - That dag_model has next_dagrun
+ """
+
+ dataset = Dataset(uri="ds")
+
+ with dag_maker(dag_id="datasets-1", start_date=timezone.utcnow(),
session=session):
+ BashOperator(task_id="task", bash_command="echo 1",
outlets=[dataset])
+ dr = dag_maker.create_dagrun(
+ run_id="run1",
+ execution_date=(DEFAULT_DATE + timedelta(days=100)),
+ data_interval=(DEFAULT_DATE + timedelta(days=10), DEFAULT_DATE +
timedelta(days=11)),
+ )
+
+ ds_id =
session.query(DatasetModel.id).filter_by(uri=dataset.uri).scalar()
+
+ event1 = DatasetEvent(
+ dataset_id=ds_id,
+ source_task_id="task",
+ source_dag_id=dr.dag_id,
+ source_run_id=dr.run_id,
+ source_map_index=-1,
+ )
+ session.add(event1)
+
+        # Create a second event whose creation time is more recent but whose
data interval is older
+ dr = dag_maker.create_dagrun(
+ run_id="run2",
+ execution_date=(DEFAULT_DATE + timedelta(days=101)),
+ data_interval=(DEFAULT_DATE + timedelta(days=5), DEFAULT_DATE +
timedelta(days=6)),
+ )
+
+ event2 = DatasetEvent(
+ dataset_id=ds_id,
+ source_task_id="task",
+ source_dag_id=dr.dag_id,
+ source_run_id=dr.run_id,
+ source_map_index=-1,
+ )
+ session.add(event2)
+
+ # Create DAG after dataset events.
+ with dag_maker(dag_id="datasets-consumer", schedule=[dataset]):
+ pass
+ dag = dag_maker.dag
Review Comment:
That makes sense. Update is done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]