pierrejeambrun commented on code in PR #44052:
URL: https://github.com/apache/airflow/pull/44052#discussion_r1844187237
##########
airflow/api_fastapi/core_api/routes/public/assets.py:
##########
@@ -153,3 +175,25 @@ def get_asset(
raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with uri:
`{uri}` was not found")
return AssetResponse.model_validate(asset, from_attributes=True)
+
+
+@assets_router.delete(
+ "/queuedEvent/{uri:path}",
+ status_code=status.HTTP_204_NO_CONTENT,
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
+def delete_asset_queued_events(
+ uri: str,
+ session: Annotated[Session, Depends(get_session)],
+ before: str = Query(None),
Review Comment:
I think we can directly use `DateTimeQuery` or `DateTimeQuery | None` here.
(Parameter validation should be done by the route.)
##########
tests/api_fastapi/core_api/routes/public/test_assets.py:
##########
@@ -459,3 +467,83 @@ def test_should_respond_404(self, test_client):
)
assert response.status_code == 404
assert response.json()["detail"] == "The Asset with uri:
`s3://bucket/key` was not found"
+
+
+class TestQueuedEventEndpoint(TestAssets):
+ @pytest.fixture
+ def time_freezer(self) -> Generator:
+ freezer = time_machine.travel(self.default_time, tick=False)
+ freezer.start()
+
+ yield
+
+ freezer.stop()
+
+ def _create_asset_dag_run_queues(self, dag_id, asset_id, session):
+ adrq = AssetDagRunQueue(target_dag_id=dag_id, asset_id=asset_id)
+ session.add(adrq)
+ session.commit()
+ return adrq
+
+
+class TestGetDagAssetQueuedEvents(TestQueuedEventEndpoint):
+ @pytest.mark.usefixtures("time_freezer")
+ def test_should_respond_200(self, test_client, session, create_dummy_dag):
+ dag, _ = create_dummy_dag()
+ dag_id = dag.dag_id
+ self.create_assets(session=session, num=1)
+ asset_id = 1
+ self._create_asset_dag_run_queues(dag_id, asset_id, session)
+
+ response = test_client.get(
+ f"/public/dags/{dag_id}/assets/queuedEvent",
+ )
+
+ assert response.status_code == 200
+ assert response.json() == {
+ "queued_events": [
+ {
+ "created_at": self.default_time.replace("+00:00", "Z"),
+ "uri": "s3://bucket/key/1",
+ "dag_id": "dag",
+ }
+ ],
+ "total_entries": 1,
+ }
+
+ def test_should_respond_404(self, test_client):
+ dag_id = "not_exists"
+
+ response = test_client.get(
+ f"/public/dags/{dag_id}/assets/queuedEvent",
+ )
+
+ assert response.status_code == 404
+ assert response.json()["detail"] == "Queue event with dag_id:
`not_exists` was not found"
+
+
+class TestDeleteAssetQueuedEvents(TestQueuedEventEndpoint):
+ @pytest.mark.usefixtures("time_freezer")
+ def test_should_respond_204(self, test_client, session, create_dummy_dag):
+ dag, _ = create_dummy_dag()
+ dag_id = dag.dag_id
+ uri = "s3://bucket/key/1"
+ self.create_assets(session=session, num=1)
+ asset_id = 1
+ self._create_asset_dag_run_queues(dag_id, asset_id, session)
+
+ response = test_client.delete(
+ f"/public/assets/queuedEvent/{uri}",
+ )
+
Review Comment:
Nice! We could also check in the DB that the record has actually been deleted.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]