jedcunningham commented on code in PR #25080:
URL: https://github.com/apache/airflow/pull/25080#discussion_r923448182


##########
tests/api_connexion/endpoints/test_dag_run_endpoint.py:
##########
@@ -1483,3 +1488,180 @@ def test_should_respond_404(self):
             environ_overrides={"REMOTE_USER": "test"},
         )
         assert response.status_code == 404
+
+
def test__get_upstream_dataset_events_no_prior(configured_app):
    """With no earlier dag runs present, every dataset event should be returned."""
    from airflow.api_connexion.endpoints.dag_run_endpoint import _get_upstream_dataset_events

    # Build two datasets and a dag that is scheduled on both of them.
    uid = str(uuid4())
    session = settings.Session()
    ds_a = Dataset(uri=f"s3://{uid}-1a")
    ds_b = Dataset(uri=f"s3://{uid}-1b")
    consumer_dag = DAG(dag_id=f"datasets-{uid}-2", schedule_on=[ds_a, ds_b])
    DAG.bulk_write_to_db(dags=[consumer_dag], session=session)
    session.add_all([ds_a, ds_b])
    session.commit()

    # Record five events: three on dataset A, two on dataset B.
    session.add_all([DatasetEvent(dataset_id=ds.id) for ds in (ds_a, ds_b, ds_a, ds_b, ds_a)])
    session.commit()

    # A lone dataset-triggered run — there is no predecessor to bound the window.
    run = DagRun(consumer_dag.dag_id, run_id=uid, run_type=DagRunType.DATASET_TRIGGERED)
    run.dag = consumer_dag
    session.add(run)
    session.commit()
    session.expunge_all()

    # Without a prior run, all five events count as upstream of this run.
    events = _get_upstream_dataset_events(dag_run=run, session=session)
    assert len(events) == 5
+
+
def test__get_upstream_dataset_events_with_prior(configured_app):
    """
    Events returned should be those that occurred after the last DATASET_TRIGGERED
    dag run and up to the execution date of the current dag run.

    Fix: the original reused one variable ``dr`` for all three DagRuns, so the
    final ``_get_upstream_dataset_events`` call received the MANUAL run that the
    test explicitly marks "should be ignored" instead of the second
    DATASET_TRIGGERED run the assertion is written for. Each run now has its own
    name and the intended run is queried.
    """
    from airflow.api_connexion.endpoints.dag_run_endpoint import _get_upstream_dataset_events

    # setup dags and datasets
    unique_id = str(uuid4())
    session = settings.Session()
    dataset1a = Dataset(uri=f"s3://{unique_id}-1a")
    dataset1b = Dataset(uri=f"s3://{unique_id}-1b")
    dag2 = DAG(dag_id=f"datasets-{unique_id}-2", schedule_on=[dataset1a, dataset1b])
    DAG.bulk_write_to_db(dags=[dag2], session=session)
    session.add_all([dataset1a, dataset1b])
    session.commit()

    # add 2 events, then a dag run, then 3 events, then another dag run, then another event
    first_timestamp = pendulum.now('UTC')
    session.add_all(
        [
            DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp),
            DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp),
        ]
    )
    prior_run = DagRun(
        dag2.dag_id,
        run_id=unique_id + '-1',
        run_type=DagRunType.DATASET_TRIGGERED,
        execution_date=first_timestamp.add(microseconds=1000),
    )
    prior_run.dag = dag2
    session.add(prior_run)
    session.add_all(
        [
            DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp.add(microseconds=2000)),
            DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp.add(microseconds=3000)),
            DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp.add(microseconds=4000)),
        ]
    )
    current_run = DagRun(
        dag2.dag_id,
        run_id=unique_id + '-2',
        run_type=DagRunType.DATASET_TRIGGERED,
        execution_date=first_timestamp.add(microseconds=4000),  # exact same time as 3rd event in window
    )
    current_run.dag = dag2
    session.add(current_run)
    manual_run = DagRun(  # this dag run should be ignored — only DATASET_TRIGGERED runs bound the window
        dag2.dag_id,
        run_id=unique_id + '-3',
        run_type=DagRunType.MANUAL,
        execution_date=first_timestamp.add(microseconds=3000),
    )
    manual_run.dag = dag2
    session.add(manual_run)
    session.add_all(
        [DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp.add(microseconds=5000))]
    )
    session.commit()
    session.expunge_all()

    # check result — query against the second DATASET_TRIGGERED run, not the MANUAL one
    events = _get_upstream_dataset_events(dag_run=current_run, session=session)

    event_times = [x.created_at for x in events]
    assert event_times == [
        first_timestamp.add(microseconds=2000),
        first_timestamp.add(microseconds=3000),
        first_timestamp.add(microseconds=4000),
    ]
+
+
class TestGetDagRunDatasetTriggerEvents(TestDagRunEndpoint):
    @mock.patch('airflow.api_connexion.endpoints.dag_run_endpoint._get_upstream_dataset_events')
    def test_should_respond_200(self, mock_get_events, session):
        """Endpoint serializes exactly what ``_get_upstream_dataset_events`` returns."""
        run = DagRun(
            dag_id="TEST_DAG_ID",
            run_id="TEST_DAG_RUN_ID",
            run_type=DagRunType.DATASET_TRIGGERED,
            execution_date=timezone.parse(self.default_time),
            start_date=timezone.parse(self.default_time),
            external_trigger=True,
            state=DagRunState.RUNNING,
        )
        session.add(run)
        session.commit()
        assert len(session.query(DagRun).all()) == 1

        # Stub the helper; whatever it yields must come back verbatim in the payload.
        event_time = pendulum.now('UTC')
        mock_get_events.return_value = [DatasetEvent(dataset_id=1, created_at=event_time)]

        response = self.client.get(
            "api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstream-dataset-events",
            environ_overrides={'REMOTE_USER': "test"},
        )

        assert response.status_code == 200
        assert response.json == {
            'dataset_events': [
                {
                    'created_at': str(event_time),
                    'dataset_id': 1,
                    'extra': None,
                    'id': None,
                    'source_dag_id': None,
                    'source_map_index': None,
                    'source_run_id': None,
                    'source_task_id': None,
                }
            ],
            'total_entries': 1,
        }

Review Comment:
   That's fair, I overlooked the fact that id, run_id, etc were all None.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to