pierrejeambrun commented on code in PR #44054:
URL: https://github.com/apache/airflow/pull/44054#discussion_r1844193647
##########
airflow/api_fastapi/core_api/routes/public/dags.py:
##########
@@ -303,3 +306,76 @@ def delete_dag(
status.HTTP_409_CONFLICT, f"Task instances of dag with id:
'{dag_id}' are still running"
)
return Response(status_code=status.HTTP_204_NO_CONTENT)
+
+
+@dags_router.get(
+ "/{dag_id}/assets/queuedEvent",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
+def get_dag_asset_queued_events(
+ dag_id: str,
+ session: Annotated[Session, Depends(get_session)],
+ before: str = Query(None),
+) -> QueuedEventCollectionResponse:
+ """Get queued asset events for a DAG."""
+ where_clause = _generate_queued_event_where_clause(dag_id=dag_id,
before=before)
+ query = (
+ select(AssetDagRunQueue, AssetModel.uri)
+ .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id)
+ .where(*where_clause)
+ )
+
+ dag_asset_queued_events_select, total_entries = paginated_select(
+ query,
+ [],
+ )
+ adrqs = session.execute(dag_asset_queued_events_select).all()
+
+ if not adrqs:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with
dag_id: `{dag_id}` was not found")
+
+ queued_events = [
+ QueuedEventResponse(created_at=adrq.created_at,
dag_id=adrq.target_dag_id, uri=uri)
+ for adrq, uri in adrqs
+ ]
+
+ return QueuedEventCollectionResponse(
+ queued_events=[
+ QueuedEventResponse.model_validate(queued_event,
from_attributes=True)
+ for queued_event in queued_events
+ ],
+ total_entries=total_entries,
+ )
+
+
+@dags_router.delete(
+ "/{dag_id}/assets/queuedEvent/{uri:path}",
+ status_code=status.HTTP_204_NO_CONTENT,
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_400_BAD_REQUEST,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
+def delete_dag_asset_queued_event(
+ dag_id: str,
+ uri: str,
+ session: Annotated[Session, Depends(get_session)],
+ before: str = Query(None),
Review Comment:
Use the `DateTimeQuery`, validation and parsing of parameters should be done
by the route. So downstream functions manipulated a validated input, and not a
bare string.
##########
airflow/api_fastapi/core_api/routes/public/dags.py:
##########
@@ -303,3 +306,76 @@ def delete_dag(
status.HTTP_409_CONFLICT, f"Task instances of dag with id:
'{dag_id}' are still running"
)
return Response(status_code=status.HTTP_204_NO_CONTENT)
+
+
+@dags_router.get(
+ "/{dag_id}/assets/queuedEvent",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
+def get_dag_asset_queued_events(
+ dag_id: str,
+ session: Annotated[Session, Depends(get_session)],
+ before: str = Query(None),
+) -> QueuedEventCollectionResponse:
+ """Get queued asset events for a DAG."""
+ where_clause = _generate_queued_event_where_clause(dag_id=dag_id,
before=before)
+ query = (
+ select(AssetDagRunQueue, AssetModel.uri)
+ .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id)
+ .where(*where_clause)
+ )
+
+ dag_asset_queued_events_select, total_entries = paginated_select(
+ query,
+ [],
+ )
+ adrqs = session.execute(dag_asset_queued_events_select).all()
+
+ if not adrqs:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with
dag_id: `{dag_id}` was not found")
+
+ queued_events = [
+ QueuedEventResponse(created_at=adrq.created_at,
dag_id=adrq.target_dag_id, uri=uri)
+ for adrq, uri in adrqs
+ ]
+
+ return QueuedEventCollectionResponse(
+ queued_events=[
+ QueuedEventResponse.model_validate(queued_event,
from_attributes=True)
+ for queued_event in queued_events
+ ],
+ total_entries=total_entries,
+ )
+
+
+@dags_router.delete(
+ "/{dag_id}/assets/queuedEvent/{uri:path}",
+ status_code=status.HTTP_204_NO_CONTENT,
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_400_BAD_REQUEST,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
+def delete_dag_asset_queued_event(
Review Comment:
Should go in asset endpoint.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]