amoghrajesh commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1839620372
##########
airflow/api_fastapi/core_api/routes/public/assets.py:
##########
@@ -69,3 +79,46 @@ def get_assets(
assets=[AssetResponse.model_validate(asset, from_attributes=True) for
asset in assets],
total_entries=total_entries,
)
+
+
+@assets_router.get(
+ "/events",
+ responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_asset_events(
+ limit: QueryLimit,
+ offset: QueryOffset,
+ order_by: Annotated[
+ SortParam,
+ Depends(
+ SortParam(
+ ["timestamp", "source_dag_id", "source_task_id",
"source_run_id", "source_map_index"],
+ AssetEvent,
+ ).dynamic_depends("timestamp")
Review Comment:
I think we should remove the "timestamp" default here, since the docs do not
specify a default sort order:
https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_dataset_events
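
To illustrate the difference a default makes, here is a minimal sketch of an `order_by` resolver with and without a fallback column. `resolve_order_by` and `ALLOWED_COLUMNS` are hypothetical names used only for illustration. This is not Airflow's `SortParam`, whose internals are not shown in this diff.

```python
from __future__ import annotations

# Hypothetical stand-in for a sort-parameter resolver; NOT Airflow's SortParam.
ALLOWED_COLUMNS = [
    "timestamp",
    "source_dag_id",
    "source_task_id",
    "source_run_id",
    "source_map_index",
]


def resolve_order_by(order_by: str | None, default: str | None = None) -> tuple[str, bool] | None:
    """Return (column, descending), or None when no ordering applies."""
    value = order_by if order_by is not None else default
    if value is None:
        # No explicit order_by and no default: leave the query unordered.
        return None
    descending = value.startswith("-")
    column = value[1:] if descending else value
    if column not in ALLOWED_COLUMNS:
        raise ValueError(f"Ordering by {column!r} is not allowed")
    return column, descending
```

With `default="timestamp"`, a request that omits `order_by` is silently sorted by timestamp. Without a default, the same request is left unordered, which is the behaviour the comment above argues matches the docs.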
##########
tests/api_fastapi/core_api/routes/public/test_assets.py:
##########
@@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self,
test_client):
assert response.status_code == 200
assert len(response.json()["assets"]) == 100
+
+
+class TestGetAssetsEvents(TestAssets):
Review Comment:
```suggestion
class TestGetAssetEvents(TestAssets):
```
##########
airflow/api_fastapi/core_api/routes/public/assets.py:
##########
@@ -69,3 +79,46 @@ def get_assets(
assets=[AssetResponse.model_validate(asset, from_attributes=True) for
asset in assets],
total_entries=total_entries,
)
+
+
+@assets_router.get(
+ "/events",
+ responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_asset_events(
+ limit: QueryLimit,
+ offset: QueryOffset,
+ order_by: Annotated[
+ SortParam,
+ Depends(
+ SortParam(
+ ["timestamp", "source_dag_id", "source_task_id",
"source_run_id", "source_map_index"],
+ AssetEvent,
+ ).dynamic_depends("timestamp")
+ ),
+ ],
+ asset_id: QueryAssetIdFilter,
+ source_dag_id: QuerySourceDagIdFilter,
+ source_task_id: QuerySourceTaskIdFilter,
+ source_run_id: QuerySourceRunIdFilter,
+ source_map_index: QuerySourceMapIndexFilter,
+ session: Annotated[Session, Depends(get_session)],
+) -> AssetEventCollectionResponse:
+ """Get assets events."""
Review Comment:
```suggestion
"""Get asset events."""
```
##########
airflow/api_fastapi/core_api/datamodels/assets.py:
##########
@@ -64,3 +64,45 @@ class AssetCollectionResponse(BaseModel):
assets: list[AssetResponse]
total_entries: int
+
+
+class DagRunAssetReference(BaseModel):
+ """Serializable version of the DagRunAssetReference ORM SqlAlchemyModel."""
+
+ run_id: str
+ dag_id: str
+ execution_date: datetime = Field(alias="logical_date")
+ start_date: datetime
+ end_date: datetime
+ state: str
+ data_interval_start: datetime
+ data_interval_end: datetime
+
+
+class AssetEventResponse(BaseModel):
+ """Asset event serializer for responses."""
+
+ id: int
+ asset_id: int
+ asset_uri: str
+ extra: dict | None = None
+ source_task_id: str | None = None
+ source_dag_id: str | None = None
+ source_run_id: str | None = None
+ source_map_index: int
+ created_dagruns: list[DagRunAssetReference]
+ timestamp: datetime
+
+ @model_validator(mode="before")
+ def rename_uri_to_asset_uri(cls, values):
+ """Rename 'uri' to 'asset_uri' during serialization."""
Review Comment:
```suggestion
"""Rename 'uri' to 'asset_uri' during serialization to match legacy response."""
```
##########
airflow/api_fastapi/core_api/routes/public/__init__.py:
##########
@@ -58,5 +58,6 @@
public_router.include_router(variables_router)
public_router.include_router(version_router)
public_router.include_router(dag_stats_router)
+public_router.include_router(assets_router)
Review Comment:
This router is already included on line 63. Please remove it from here.
##########
tests/api_fastapi/core_api/routes/public/test_assets.py:
##########
@@ -229,3 +303,94 @@ def test_should_respect_page_size_limit_default(self,
test_client):
assert response.status_code == 200
assert len(response.json()["assets"]) == 100
+
+
+class TestGetAssetsEvents(TestAssets):
Review Comment:
Compared with test_asset_endpoint.py from the legacy API, can we try to
keep the same test functions?
I think these are missing or renamed:
1. `test_filtering`: covers filtering in general, unlike the narrower
`test_filter_events_by_asset_id`
2. `test_includes_created_dagrun`
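
As a rough sketch of what a general `test_filtering` could look like, the table-driven check below runs one case per filter plus a combined case. Everything here is an assumption for illustration: `EVENTS`, `filter_events`, and `run_filter_cases` are hypothetical, in-memory stand-ins. The real test would presumably call the endpoint through `test_client` and could use `pytest.mark.parametrize` instead of a loop.

```python
from __future__ import annotations

# Hypothetical in-memory events; the real test would query the API instead.
EVENTS = [
    {"id": 1, "asset_id": 1, "source_dag_id": "dag_a", "source_task_id": "t1",
     "source_run_id": "run_1", "source_map_index": -1},
    {"id": 2, "asset_id": 2, "source_dag_id": "dag_b", "source_task_id": "t1",
     "source_run_id": "run_2", "source_map_index": -1},
    {"id": 3, "asset_id": 1, "source_dag_id": "dag_b", "source_task_id": "t2",
     "source_run_id": "run_3", "source_map_index": -1},
]


def filter_events(events: list[dict], **filters) -> list[dict]:
    """Keep events whose fields equal every supplied filter value."""
    return [e for e in events if all(e[k] == v for k, v in filters.items())]


# One case per filter parameter, plus one combining two filters.
CASES = [
    ({"asset_id": 1}, [1, 3]),
    ({"source_dag_id": "dag_b"}, [2, 3]),
    ({"source_task_id": "t1"}, [1, 2]),
    ({"source_run_id": "run_2"}, [2]),
    ({"source_map_index": -1}, [1, 2, 3]),
    ({"asset_id": 1, "source_dag_id": "dag_b"}, [3]),
]


def run_filter_cases() -> int:
    """Check every case and return how many were verified."""
    for filters, expected_ids in CASES:
        got = [e["id"] for e in filter_events(EVENTS, **filters)]
        assert got == expected_ids, (filters, got)
    return len(CASES)
```

Keeping the cases in one table makes it easy to see which filters are covered and to add a row when a new filter parameter is introduced.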