uranusjr commented on code in PR #39603:
URL: https://github.com/apache/airflow/pull/39603#discussion_r1607251286


##########
tests/jobs/test_scheduler_job.py:
##########
@@ -3869,13 +3870,118 @@ def dict_from_obj(obj):
         assert created_run.data_interval_start == DEFAULT_DATE + timedelta(days=5)
         assert created_run.data_interval_end == DEFAULT_DATE + timedelta(days=11)
         # dag2 DDRQ record should still be there since the dag run was *not* triggered
-        assert session.query(DatasetDagRunQueue).filter_by(target_dag_id=dag2.dag_id).one() is not None
+        assert session.query(DatasetDagRunQueue).filter_by(target_dag_id=dag3.dag_id).one() is not None
         # dag2 should not be triggered since it depends on both dataset 1 and 2
-        assert session.query(DagRun).filter(DagRun.dag_id == dag2.dag_id).one_or_none() is None
+        assert session.query(DagRun).filter(DagRun.dag_id == dag3.dag_id).one_or_none() is None
         # dag3 DDRQ record should be deleted since the dag run was triggered
-        assert session.query(DatasetDagRunQueue).filter_by(target_dag_id=dag3.dag_id).one_or_none() is None
+        assert session.query(DatasetDagRunQueue).filter_by(target_dag_id=dag2.dag_id).one_or_none() is None
+
+        assert dag2.get_last_dagrun().creating_job_id == scheduler_job.id
+
+    @pytest.mark.need_serialized_dag
+    def test_new_dagrun_ignores_old_dataset_events(self, session, dag_maker):
+        """
+        Test various invariants of _create_dag_runs.
 
-        assert dag3.get_last_dagrun().creating_job_id == scheduler_job.id
+        - That the new DAG does not get dataset events whose timestamp is before the DAG creation date.
+        - That the run created is in the QUEUED state
+        - That dag_model has next_dagrun
+        """
+
+        dataset = Dataset(uri="ds")
+
+        with dag_maker(dag_id="datasets-1", start_date=timezone.utcnow(), session=session):
+            BashOperator(task_id="task", bash_command="echo 1", outlets=[dataset])
+        dr = dag_maker.create_dagrun(
+            run_id="run1",
+            execution_date=(DEFAULT_DATE + timedelta(days=100)),
+            data_interval=(DEFAULT_DATE + timedelta(days=10), DEFAULT_DATE + timedelta(days=11)),
+        )
+
+        ds_id = session.query(DatasetModel.id).filter_by(uri=dataset.uri).scalar()
+
+        event1 = DatasetEvent(
+            dataset_id=ds_id,
+            source_task_id="task",
+            source_dag_id=dr.dag_id,
+            source_run_id=dr.run_id,
+            source_map_index=-1,
+        )
+        session.add(event1)
+
+        # Create a second event whose creation time is more recent but whose data interval is older
+        dr = dag_maker.create_dagrun(
+            run_id="run2",
+            execution_date=(DEFAULT_DATE + timedelta(days=101)),
+            data_interval=(DEFAULT_DATE + timedelta(days=5), DEFAULT_DATE + timedelta(days=6)),
+        )
+
+        event2 = DatasetEvent(
+            dataset_id=ds_id,
+            source_task_id="task",
+            source_dag_id=dr.dag_id,
+            source_run_id=dr.run_id,
+            source_map_index=-1,
+        )
+        session.add(event2)
+
+        # Create DAG after dataset events.
+        with dag_maker(dag_id="datasets-consumer", schedule=[dataset]):
+            pass
+        dag = dag_maker.dag
+
+        with dag_maker(dag_id="datasets-1-new", start_date=timezone.utcnow(), session=session):
+            BashOperator(task_id="task", bash_command="echo 1", outlets=[dataset])
+        dr = dag_maker.create_dagrun(
+            run_id="run3",
+            execution_date=(DEFAULT_DATE + timedelta(days=101)),
+            data_interval=(DEFAULT_DATE + timedelta(days=5), DEFAULT_DATE + timedelta(days=6)),
+        )
+
+        event3 = DatasetEvent(
+            dataset_id=ds_id,
+            source_task_id="task",
+            source_dag_id=dr.dag_id,
+            source_run_id=dr.run_id,
+            source_map_index=-1,
+            timestamp=timezone.utcnow(),
+        )
+        session.add(event3)
+
+        session = dag_maker.session
+        session.add_all(
+            [
+                DatasetDagRunQueue(dataset_id=ds_id, target_dag_id=dag.dag_id),
+            ]
+        )
+        session.flush()
+
+        scheduler_job = Job(executor=self.null_exec)
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+        self.job_runner.processor_agent = mock.MagicMock()
+
+        with create_session() as session:
+            self.job_runner._create_dagruns_for_dags(session, session)
+
+        def dict_from_obj(obj):
+            """Get dict of column attrs from SqlAlchemy object."""
+            return {k.key: obj.__dict__.get(k) for k in obj.__mapper__.column_attrs}

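For context, the invariant named in the new test's docstring is that a dataset-triggered run for a newly created consumer DAG should only consume dataset events created at or after the DAG itself. A minimal sketch of that filter, assuming a SQLAlchemy `session` and Airflow's `DatasetEvent` model are available; `relevant_events` and `dag_created_at` are illustrative names, not the scheduler's actual code:

```python
from sqlalchemy import select

from airflow.models.dataset import DatasetEvent


def relevant_events(session, dataset_id, dag_created_at):
    # Illustrative only: events with a timestamp before the consumer DAG's
    # creation date are ignored when queuing its first dataset-triggered run.
    return session.scalars(
        select(DatasetEvent)
        .where(DatasetEvent.dataset_id == dataset_id)
        .where(DatasetEvent.timestamp >= dag_created_at)
    ).all()
```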
Review Comment:
   ```suggestion
               return {k.key: getattr(obj, k.key) for k in obj.__mapper__.column_attrs}
   ```
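The original `obj.__dict__.get(k)` has two problems: `k` is a `ColumnProperty`, while `__dict__` is keyed by attribute-name strings, so the lookup always returns `None`; and `__dict__` also misses expired or deferred columns, which `getattr` re-loads through SQLAlchemy's instrumented attributes. A minimal sketch of the difference, using a hypothetical `Widget` model rather than anything from the PR:

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Widget(Base):
    __tablename__ = "widget"
    id = Column(Integer, primary_key=True)
    name = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Widget(id=1, name="w"))
    session.commit()

    obj = session.get(Widget, 1)
    session.expire(obj)  # simulate an expired instance, e.g. after a commit

    # ColumnProperty objects are never __dict__ keys, and expired values are
    # not in __dict__ at all, so every value comes back as None.
    broken = {k.key: obj.__dict__.get(k) for k in obj.__mapper__.column_attrs}

    # getattr goes through the instrumented attribute, which re-loads
    # expired/deferred columns from the database on access.
    fixed = {k.key: getattr(obj, k.key) for k in obj.__mapper__.column_attrs}

    print(broken)  # {'id': None, 'name': None}
    print(fixed)   # {'id': 1, 'name': 'w'}
```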


