bbovenzi commented on code in PR #50060:
URL: https://github.com/apache/airflow/pull/50060#discussion_r2070575822
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py:
##########
@@ -135,20 +135,55 @@ def get_assets(
session: SessionDep,
) -> AssetCollectionResponse:
"""Get assets."""
+ # First, we're pulling the Asset ID, AssetEvent ID, and AssetEvent
timestamp for the latest (last)
+ # AssetEvent. We'll eventually OUTER JOIN this to the AssetModel
+ asset_event_query = (
+ select(
+ AssetEvent.asset_id, # The ID of the Asset, which we'll need to
JOIN to the AssetModel
+ func.max(AssetEvent.id).label("last_asset_event_id"), # The ID of
the last AssetEvent
+ func.max(AssetEvent.timestamp).label("last_asset_event_timestamp"),
+ )
+ .group_by(AssetEvent.asset_id)
+ .subquery()
+ )
+
+ assets_select_statement = select(
+ AssetModel,
+ asset_event_query.c.last_asset_event_id, # This should be the
AssetEvent.id
+ asset_event_query.c.last_asset_event_timestamp,
+ ).outerjoin(asset_event_query, AssetModel.id ==
asset_event_query.c.asset_id)
+
assets_select, total_entries = paginated_select(
- statement=select(AssetModel),
+ statement=assets_select_statement,
filters=[only_active, name_pattern, uri_pattern, dag_ids],
order_by=order_by,
Review Comment:
We'll definitely want to update the filters and order_by shortly after this
is merged.
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py:
##########
@@ -135,20 +135,55 @@ def get_assets(
session: SessionDep,
) -> AssetCollectionResponse:
"""Get assets."""
+ # First, we're pulling the Asset ID, AssetEvent ID, and AssetEvent
timestamp for the latest (last)
+ # AssetEvent. We'll eventually OUTER JOIN this to the AssetModel
+ asset_event_query = (
+ select(
+ AssetEvent.asset_id, # The ID of the Asset, which we'll need to
JOIN to the AssetModel
+ func.max(AssetEvent.id).label("last_asset_event_id"), # The ID of
the last AssetEvent
+ func.max(AssetEvent.timestamp).label("last_asset_event_timestamp"),
+ )
+ .group_by(AssetEvent.asset_id)
+ .subquery()
Review Comment:
Let's make sure we're actually selected the latest timestamp here
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py:
##########
@@ -135,20 +135,55 @@ def get_assets(
session: SessionDep,
) -> AssetCollectionResponse:
"""Get assets."""
+ # First, we're pulling the Asset ID, AssetEvent ID, and AssetEvent
timestamp for the latest (last)
+ # AssetEvent. We'll eventually OUTER JOIN this to the AssetModel
+ asset_event_query = (
+ select(
+ AssetEvent.asset_id, # The ID of the Asset, which we'll need to
JOIN to the AssetModel
+ func.max(AssetEvent.id).label("last_asset_event_id"), # The ID of
the last AssetEvent
+ func.max(AssetEvent.timestamp).label("last_asset_event_timestamp"),
+ )
+ .group_by(AssetEvent.asset_id)
+ .subquery()
Review Comment:
Let's make sure we're actually selecting the latest timestamp here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]