dstandish commented on code in PR #25039:
URL: https://github.com/apache/airflow/pull/25039#discussion_r921698877


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -59,3 +63,42 @@ def get_datasets(
     query = apply_sorting(query, order_by, {}, allowed_filter_attrs)
     datasets = query.offset(offset).limit(limit).all()
     return dataset_collection_schema.dump(DatasetCollection(datasets=datasets, 
total_entries=total_entries))
+
+
+@security.requires_access([(permissions.ACTION_CAN_READ, 
permissions.RESOURCE_DATASET)])
+@provide_session
+@format_parameters({'limit': check_limit})
+def get_dataset_events(

Review Comment:
   Wondering if we should also allow filtering on URI here. What do you 
think? Seems like we probably ought to.



##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -3461,6 +3491,57 @@ components:
                 $ref: '#/components/schemas/Dataset'
         - $ref: '#/components/schemas/CollectionInfo'
 
+    DatasetEvent:

Review Comment:
   Any reason not to include the key, i.e. `id`?



##########
tests/api_connexion/endpoints/test_dataset_endpoint.py:
##########
@@ -239,23 +230,240 @@ def test_should_respect_page_size_limit_default(self, 
session):
         ]
         session.add_all(datasets)
         session.commit()
+
         response = self.client.get("/api/v1/datasets", 
environ_overrides={'REMOTE_USER': "test"})
+
         assert response.status_code == 200
         assert len(response.json['datasets']) == 100
 
     @conf_vars({("api", "maximum_page_limit"): "150"})
     def test_should_return_conf_max_if_req_max_above_conf(self, session):
         datasets = [
             Dataset(
-                uri=f"s3://bucket/key/{i+1}",
+                uri=f"s3://bucket/key/{i}",
                 extra={"foo": "bar"},
                 created_at=timezone.parse(self.default_time),
                 updated_at=timezone.parse(self.default_time),
             )
-            for i in range(200)
+            for i in range(1, 200)
         ]
         session.add_all(datasets)
         session.commit()
+
         response = self.client.get("/api/v1/datasets?limit=180", 
environ_overrides={'REMOTE_USER': "test"})
+
         assert response.status_code == 200
         assert len(response.json['datasets']) == 150
+
+
+class TestGetDatasetEvents(TestDatasetEndpoint):
+    def test_should_respond_200(self, session):
+        self._create_dataset(session)
+        common = {
+            "dataset_id": 1,
+            "extra": "{'foo': 'bar'}",
+            "source_dag_id": "foo",
+            "source_task_id": "bar",
+            "source_run_id": "custom",
+            "source_map_index": -1,
+        }
+
+        events = [DatasetEvent(created_at=timezone.parse(self.default_time), 
**common) for i in [1, 2]]
+        session.add_all(events)
+        session.commit()
+        assert session.query(DatasetEvent).count() == 2
+
+        response = self.client.get("/api/v1/datasets/events", 
environ_overrides={'REMOTE_USER': "test"})
+
+        assert response.status_code == 200
+        response_data = response.json
+        assert response_data == {
+            "dataset_events": [
+                {"created_at": self.default_time, **common},
+                {"created_at": self.default_time, **common},
+            ],
+            "total_entries": 2,
+        }
+
+    @parameterized.expand(
+        [
+            ('dataset_id', '2'),
+            ('source_dag_id', 'dag2'),
+            ('source_task_id', 'task2'),
+            ('source_run_id', 'run2'),
+            ('source_map_index', '2'),
+        ]
+    )
+    @provide_session
+    def test_filtering(self, attr, value, session):
+        datasets = [
+            Dataset(
+                id=i,
+                uri=f"s3://bucket/key/{i}",
+                extra={"foo": "bar"},
+                created_at=timezone.parse(self.default_time),
+                updated_at=timezone.parse(self.default_time),
+            )
+            for i in [1, 2, 3]
+        ]
+        session.add_all(datasets)
+        session.commit()
+        events = [
+            DatasetEvent(
+                dataset_id=i,
+                source_dag_id=f"dag{i}",
+                source_task_id=f"task{i}",
+                source_run_id=f"run{i}",
+                source_map_index=i,
+                created_at=timezone.parse(self.default_time),
+            )
+            for i in [1, 2, 3]
+        ]
+        session.add_all(events)
+        session.commit()
+        assert session.query(DatasetEvent).count() == 3
+
+        response = self.client.get(
+            f"/api/v1/datasets/events?{attr}={value}", 
environ_overrides={'REMOTE_USER': "test"}
+        )
+
+        assert response.status_code == 200
+        response_data = response.json
+        assert response_data == {
+            "dataset_events": [
+                {
+                    "dataset_id": 2,
+                    "extra": None,
+                    "source_dag_id": "dag2",
+                    "source_task_id": "task2",
+                    "source_run_id": "run2",
+                    "source_map_index": 2,
+                    "created_at": self.default_time,
+                }
+            ],
+            "total_entries": 1,
+        }
+
+    def test_order_by_raises_400_for_invalid_attr(self, session):
+        self._create_dataset(session)
+        events = [
+            DatasetEvent(
+                dataset_id=1,
+                extra="{'foo': 'bar'}",
+                source_dag_id="foo",
+                source_task_id="bar",
+                source_run_id="custom",
+                source_map_index=-1,
+                created_at=timezone.parse(self.default_time),
+            )
+            for i in [1, 2]
+        ]
+        session.add_all(events)
+        session.commit()
+        assert session.query(DatasetEvent).count() == 2
+
+        response = self.client.get(
+            "/api/v1/datasets/events?order_by=fake", 
environ_overrides={'REMOTE_USER': "test"}
+        )  # missing attr
+
+        assert response.status_code == 400
+        msg = "Ordering with 'fake' is disallowed or the attribute does not 
exist on the model"
+        assert response.json['detail'] == msg
+
+    def test_should_raises_401_unauthenticated(self, session):
+        response = self.client.get("/api/v1/datasets/events")
+        assert_401(response)
+
+
+class TestGetDatasetEvenetsEndpointPagination(TestDatasetEndpoint):

Review Comment:
   ```suggestion
   class TestGetDatasetEventsEndpointPagination(TestDatasetEndpoint):
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to