jedcunningham commented on code in PR #37176:
URL: https://github.com/apache/airflow/pull/37176#discussion_r1486982924


##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -124,3 +130,153 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, 
total_entries=total_entries)
     )
+
+
+def _generate_queue_event_where_clause(
+    *,
+    dag_id: str | None = None,
+    dataset_id: int | None = None,
+    uri: str | None = None,
+    before: str | None = None,
+) -> list:
+    """Get DatasetDagRunQueue where clause."""
+    where_clause = []
+    if dag_id is not None:
+        where_clause.append(DatasetDagRunQueue.target_dag_id == dag_id)
+    if dataset_id is not None:
+        where_clause.append(DatasetDagRunQueue.dataset_id == dataset_id)
+    if uri is not None:
+        where_clause.append(DatasetModel.uri == uri)
+    if before is not None:
+        where_clause.append(DatasetDagRunQueue.created_at < 
format_datetime(before))
+    return where_clause
+
+
+@security.requires_access_dataset("GET")
+@provide_session
+def get_dag_dataset_queue_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Get a queued Dataset event for a DAG."""
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, uri=uri, 
before=before)
+    ddrq = session.scalar(
+        select(DatasetDagRunQueue)
+        .join(DatasetModel, DatasetDagRunQueue.dataset_id == DatasetModel.id)
+        .where(*where_clause)
+    )
+    if ddrq is None:
+        raise NotFound(
+            "Queue event not found",
+            detail=f"Queue event with dag_id: `{dag_id}` and dataset uri: 
`{uri}` was not found",
+        )
+    queue_event = {"created_at": ddrq.created_at, "dag_id": dag_id, "uri": uri}
+    return queue_event_schema.dump(queue_event)
+
+
+@security.requires_access_dataset("DELETE")
+@provide_session
+def delete_dag_dataset_queue_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Delete a queued Dataset event for a DAG."""
+    dataset_id = 
session.scalars(select(DatasetModel.id).where(DatasetModel.uri == 
uri)).one_or_none()
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, 
dataset_id=dataset_id, before=before)
+    delete_stmt = delete(DatasetDagRunQueue).where(*where_clause)
+    result = session.execute(delete_stmt)
+    if result.rowcount > 0:
+        return NoContent, HTTPStatus.NO_CONTENT
+    raise NotFound(
+        "Queue event not found",
+        detail=f"Queue event with dag_id: `{dag_id}` and dataset uri: `{uri}` 
was not found",
+    )
+
+
+@security.requires_access_dataset("GET")
+@provide_session
+def get_dag_dataset_queue_events(
+    *, dag_id: str, before: str | None = None, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Get queued Dataset events for a DAG."""
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, 
before=before)
+    query = (
+        select(DatasetDagRunQueue, DatasetModel.uri)
+        .join(DatasetModel, DatasetDagRunQueue.dataset_id == DatasetModel.id)
+        .where(*where_clause)
+    )
+    result = session.execute(query).all()
+    total_entries = get_query_count(query, session=session)
+    if not result:
+        raise NotFound(
+            "Queue event not found",
+            detail=f"Queue event with dag_id: `{dag_id}` was not found",
+        )
+    queue_events = [
+        QueueEvent(created_at=ddrq.created_at, dag_id=ddrq.target_dag_id, 
uri=uri) for ddrq, uri in result
+    ]
+    return queue_event_collection_schema.dump(
+        QueueEventCollection(queue_events=queue_events, 
total_entries=total_entries)
+    )
+
+
+@security.requires_access_dataset("DELETE")
+@provide_session
+def delete_dag_dataset_queue_events(
+    *, dag_id: str, before: str | None = None, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Delete queued Dataset events for a DAG."""
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, 
before=before)
+    delete_stmt = delete(DatasetDagRunQueue).where(*where_clause)
+    result = session.execute(delete_stmt)
+    if result.rowcount > 0:
+        return NoContent, HTTPStatus.NO_CONTENT
+
+    raise NotFound(
+        "Queue event not found",
+        detail=f"Queue event with dag_id: `{dag_id}` was not found",
+    )
+
+
+@security.requires_access_dataset("GET")

Review Comment:
   We probably need to filter the records we return down to only those for DAGs 
that the user has read access on.



##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -124,3 +130,153 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, 
total_entries=total_entries)
     )
+
+
+def _generate_queue_event_where_clause(
+    *,
+    dag_id: str | None = None,
+    dataset_id: int | None = None,
+    uri: str | None = None,
+    before: str | None = None,
+) -> list:
+    """Get DatasetDagRunQueue where clause."""
+    where_clause = []
+    if dag_id is not None:
+        where_clause.append(DatasetDagRunQueue.target_dag_id == dag_id)
+    if dataset_id is not None:
+        where_clause.append(DatasetDagRunQueue.dataset_id == dataset_id)
+    if uri is not None:
+        where_clause.append(DatasetModel.uri == uri)
+    if before is not None:
+        where_clause.append(DatasetDagRunQueue.created_at < 
format_datetime(before))
+    return where_clause
+
+
+@security.requires_access_dataset("GET")
+@provide_session
+def get_dag_dataset_queue_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Get a queued Dataset event for a DAG."""
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, uri=uri, 
before=before)
+    ddrq = session.scalar(
+        select(DatasetDagRunQueue)
+        .join(DatasetModel, DatasetDagRunQueue.dataset_id == DatasetModel.id)
+        .where(*where_clause)
+    )
+    if ddrq is None:
+        raise NotFound(
+            "Queue event not found",
+            detail=f"Queue event with dag_id: `{dag_id}` and dataset uri: 
`{uri}` was not found",
+        )
+    queue_event = {"created_at": ddrq.created_at, "dag_id": dag_id, "uri": uri}
+    return queue_event_schema.dump(queue_event)
+
+
+@security.requires_access_dataset("DELETE")

Review Comment:
   ```suggestion
   @security.requires_access_dataset("DELETE")
   @security.requires_access_dag("GET")
   ```
   
   Though read access on the DAG is probably enough to check, even for a delete?



##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -124,3 +130,153 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, 
total_entries=total_entries)
     )
+
+
+def _generate_queue_event_where_clause(
+    *,
+    dag_id: str | None = None,
+    dataset_id: int | None = None,
+    uri: str | None = None,
+    before: str | None = None,
+) -> list:
+    """Get DatasetDagRunQueue where clause."""
+    where_clause = []
+    if dag_id is not None:
+        where_clause.append(DatasetDagRunQueue.target_dag_id == dag_id)
+    if dataset_id is not None:
+        where_clause.append(DatasetDagRunQueue.dataset_id == dataset_id)
+    if uri is not None:
+        where_clause.append(DatasetModel.uri == uri)
+    if before is not None:
+        where_clause.append(DatasetDagRunQueue.created_at < 
format_datetime(before))
+    return where_clause
+
+
+@security.requires_access_dataset("GET")
+@provide_session
+def get_dag_dataset_queue_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Get a queued Dataset event for a DAG."""
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, uri=uri, 
before=before)
+    ddrq = session.scalar(
+        select(DatasetDagRunQueue)
+        .join(DatasetModel, DatasetDagRunQueue.dataset_id == DatasetModel.id)
+        .where(*where_clause)
+    )
+    if ddrq is None:
+        raise NotFound(
+            "Queue event not found",
+            detail=f"Queue event with dag_id: `{dag_id}` and dataset uri: 
`{uri}` was not found",
+        )
+    queue_event = {"created_at": ddrq.created_at, "dag_id": dag_id, "uri": uri}
+    return queue_event_schema.dump(queue_event)
+
+
+@security.requires_access_dataset("DELETE")
+@provide_session
+def delete_dag_dataset_queue_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Delete a queued Dataset event for a DAG."""
+    dataset_id = 
session.scalars(select(DatasetModel.id).where(DatasetModel.uri == 
uri)).one_or_none()
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, 
dataset_id=dataset_id, before=before)
+    delete_stmt = delete(DatasetDagRunQueue).where(*where_clause)
+    result = session.execute(delete_stmt)
+    if result.rowcount > 0:
+        return NoContent, HTTPStatus.NO_CONTENT
+    raise NotFound(
+        "Queue event not found",
+        detail=f"Queue event with dag_id: `{dag_id}` and dataset uri: `{uri}` 
was not found",
+    )
+
+
+@security.requires_access_dataset("GET")

Review Comment:
   ```suggestion
   @security.requires_access_dataset("GET")
   @security.requires_access_dag("GET")
   ```



##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -124,3 +130,153 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, 
total_entries=total_entries)
     )
+
+
+def _generate_queue_event_where_clause(
+    *,
+    dag_id: str | None = None,
+    dataset_id: int | None = None,
+    uri: str | None = None,
+    before: str | None = None,
+) -> list:
+    """Get DatasetDagRunQueue where clause."""
+    where_clause = []
+    if dag_id is not None:
+        where_clause.append(DatasetDagRunQueue.target_dag_id == dag_id)
+    if dataset_id is not None:
+        where_clause.append(DatasetDagRunQueue.dataset_id == dataset_id)
+    if uri is not None:
+        where_clause.append(DatasetModel.uri == uri)
+    if before is not None:
+        where_clause.append(DatasetDagRunQueue.created_at < 
format_datetime(before))
+    return where_clause
+
+
+@security.requires_access_dataset("GET")
+@provide_session
+def get_dag_dataset_queue_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Get a queued Dataset event for a DAG."""
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, uri=uri, 
before=before)
+    ddrq = session.scalar(
+        select(DatasetDagRunQueue)
+        .join(DatasetModel, DatasetDagRunQueue.dataset_id == DatasetModel.id)
+        .where(*where_clause)
+    )
+    if ddrq is None:
+        raise NotFound(
+            "Queue event not found",
+            detail=f"Queue event with dag_id: `{dag_id}` and dataset uri: 
`{uri}` was not found",
+        )
+    queue_event = {"created_at": ddrq.created_at, "dag_id": dag_id, "uri": uri}
+    return queue_event_schema.dump(queue_event)
+
+
+@security.requires_access_dataset("DELETE")
+@provide_session
+def delete_dag_dataset_queue_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Delete a queued Dataset event for a DAG."""
+    dataset_id = 
session.scalars(select(DatasetModel.id).where(DatasetModel.uri == 
uri)).one_or_none()
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, 
dataset_id=dataset_id, before=before)
+    delete_stmt = delete(DatasetDagRunQueue).where(*where_clause)
+    result = session.execute(delete_stmt)
+    if result.rowcount > 0:
+        return NoContent, HTTPStatus.NO_CONTENT
+    raise NotFound(
+        "Queue event not found",
+        detail=f"Queue event with dag_id: `{dag_id}` and dataset uri: `{uri}` 
was not found",
+    )
+
+
+@security.requires_access_dataset("GET")
+@provide_session
+def get_dag_dataset_queue_events(
+    *, dag_id: str, before: str | None = None, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Get queued Dataset events for a DAG."""
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, 
before=before)
+    query = (
+        select(DatasetDagRunQueue, DatasetModel.uri)
+        .join(DatasetModel, DatasetDagRunQueue.dataset_id == DatasetModel.id)
+        .where(*where_clause)
+    )
+    result = session.execute(query).all()
+    total_entries = get_query_count(query, session=session)
+    if not result:
+        raise NotFound(
+            "Queue event not found",
+            detail=f"Queue event with dag_id: `{dag_id}` was not found",
+        )
+    queue_events = [
+        QueueEvent(created_at=ddrq.created_at, dag_id=ddrq.target_dag_id, 
uri=uri) for ddrq, uri in result
+    ]
+    return queue_event_collection_schema.dump(
+        QueueEventCollection(queue_events=queue_events, 
total_entries=total_entries)
+    )
+
+
+@security.requires_access_dataset("DELETE")

Review Comment:
   ```suggestion
   @security.requires_access_dataset("DELETE")
   @security.requires_access_dag("GET")
   ```



##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -124,3 +130,153 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, 
total_entries=total_entries)
     )
+
+
+def _generate_queue_event_where_clause(
+    *,
+    dag_id: str | None = None,
+    dataset_id: int | None = None,
+    uri: str | None = None,
+    before: str | None = None,
+) -> list:
+    """Get DatasetDagRunQueue where clause."""
+    where_clause = []
+    if dag_id is not None:
+        where_clause.append(DatasetDagRunQueue.target_dag_id == dag_id)
+    if dataset_id is not None:
+        where_clause.append(DatasetDagRunQueue.dataset_id == dataset_id)
+    if uri is not None:
+        where_clause.append(DatasetModel.uri == uri)
+    if before is not None:
+        where_clause.append(DatasetDagRunQueue.created_at < 
format_datetime(before))
+    return where_clause
+
+
+@security.requires_access_dataset("GET")

Review Comment:
   ```suggestion
   @security.requires_access_dataset("GET")
   @security.requires_access_dag("GET")
   ```
   
   We should probably also check that the user has read access on the DAG.



##########
airflow/api_connexion/endpoints/dataset_endpoint.py:
##########
@@ -124,3 +130,153 @@ def get_dataset_events(
     return dataset_event_collection_schema.dump(
         DatasetEventCollection(dataset_events=events, 
total_entries=total_entries)
     )
+
+
+def _generate_queue_event_where_clause(
+    *,
+    dag_id: str | None = None,
+    dataset_id: int | None = None,
+    uri: str | None = None,
+    before: str | None = None,
+) -> list:
+    """Get DatasetDagRunQueue where clause."""
+    where_clause = []
+    if dag_id is not None:
+        where_clause.append(DatasetDagRunQueue.target_dag_id == dag_id)
+    if dataset_id is not None:
+        where_clause.append(DatasetDagRunQueue.dataset_id == dataset_id)
+    if uri is not None:
+        where_clause.append(DatasetModel.uri == uri)
+    if before is not None:
+        where_clause.append(DatasetDagRunQueue.created_at < 
format_datetime(before))
+    return where_clause
+
+
+@security.requires_access_dataset("GET")
+@provide_session
+def get_dag_dataset_queue_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Get a queued Dataset event for a DAG."""
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, uri=uri, 
before=before)
+    ddrq = session.scalar(
+        select(DatasetDagRunQueue)
+        .join(DatasetModel, DatasetDagRunQueue.dataset_id == DatasetModel.id)
+        .where(*where_clause)
+    )
+    if ddrq is None:
+        raise NotFound(
+            "Queue event not found",
+            detail=f"Queue event with dag_id: `{dag_id}` and dataset uri: 
`{uri}` was not found",
+        )
+    queue_event = {"created_at": ddrq.created_at, "dag_id": dag_id, "uri": uri}
+    return queue_event_schema.dump(queue_event)
+
+
+@security.requires_access_dataset("DELETE")
+@provide_session
+def delete_dag_dataset_queue_event(
+    *, dag_id: str, uri: str, before: str | None = None, session: Session = 
NEW_SESSION
+) -> APIResponse:
+    """Delete a queued Dataset event for a DAG."""
+    dataset_id = 
session.scalars(select(DatasetModel.id).where(DatasetModel.uri == 
uri)).one_or_none()
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, 
dataset_id=dataset_id, before=before)
+    delete_stmt = delete(DatasetDagRunQueue).where(*where_clause)
+    result = session.execute(delete_stmt)
+    if result.rowcount > 0:
+        return NoContent, HTTPStatus.NO_CONTENT
+    raise NotFound(
+        "Queue event not found",
+        detail=f"Queue event with dag_id: `{dag_id}` and dataset uri: `{uri}` 
was not found",
+    )
+
+
+@security.requires_access_dataset("GET")
+@provide_session
+def get_dag_dataset_queue_events(
+    *, dag_id: str, before: str | None = None, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Get queued Dataset events for a DAG."""
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, 
before=before)
+    query = (
+        select(DatasetDagRunQueue, DatasetModel.uri)
+        .join(DatasetModel, DatasetDagRunQueue.dataset_id == DatasetModel.id)
+        .where(*where_clause)
+    )
+    result = session.execute(query).all()
+    total_entries = get_query_count(query, session=session)
+    if not result:
+        raise NotFound(
+            "Queue event not found",
+            detail=f"Queue event with dag_id: `{dag_id}` was not found",
+        )
+    queue_events = [
+        QueueEvent(created_at=ddrq.created_at, dag_id=ddrq.target_dag_id, 
uri=uri) for ddrq, uri in result
+    ]
+    return queue_event_collection_schema.dump(
+        QueueEventCollection(queue_events=queue_events, 
total_entries=total_entries)
+    )
+
+
+@security.requires_access_dataset("DELETE")
+@provide_session
+def delete_dag_dataset_queue_events(
+    *, dag_id: str, before: str | None = None, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Delete queued Dataset events for a DAG."""
+    where_clause = _generate_queue_event_where_clause(dag_id=dag_id, 
before=before)
+    delete_stmt = delete(DatasetDagRunQueue).where(*where_clause)
+    result = session.execute(delete_stmt)
+    if result.rowcount > 0:
+        return NoContent, HTTPStatus.NO_CONTENT
+
+    raise NotFound(
+        "Queue event not found",
+        detail=f"Queue event with dag_id: `{dag_id}` was not found",
+    )
+
+
+@security.requires_access_dataset("GET")
+@provide_session
+def get_dataset_queue_events(
+    *, uri: str, before: str | None = None, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Get queued Dataset events for a Dataset."""
+    where_clause = _generate_queue_event_where_clause(uri=uri, before=before)
+    query = (
+        select(DatasetDagRunQueue, DatasetModel.uri)
+        .join(DatasetModel, DatasetDagRunQueue.dataset_id == DatasetModel.id)
+        .where(*where_clause)
+    )
+    total_entries = get_query_count(query, session=session)
+    result = session.execute(query).all()
+    if total_entries > 0:
+        queue_events = [
+            QueueEvent(created_at=ddrq.created_at, dag_id=ddrq.target_dag_id, 
uri=uri) for ddrq, uri in result
+        ]
+        return queue_event_collection_schema.dump(
+            QueueEventCollection(queue_events=queue_events, 
total_entries=total_entries)
+        )
+    raise NotFound(
+        "Queue event not found",
+        detail=f"Queue event with dataset uri: `{uri}` was not found",
+    )
+
+
+@security.requires_access_dataset("DELETE")

Review Comment:
   Same with the delete: only allow them to delete queue events for DAGs they have read access on.



##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -986,6 +986,163 @@ paths:
         "404":
           $ref: "#/components/responses/NotFound"
 
+  /dags/{dag_id}/datasets/eventQueue/{uri}:
+    parameters:
+      - $ref: "#/components/parameters/DAGID"
+      - $ref: "#/components/parameters/DatasetURI"
+
+    get:
+      summary: Get a queued Dataset event for a DAG
+      description: |
+        Get a queued Dataset event for a DAG.
+
+        *New in version 2.9.0*
+      x-openapi-router-controller: 
airflow.api_connexion.endpoints.dataset_endpoint
+      operationId: get_dag_dataset_queue_event
+      parameters:
+        - $ref: "#/components/parameters/Before"
+      tags: [Dataset]
+      responses:
+        "200":
+          description: Success.
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/DatasetDagRunQueue"
+        "401":
+          $ref: "#/components/responses/Unauthenticated"
+        "403":
+          $ref: "#/components/responses/PermissionDenied"
+        "404":
+          $ref: "#/components/responses/NotFound"
+
+    delete:
+      summary: Delete a queued Dataset event for a DAG.
+      description: |
+        Delete a queued Dataset event for a DAG.
+
+        *New in version 2.9.0*
+      x-openapi-router-controller: 
airflow.api_connexion.endpoints.dataset_endpoint
+      operationId: delete_dag_dataset_queue_event
+      parameters:
+        - $ref: "#/components/parameters/Before"
+      tags: [Dataset]
+      responses:
+        "204":
+          description: Success.
+        "400":
+          $ref: "#/components/responses/BadRequest"
+        "401":
+          $ref: "#/components/responses/Unauthenticated"
+        "403":
+          $ref: "#/components/responses/PermissionDenied"
+        "404":
+          $ref: "#/components/responses/NotFound"
+
+  /dags/{dag_id}/datasets/eventQueue:
+    parameters:
+      - $ref: "#/components/parameters/DAGID"
+
+    get:
+      summary: Get queued Dataset events for a DAG.
+      description: |
+        Get queued Dataset events for a DAG.
+
+        *New in version 2.9.0*
+      x-openapi-router-controller: 
airflow.api_connexion.endpoints.dataset_endpoint
+      operationId: get_dag_dataset_queue_events
+      parameters:
+        - $ref: "#/components/parameters/Before"
+      tags: [Dataset]
+      responses:
+        "200":
+          description: Success.
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/DatasetDagRunQueueCollection"
+        "401":
+          $ref: "#/components/responses/Unauthenticated"
+        "403":
+          $ref: "#/components/responses/PermissionDenied"
+        "404":
+          $ref: "#/components/responses/NotFound"
+
+    delete:
+      summary: Delete queued Dataset events for a DAG.
+      description: |
+        Delete queued Dataset events for a DAG.
+
+        *New in version 2.9.0*
+      x-openapi-router-controller: 
airflow.api_connexion.endpoints.dataset_endpoint
+      operationId: delete_dag_dataset_queue_events
+      parameters:
+        - $ref: "#/components/parameters/Before"
+      tags: [Dataset]
+      responses:
+        "204":
+          description: Success.
+        "400":
+          $ref: "#/components/responses/BadRequest"
+        "401":
+          $ref: "#/components/responses/Unauthenticated"
+        "403":
+          $ref: "#/components/responses/PermissionDenied"
+        "404":
+          $ref: "#/components/responses/NotFound"
+
+  /datasets/eventQueue/{uri}:
+    parameters:
+      - $ref: "#/components/parameters/DatasetURI"
+
+    get:
+      summary: Get queued Dataset events for a Dataset.
+      description: |
+        Get queued Dataset events for a Dataset
+
+        *New in version 2.9.0*
+      x-openapi-router-controller: 
airflow.api_connexion.endpoints.dataset_endpoint
+      operationId: get_dataset_queue_events
+      parameters:
+        - $ref: "#/components/parameters/Before"
+      tags: [Dataset]
+      responses:
+        "200":
+          description: Success.
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/DatasetDagRunQueueCollection"
+        "401":
+          $ref: "#/components/responses/Unauthenticated"
+        "403":
+          $ref: "#/components/responses/PermissionDenied"
+        "404":
+          $ref: "#/components/responses/NotFound"
+
+    delete:
+      summary: Delete queued Dataset events for a Dataset.
+      description: |
+        Delete queued Dataset events for a Dataset.
+
+        *New in version 2.9.0*
+      x-openapi-router-controller: 
airflow.api_connexion.endpoints.dataset_endpoint
+      operationId: delete_dataset_queue_events
+      parameters:

Review Comment:
   I can see the "broad" delete being useful, but yes, care is required (as it 
should be with any delete, IMO; I'm not sure we need to do anything special here).



##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -3014,6 +3171,15 @@ components:
           description: Custom notes left by users for this Dag Run.
           type: string
 
+    DeleteQueueEvent:
+      type: object
+      properties:
+        cutoff_time:
+          description: The cutoff time of DatasetDagQueue.
+          format: date-time
+          type: string
+          nullable: true
+

Review Comment:
   ```suggestion
   ```
   
   We don't need this any longer since we are using `Before` via querystring, 
right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to