pierrejeambrun commented on code in PR #43881:
URL: https://github.com/apache/airflow/pull/43881#discussion_r1840683048
##########
airflow/api_fastapi/core_api/routes/public/assets.py:
##########
@@ -69,3 +79,52 @@ def get_assets(
assets=[AssetResponse.model_validate(asset, from_attributes=True) for
asset in assets],
total_entries=total_entries,
)
+
+
+@assets_router.get(
+ "/events",
+ responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_asset_events(
+ limit: QueryLimit,
+ offset: QueryOffset,
+ order_by: Annotated[
+ SortParam,
+ Depends(
+ SortParam(
+ [
+ "source_task_id",
+ "source_dag_id",
+ "source_run_id",
+ "source_map_index",
+ "timestamp",
+ ],
+ AssetEvent,
+ ).dynamic_depends("timestamp")
+ ),
+ ],
+ asset_id: QueryAssetIdFilter,
+ source_dag_id: QuerySourceDagIdFilter,
+ source_task_id: QuerySourceTaskIdFilter,
+ source_run_id: QuerySourceRunIdFilter,
+ source_map_index: QuerySourceMapIndexFilter,
+ session: Annotated[Session, Depends(get_session)],
+) -> AssetEventCollectionResponse:
+ """Get asset events."""
+ assets_event_select, total_entries = paginated_select(
+ select(AssetEvent),
+ filters=[asset_id, source_dag_id, source_task_id, source_run_id,
source_map_index],
+ order_by=order_by,
+ offset=offset,
+ limit=limit,
+ session=session,
+ )
+
+ assets_events = session.scalars(assets_event_select).all()
Review Comment:
The original relationship-loading `options` have been removed. They are
important to keep, as they make serialization faster and prevent multiple
db queries from being emitted lazily.
You need to add back the
`query.options(subqueryload(AssetEvent.created_dagruns))`
##########
airflow/api_fastapi/core_api/routes/public/assets.py:
##########
@@ -69,3 +79,52 @@ def get_assets(
assets=[AssetResponse.model_validate(asset, from_attributes=True) for
asset in assets],
total_entries=total_entries,
)
+
+
+@assets_router.get(
+ "/events",
+ responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+async def get_asset_events(
Review Comment:
The definition of routes has changed recently in
https://github.com/apache/airflow/pull/43797
Routes should be declared `sync`:
```suggestion
def get_asset_events(
```
##########
airflow/api_fastapi/core_api/datamodels/assets.py:
##########
@@ -64,3 +64,45 @@ class AssetCollectionResponse(BaseModel):
assets: list[AssetResponse]
total_entries: int
+
+
+class DagRunAssetReference(BaseModel):
+ """Serializable version of the DagRunAssetReference ORM SqlAlchemyModel."""
+
+ run_id: str
+ dag_id: str
+ execution_date: datetime = Field(alias="logical_date")
+ start_date: datetime
+ end_date: datetime
+ state: str
+ data_interval_start: datetime
+ data_interval_end: datetime
+
+
+class AssetEventResponse(BaseModel):
+ """Asset event serializer for responses."""
+
+ id: int
+ asset_id: int
+ asset_uri: str
+ extra: dict | None = None
+ source_task_id: str | None = None
+ source_dag_id: str | None = None
+ source_run_id: str | None = None
+ source_map_index: int
+ created_dagruns: list[DagRunAssetReference]
+ timestamp: datetime
+
+ @model_validator(mode="before")
+ def rename_uri_to_asset_uri(cls, values):
+ """Rename 'uri' to 'asset_uri' during serialization to match legacy
response."""
+ if hasattr(values, "uri") and values.uri:
+ values.asset_uri = values.uri
+ return values
Review Comment:
We can remove this and simply have `uri` in the response. Going from
`dataset_uri` to `asset_uri` is already a breaking change, so we might as well
take the opportunity to make it just `uri`.
##########
airflow/api_fastapi/core_api/datamodels/assets.py:
##########
@@ -64,3 +64,45 @@ class AssetCollectionResponse(BaseModel):
assets: list[AssetResponse]
total_entries: int
+
+
+class DagRunAssetReference(BaseModel):
+ """Serializable version of the DagRunAssetReference ORM SqlAlchemyModel."""
+
+ run_id: str
+ dag_id: str
+ execution_date: datetime = Field(alias="logical_date")
+ start_date: datetime
+ end_date: datetime
+ state: str
+ data_interval_start: datetime
+ data_interval_end: datetime
+
+
+class AssetEventResponse(BaseModel):
+ """Asset event serializer for responses."""
+
+ id: int
+ asset_id: int
+ asset_uri: str
+ extra: dict | None = None
+ source_task_id: str | None = None
+ source_dag_id: str | None = None
+ source_run_id: str | None = None
+ source_map_index: int
+ created_dagruns: list[DagRunAssetReference]
+ timestamp: datetime
+
+ @model_validator(mode="before")
+ def rename_uri_to_asset_uri(cls, values):
+ """Rename 'uri' to 'asset_uri' during serialization to match legacy
response."""
+ if hasattr(values, "uri") and values.uri:
+ values.asset_uri = values.uri
+ return values
+
+
+class AssetEventCollectionResponse(BaseModel):
+ """Asset collection response."""
Review Comment:
Wrong docstring: this class is a collection of asset events, not assets.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]