Lee-W commented on code in PR #37176:
URL: https://github.com/apache/airflow/pull/37176#discussion_r1487180422


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -124,3 +130,153 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, 
total_entries=total_entries)
     )
+
+
+def _generate_queue_event_where_clause(
+    *,
+    dag_id: str | None = None,
+    dataset_id: int | None = None,
+    uri: str | None = None,
+    before: str | None = None,
+) -> list:
+    """Get DatasetDagRunQueue where clause."""
+    where_clause = []
+    if dag_id is not None:
+        where_clause.append(DatasetDagRunQueue.target_dag_id == dag_id)
+    if dataset_id is not None:
+        where_clause.append(DatasetDagRunQueue.dataset_id == dataset_id)
+    if uri is not None:
+        where_clause.append(DatasetModel.uri == uri)
+    if before is not None:
+        where_clause.append(DatasetDagRunQueue.created_at < 
format_datetime(before))
+    return where_clause
+
+
[email protected]_access_dataset("GET")
+@provide_session
+def get_dag_dataset_queue_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Get a queued Dataset event for a DAG."""
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, uri=uri, 
before=before)
+    ddrq = session.scalar(
+        select(DatasetDagRunQueue)
+        .join(DatasetModel, DatasetDagRunQueue.dataset_id == DatasetModel.id)
+        .where(*where_clause)
+    )
+    if ddrq is None:
+        raise NotFound(
+            "Queue event not found",
+            detail=f"Queue event with dag_id: `{dag_id}` and dataset uri: 
`{uri}` was not found",
+        )
+    queue_event = {"created_at": ddrq.created_at, "dag_id": dag_id, "uri": uri}
+    return queue_event_schema.dump(queue_event)
+
+
[email protected]_access_dataset("DELETE")
+@provide_session
+def delete_dag_dataset_queue_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Delete a queued Dataset event for a DAG."""
+    dataset_id = 
session.scalars(select(DatasetModel.id).where(DatasetModel.uri == 
uri)).one_or_none()
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, 
dataset_id=dataset_id, before=before)
+    delete_stmt = delete(DatasetDagRunQueue).where(*where_clause)
+    result = session.execute(delete_stmt)
+    if result.rowcount > 0:
+        return NoContent, HTTPStatus.NO_CONTENT
+    raise NotFound(
+        "Queue event not found",
+        detail=f"Queue event with dag_id: `{dag_id}` and dataset uri: `{uri}` 
was not found",
+    )
+
+
[email protected]_access_dataset("GET")
+@provide_session
+def get_dag_dataset_queue_events(
+    *, dag_id: str, before: str | None = None, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Get queued Dataset events for a DAG."""
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, 
before=before)
+    query = (
+        select(DatasetDagRunQueue, DatasetModel.uri)
+        .join(DatasetModel, DatasetDagRunQueue.dataset_id == DatasetModel.id)
+        .where(*where_clause)
+    )
+    result = session.execute(query).all()
+    total_entries = get_query_count(query, session=session)
+    if not result:
+        raise NotFound(
+            "Queue event not found",
+            detail=f"Queue event with dag_id: `{dag_id}` was not found",
+        )
+    queue_events = [
+        QueueEvent(created_at=ddrq.created_at, dag_id=ddrq.target_dag_id, 
uri=uri) for ddrq, uri in result
+    ]
+    return queue_event_collection_schema.dump(
+        QueueEventCollection(queue_events=queue_events, 
total_entries=total_entries)
+    )
+
+
[email protected]_access_dataset("DELETE")
+@provide_session
+def delete_dag_dataset_queue_events(
+    *, dag_id: str, before: str | None = None, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Delete queued Dataset events for a DAG."""
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, 
before=before)
+    delete_stmt = delete(DatasetDagRunQueue).where(*where_clause)
+    result = session.execute(delete_stmt)
+    if result.rowcount > 0:
+        return NoContent, HTTPStatus.NO_CONTENT
+
+    raise NotFound(
+        "Queue event not found",
+        detail=f"Queue event with dag_id: `{dag_id}` was not found",
+    )
+
+
[email protected]_access_dataset("GET")

Review Comment:
   Sure. Just added it!



##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -124,3 +130,153 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, 
total_entries=total_entries)
     )
+
+
+def _generate_queue_event_where_clause(
+    *,
+    dag_id: str | None = None,
+    dataset_id: int | None = None,
+    uri: str | None = None,
+    before: str | None = None,
+) -> list:
+    """Get DatasetDagRunQueue where clause."""
+    where_clause = []
+    if dag_id is not None:
+        where_clause.append(DatasetDagRunQueue.target_dag_id == dag_id)
+    if dataset_id is not None:
+        where_clause.append(DatasetDagRunQueue.dataset_id == dataset_id)
+    if uri is not None:
+        where_clause.append(DatasetModel.uri == uri)
+    if before is not None:
+        where_clause.append(DatasetDagRunQueue.created_at < 
format_datetime(before))
+    return where_clause
+
+
[email protected]_access_dataset("GET")
+@provide_session
+def get_dag_dataset_queue_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Get a queued Dataset event for a DAG."""
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, uri=uri, 
before=before)
+    ddrq = session.scalar(
+        select(DatasetDagRunQueue)
+        .join(DatasetModel, DatasetDagRunQueue.dataset_id == DatasetModel.id)
+        .where(*where_clause)
+    )
+    if ddrq is None:
+        raise NotFound(
+            "Queue event not found",
+            detail=f"Queue event with dag_id: `{dag_id}` and dataset uri: 
`{uri}` was not found",
+        )
+    queue_event = {"created_at": ddrq.created_at, "dag_id": dag_id, "uri": uri}
+    return queue_event_schema.dump(queue_event)
+
+
[email protected]_access_dataset("DELETE")
+@provide_session
+def delete_dag_dataset_queue_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Delete a queued Dataset event for a DAG."""
+    dataset_id = 
session.scalars(select(DatasetModel.id).where(DatasetModel.uri == 
uri)).one_or_none()
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, 
dataset_id=dataset_id, before=before)
+    delete_stmt = delete(DatasetDagRunQueue).where(*where_clause)
+    result = session.execute(delete_stmt)
+    if result.rowcount > 0:
+        return NoContent, HTTPStatus.NO_CONTENT
+    raise NotFound(
+        "Queue event not found",
+        detail=f"Queue event with dag_id: `{dag_id}` and dataset uri: `{uri}` 
was not found",
+    )
+
+
[email protected]_access_dataset("GET")
+@provide_session
+def get_dag_dataset_queue_events(
+    *, dag_id: str, before: str | None = None, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Get queued Dataset events for a DAG."""
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, 
before=before)
+    query = (
+        select(DatasetDagRunQueue, DatasetModel.uri)
+        .join(DatasetModel, DatasetDagRunQueue.dataset_id == DatasetModel.id)
+        .where(*where_clause)
+    )
+    result = session.execute(query).all()
+    total_entries = get_query_count(query, session=session)
+    if not result:
+        raise NotFound(
+            "Queue event not found",
+            detail=f"Queue event with dag_id: `{dag_id}` was not found",
+        )
+    queue_events = [
+        QueueEvent(created_at=ddrq.created_at, dag_id=ddrq.target_dag_id, 
uri=uri) for ddrq, uri in result
+    ]
+    return queue_event_collection_schema.dump(
+        QueueEventCollection(queue_events=queue_events, 
total_entries=total_entries)
+    )
+
+
[email protected]_access_dataset("DELETE")
+@provide_session
+def delete_dag_dataset_queue_events(
+    *, dag_id: str, before: str | None = None, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Delete queued Dataset events for a DAG."""
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, 
before=before)
+    delete_stmt = delete(DatasetDagRunQueue).where(*where_clause)
+    result = session.execute(delete_stmt)
+    if result.rowcount > 0:
+        return NoContent, HTTPStatus.NO_CONTENT
+
+    raise NotFound(
+        "Queue event not found",
+        detail=f"Queue event with dag_id: `{dag_id}` was not found",
+    )
+
+
[email protected]_access_dataset("GET")
+@provide_session
+def get_dataset_queue_events(
+    *, uri: str, before: str | None = None, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Get queued Dataset events for a Dataset."""
+    where_clause = _generate_queue_event_where_clause(uri=uri, before=before)
+    query = (
+        select(DatasetDagRunQueue, DatasetModel.uri)
+        .join(DatasetModel, DatasetDagRunQueue.dataset_id == DatasetModel.id)
+        .where(*where_clause)
+    )
+    total_entries = get_query_count(query, session=session)
+    result = session.execute(query).all()
+    if total_entries > 0:
+        queue_events = [
+            QueueEvent(created_at=ddrq.created_at, dag_id=ddrq.target_dag_id, 
uri=uri) for ddrq, uri in result
+        ]
+        return queue_event_collection_schema.dump(
+            QueueEventCollection(queue_events=queue_events, 
total_entries=total_entries)
+        )
+    raise NotFound(
+        "Queue event not found",
+        detail=f"Queue event with dataset uri: `{uri}` was not found",
+    )
+
+
[email protected]_access_dataset("DELETE")

Review Comment:
   Sure. Just added it!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to