pierrejeambrun commented on code in PR #43934:
URL: https://github.com/apache/airflow/pull/43934#discussion_r1842260722
##########
airflow/api_fastapi/core_api/datamodels/dags.py:
##########
@@ -159,3 +159,18 @@ class DAGTagCollectionResponse(BaseModel):
tags: list[str]
total_entries: int
+
+
+class QueuedEventResponse(BaseModel):
+ """QueuedEvent serializer for responses.."""
Review Comment:
```suggestion
"""Queued Event serializer for responses."""
```
##########
airflow/api_fastapi/core_api/datamodels/dags.py:
##########
@@ -159,3 +159,18 @@ class DAGTagCollectionResponse(BaseModel):
tags: list[str]
total_entries: int
+
+
+class QueuedEventResponse(BaseModel):
+ """QueuedEvent serializer for responses.."""
+
+ uri: str
+ dag_id: str
+ created_at: datetime
+
+
+class QueuedEventCollectionResponse(BaseModel):
+ """QueuedEventCollection serializer for responses."""
Review Comment:
```suggestion
"""Queued Event Collection serializer for responses."""
```
##########
airflow/api_fastapi/core_api/routes/public/dags.py:
##########
@@ -303,3 +307,45 @@ def delete_dag(
status.HTTP_409_CONFLICT, f"Task instances of dag with id: '{dag_id}' are still running"
)
return Response(status_code=status.HTTP_204_NO_CONTENT)
+
+
+@dags_router.get(
+ "/{dag_id}/assets/queuedEvent",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_401_UNAUTHORIZED,
+ status.HTTP_403_FORBIDDEN,
Review Comment:
Kaxil's latest change puts 401 and 403 on the base router, so we no longer
need to add them on a per-route basis (404 and others still need to be
added, though).
https://github.com/apache/airflow/pull/43932
##########
airflow/api_fastapi/core_api/routes/public/dags.py:
##########
@@ -303,3 +307,45 @@ def delete_dag(
status.HTTP_409_CONFLICT, f"Task instances of dag with id: '{dag_id}' are still running"
)
return Response(status_code=status.HTTP_204_NO_CONTENT)
+
+
+@dags_router.get(
+ "/{dag_id}/assets/queuedEvent",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_401_UNAUTHORIZED,
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
+def get_dag_asset_queued_events(
+ dag_id: str,
+ session: Annotated[Session, Depends(get_session)],
+ before: str = Query(None),
+) -> QueuedEventCollectionResponse:
+ """Get queued asset events for a DAG."""
+ where_clause = [AssetDagRunQueue.target_dag_id == dag_id]
+ if before:
Review Comment:
We need the common `_generate_queued_event_where_clause` code, or an
equivalent, because 6 endpoints use it; inlining the logic in each endpoint
duplicates it. This will make things more reusable and better factored.
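To illustrate the kind of factoring meant here, a rough standalone sketch follows. It is not the existing helper: it builds plain SQL fragments instead of SQLAlchemy expressions so that it runs with the standard library only, and the function name `build_queued_event_filter` and the table aliases `adrq` and `asset` are hypothetical.

```python
from datetime import datetime, timezone


def build_queued_event_filter(*, dag_id=None, uri=None, before=None):
    """Return (where_sql, params) covering only the filters that were given.

    Hypothetical stand-in for a shared where-clause helper: every endpoint
    calls this once instead of re-implementing the same conditions inline.
    """
    clauses, params = [], []
    if dag_id is not None:
        clauses.append("adrq.target_dag_id = ?")
        params.append(dag_id)
    if uri is not None:
        clauses.append("asset.uri = ?")
        params.append(uri)
    if before is not None:
        # `before` is expected to be a datetime; store it as ISO 8601 text.
        clauses.append("adrq.created_at < ?")
        params.append(before.isoformat())
    # With no filters at all, fall back to a clause that matches every row.
    where_sql = " AND ".join(clauses) if clauses else "1 = 1"
    return where_sql, params


# The same helper can serve a DAG-scoped and an asset-scoped endpoint.
dag_filter = build_queued_event_filter(dag_id="example_dag")
asset_filter = build_queued_event_filter(
    uri="s3://bucket/key",
    before=datetime(2024, 1, 1, tzinfo=timezone.utc),
)
```

The point is only that the optional `dag_id`, `uri`, and `before` conditions live in one place; the real helper would return SQLAlchemy clauses to pass to `.where(*where_clause)`.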
##########
airflow/api_fastapi/core_api/routes/public/dags.py:
##########
@@ -303,3 +307,45 @@ def delete_dag(
status.HTTP_409_CONFLICT, f"Task instances of dag with id: '{dag_id}' are still running"
)
return Response(status_code=status.HTTP_204_NO_CONTENT)
+
+
+@dags_router.get(
+ "/{dag_id}/assets/queuedEvent",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_401_UNAUTHORIZED,
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
+def get_dag_asset_queued_events(
+ dag_id: str,
+ session: Annotated[Session, Depends(get_session)],
+ before: str = Query(None),
+) -> QueuedEventCollectionResponse:
+ """Get queued asset events for a DAG."""
+ where_clause = [AssetDagRunQueue.target_dag_id == dag_id]
+ if before:
+ before_parsed = timezone.parse(before)
+ where_clause.append(AssetDagRunQueue.created_at < before_parsed)
+ query = (
+ select(AssetDagRunQueue, AssetModel.uri)
+ .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id)
+ .where(*where_clause)
+ )
+ result = session.execute(query).all()
+ total_entries = len(result)
+ if not result:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with dag_id: `{dag_id}` was not found")
Review Comment:
I think you removed the `uri` from the error response message. I think it is
important to keep it for the client to understand which event was not found.
##########
airflow/api_fastapi/core_api/routes/public/dags.py:
##########
@@ -303,3 +307,45 @@ def delete_dag(
status.HTTP_409_CONFLICT, f"Task instances of dag with id: '{dag_id}' are still running"
)
return Response(status_code=status.HTTP_204_NO_CONTENT)
+
+
+@dags_router.get(
+ "/{dag_id}/assets/queuedEvent",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_401_UNAUTHORIZED,
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
+def get_dag_asset_queued_events(
+ dag_id: str,
+ session: Annotated[Session, Depends(get_session)],
+ before: str = Query(None),
+) -> QueuedEventCollectionResponse:
+ """Get queued asset events for a DAG."""
+ where_clause = [AssetDagRunQueue.target_dag_id == dag_id]
+ if before:
+ before_parsed = timezone.parse(before)
+ where_clause.append(AssetDagRunQueue.created_at < before_parsed)
+ query = (
+ select(AssetDagRunQueue, AssetModel.uri)
+ .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id)
+ .where(*where_clause)
+ )
+ result = session.execute(query).all()
Review Comment:
You can also easily add pagination, as we do for other endpoints; it is
really easy to do with our utility code. This will save you from having to
do the following manually:
```
result = session.execute(query).all()
total_entries = len(result)
```
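For reference, the general shape of limit/offset pagination with a separate total count looks roughly like the sketch below. It uses the standard-library `sqlite3` module rather than our pagination utility so that it runs standalone; the `queued_event` table and the `fetch_page` function are hypothetical names for illustration only.

```python
import sqlite3


def fetch_page(conn, where_sql, params, *, limit, offset):
    """Return (rows, total_entries) for one page of matching rows.

    `where_sql` must be a trusted SQL fragment with `?` placeholders;
    user-supplied values go in `params` only.
    """
    # total_entries counts every matching row, not just the current page.
    total = conn.execute(
        f"SELECT COUNT(*) FROM queued_event WHERE {where_sql}", params
    ).fetchone()[0]
    # A stable ORDER BY keeps pages consistent between requests.
    rows = conn.execute(
        f"SELECT uri, dag_id, created_at FROM queued_event WHERE {where_sql} "
        "ORDER BY created_at LIMIT ? OFFSET ?",
        [*params, limit, offset],
    ).fetchall()
    return rows, total


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE queued_event (uri TEXT, dag_id TEXT, created_at TEXT)")
conn.executemany(
    "INSERT INTO queued_event VALUES (?, ?, ?)",
    [
        (f"s3://bucket/{i}", "example_dag", f"2024-01-0{i}T00:00:00+00:00")
        for i in range(1, 6)
    ],
)
rows, total_entries = fetch_page(
    conn, "dag_id = ?", ["example_dag"], limit=2, offset=2
)
```

Here `total_entries` comes from the database rather than from `len(result)`, so it stays correct even when only one page of rows is fetched.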
##########
airflow/api_fastapi/core_api/routes/public/dags.py:
##########
@@ -303,3 +307,45 @@ def delete_dag(
status.HTTP_409_CONFLICT, f"Task instances of dag with id: '{dag_id}' are still running"
)
return Response(status_code=status.HTTP_204_NO_CONTENT)
+
+
+@dags_router.get(
+ "/{dag_id}/assets/queuedEvent",
+ responses=create_openapi_http_exception_doc(
+ [
Review Comment:
This endpoint should be in `assets.py`, I think (same for the tests).
##########
airflow/api_fastapi/core_api/routes/public/dags.py:
##########
@@ -303,3 +307,45 @@ def delete_dag(
status.HTTP_409_CONFLICT, f"Task instances of dag with id: '{dag_id}' are still running"
)
return Response(status_code=status.HTTP_204_NO_CONTENT)
+
+
+@dags_router.get(
+ "/{dag_id}/assets/queuedEvent",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_401_UNAUTHORIZED,
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
+def get_dag_asset_queued_events(
+ dag_id: str,
+ session: Annotated[Session, Depends(get_session)],
+ before: str = Query(None),
Review Comment:
You can use `DateTimeQuery`, which is the common safe datetime parameter
(cf. other routes).
Also, I think the type is wrong: it should be `str | None` because of the `None` default value.
##########
airflow/api_fastapi/core_api/routes/public/dags.py:
##########
@@ -303,3 +307,45 @@ def delete_dag(
status.HTTP_409_CONFLICT, f"Task instances of dag with id: '{dag_id}' are still running"
)
return Response(status_code=status.HTTP_204_NO_CONTENT)
+
+
+@dags_router.get(
+ "/{dag_id}/assets/queuedEvent",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_401_UNAUTHORIZED,
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
+def get_dag_asset_queued_events(
+ dag_id: str,
+ session: Annotated[Session, Depends(get_session)],
+ before: str = Query(None),
+) -> QueuedEventCollectionResponse:
+ """Get queued asset events for a DAG."""
+ where_clause = [AssetDagRunQueue.target_dag_id == dag_id]
+ if before:
+ before_parsed = timezone.parse(before)
Review Comment:
This way you don't need to do that manually.