jedcunningham commented on code in PR #25080:
URL: https://github.com/apache/airflow/pull/25080#discussion_r922445052
##########
airflow/api_connexion/endpoints/dag_run_endpoint.py:
##########
@@ -86,6 +91,68 @@ def get_dag_run(*, dag_id: str, dag_run_id: str, session: Session = NEW_SESSION)
    return dagrun_schema.dump(dag_run)
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET),
+    ],
+)
+@provide_session
+def get_upstream_dataset_events(
+    *, dag_id: str, dag_run_id: str, session: Session = NEW_SESSION
+) -> APIResponse:
+ """Get a DAG Run."""
+    dag_run: Optional[DagRun] = (
+        session.query(DagRun)
+        .filter(
+            DagRun.dag_id == dag_id,
+            DagRun.run_id == dag_run_id,
+        )
+        .one_or_none()
+    )
+    if dag_run is None:
+        raise NotFound(
+            "DAGRun not found",
+            detail=f"DAGRun with DAG ID: '{dag_id}' and DagRun ID: '{dag_run_id}' not found",
+        )
+    events = _get_upstream_dataset_events(dag_run=dag_run, session=session)
+    return dataset_event_collection_schema.dump(
+        DatasetEventCollection(dataset_events=events, total_entries=len(events))
+    )
+
+
+def _get_upstream_dataset_events(*, dag_run: DagRun, session: Session = NEW_SESSION) -> List["DagRun"]:
Review Comment:
```suggestion
def _get_upstream_dataset_events(*, dag_run: DagRun, session: Session) -> List["DagRun"]:
```
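
For context on the suggestion: `NEW_SESSION` is only a sentinel default that the `@provide_session` decorator swaps for a real session, so an undecorated private helper that always receives the endpoint's session gains nothing from it. A minimal sketch of the convention, using illustrative names (`endpoint`, `_helper`) rather than the PR's actual code:

```python
from sqlalchemy.orm import Session

from airflow.utils.session import NEW_SESSION, provide_session


@provide_session
def endpoint(*, session: Session = NEW_SESSION):
    # @provide_session replaces the NEW_SESSION sentinel with a live session
    return _helper(session=session)


def _helper(*, session: Session):
    # undecorated helper: callers must hand over a session explicitly
    return session
```

Dropping the default also means a forgotten `session=` argument fails immediately at the call site instead of silently carrying the sentinel around.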
##########
tests/api_connexion/endpoints/test_dag_run_endpoint.py:
##########
@@ -1483,3 +1488,180 @@ def test_should_respond_404(self):
            environ_overrides={"REMOTE_USER": "test"},
        )
        assert response.status_code == 404
+
+
+def test__get_upstream_dataset_events_no_prior(configured_app):
+ """If no prior dag runs, return all events"""
+ from airflow.api_connexion.endpoints.dag_run_endpoint import
_get_upstream_dataset_events
+
+ # setup dags and datasets
+ unique_id = str(uuid4())
+ session = settings.Session()
+ dataset1a = Dataset(uri=f"s3://{unique_id}-1a")
+ dataset1b = Dataset(uri=f"s3://{unique_id}-1b")
+ dag2 = DAG(dag_id=f"datasets-{unique_id}-2", schedule_on=[dataset1a,
dataset1b])
+ DAG.bulk_write_to_db(dags=[dag2], session=session)
+ session.add_all([dataset1a, dataset1b])
+ session.commit()
+
+ # add 5 events
+ session.add_all([DatasetEvent(dataset_id=dataset1a.id),
DatasetEvent(dataset_id=dataset1b.id)])
+ session.add_all([DatasetEvent(dataset_id=dataset1a.id),
DatasetEvent(dataset_id=dataset1b.id)])
+ session.add_all([DatasetEvent(dataset_id=dataset1a.id)])
+ session.commit()
+
+ # create a single dag run, no prior dag runs
+ dr = DagRun(dag2.dag_id, run_id=unique_id,
run_type=DagRunType.DATASET_TRIGGERED)
+ dr.dag = dag2
+ session.add(dr)
+ session.commit()
+ session.expunge_all()
+
+ # check result
+ events = _get_upstream_dataset_events(dag_run=dr, session=session)
+ assert len(events) == 5
+
+
+def test__get_upstream_dataset_events_with_prior(configured_app):
+ """
+ Events returned should be those that occurred after last DATASET_TRIGGERED
+ dag run and up to the exec date of current dag run.
+ """
+    from airflow.api_connexion.endpoints.dag_run_endpoint import _get_upstream_dataset_events
+
+    # setup dags and datasets
+    unique_id = str(uuid4())
+    session = settings.Session()
+    dataset1a = Dataset(uri=f"s3://{unique_id}-1a")
+    dataset1b = Dataset(uri=f"s3://{unique_id}-1b")
+    dag2 = DAG(dag_id=f"datasets-{unique_id}-2", schedule_on=[dataset1a, dataset1b])
+    DAG.bulk_write_to_db(dags=[dag2], session=session)
+    session.add_all([dataset1a, dataset1b])
+    session.commit()
+
+    # add 2 events, then a dag run, then 3 events, then another dag run, then another event
+    first_timestamp = pendulum.now('UTC')
+    session.add_all(
+        [
+            DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp),
+            DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp),
+        ]
+    )
+    dr = DagRun(
+        dag2.dag_id,
+        run_id=unique_id + '-1',
+        run_type=DagRunType.DATASET_TRIGGERED,
+        execution_date=first_timestamp.add(microseconds=1000),
+    )
+    dr.dag = dag2
+    session.add(dr)
+    session.add_all(
+        [
+            DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp.add(microseconds=2000)),
+            DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp.add(microseconds=3000)),
+            DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp.add(microseconds=4000)),
+        ]
+    )
+    dr = DagRun(
+        dag2.dag_id,
+        run_id=unique_id + '-2',
+        run_type=DagRunType.DATASET_TRIGGERED,
+        execution_date=first_timestamp.add(microseconds=4000),  # exact same time as 3rd event in window
+    )
+    dr.dag = dag2
+    session.add(dr)
+    dr = DagRun(  # this dag run should be ignored
+        dag2.dag_id,
+        run_id=unique_id + '-3',
+        run_type=DagRunType.MANUAL,
+        execution_date=first_timestamp.add(microseconds=3000),
+    )
+    dr.dag = dag2
+    session.add(dr)
+    session.add_all(
+        [DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp.add(microseconds=5000))]
+    )
+    session.commit()
+    session.expunge_all()
+
+    # check result
+    events = _get_upstream_dataset_events(dag_run=dr, session=session)
+
+    event_times = [x.created_at for x in events]
+    assert event_times == [
+        first_timestamp.add(microseconds=2000),
+        first_timestamp.add(microseconds=3000),
+        first_timestamp.add(microseconds=4000),
+    ]
+
+
+class TestGetDagRunDatasetTriggerEvents(TestDagRunEndpoint):
+    @mock.patch('airflow.api_connexion.endpoints.dag_run_endpoint._get_upstream_dataset_events')
+    def test_should_respond_200(self, mock_get_events, session):
+        dagrun_model = DagRun(
+            dag_id="TEST_DAG_ID",
+            run_id="TEST_DAG_RUN_ID",
+            run_type=DagRunType.DATASET_TRIGGERED,
+            execution_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+            state=DagRunState.RUNNING,
+        )
+        session.add(dagrun_model)
+        session.commit()
+        result = session.query(DagRun).all()
+        assert len(result) == 1
+        created_at = pendulum.now('UTC')
+        # make sure whatever is returned by this func is what comes out in response.
+        mock_get_events.return_value = [DatasetEvent(dataset_id=1, created_at=created_at)]
+        response = self.client.get(
+            "api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstream-dataset-events",
+            environ_overrides={'REMOTE_USER': "test"},
+        )
+        assert response.status_code == 200
+        expected_response = {
+            'dataset_events': [
+                {
+                    'created_at': str(created_at),
+                    'dataset_id': 1,
+                    'extra': None,
+                    'id': None,
+                    'source_dag_id': None,
+                    'source_map_index': None,
+                    'source_run_id': None,
+                    'source_task_id': None,
+                }
+            ],
+            'total_entries': 1,
+        }
+        assert response.json == expected_response
Review Comment:
Should we check the mock was called?
```suggestion
        assert response.json == expected_response
        mock_get_events.assert_called_once()
```
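
If the call arguments are worth pinning down too, `assert_called_once_with` accepts `mock.ANY` placeholders; a possible variant, assuming the endpoint keeps invoking the helper with keyword arguments as shown in the diff above:

```python
from unittest import mock

# dag_run and session are live objects in the endpoint, so match them loosely
mock_get_events.assert_called_once_with(dag_run=mock.ANY, session=mock.ANY)
```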
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]