This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b9546078ba1 API: Add POST /dags/{dag_id}/clearDagRuns bulk endpoint 
(#67709)
b9546078ba1 is described below

commit b9546078ba1f5b1e7522be1b2e231220a92ef6d2
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Mon Jun 1 19:13:44 2026 +0200

    API: Add POST /dags/{dag_id}/clearDagRuns bulk endpoint (#67709)
    
    * API: Add POST /dags/{dag_id}/clearDagRuns bulk endpoint
    
    Mirrors ``POST /dags/{dag_id}/clearTaskInstances`` for Dag Runs: a
    single round-trip clears N runs and (optionally) attaches a note in
    the same transaction, removing the per-run fan-out the UI does today.
    
    The URL sits at the parent-Dag level (``/clearDagRuns`` not
    ``/dagRuns/clear``) on purpose, to match the existing
    ``clearTaskInstances`` convention. To register at that prefix without
    collapsing into ``task_instances_router``, ``dag_run.py`` gains a
    sibling router ``dag_run_at_dag_router`` with prefix ``/dags/{dag_id}``.
    
    URL ``dag_id`` is concrete or ``~``. With ``~``, every entry in
    ``dag_runs`` must carry its own ``dag_id``. With a specific Dag,
    entries may omit ``dag_id`` and inherit from the URL (and the route
    rejects mismatches with 400). Duplicate ``(dag_id, run_id)`` entries
    collapse to one operation, matching ``BulkDagRunService``'s
    ``handle_bulk_delete`` semantics.
    
    ``dry_run`` is the safe default — returns the union of affected task
    instances across the listed runs (or the ``NewTaskResponse``
    placeholders for the ``only_new`` path) without touching state.
    Real clear returns a ``DAGRunCollectionResponse`` with the post-clear
    runs.
    
    To keep both endpoints in sync, the per-run lookup, dry-run TI
    computation, and the clear+note step are pulled out of the single
    ``clear_dag_run`` route into ``services/public/dag_run.py`` as
    ``get_dag_run_and_dag_for_clear`` / ``dry_run_clear_dag_run`` /
    ``perform_clear_dag_run``. The single-run route now composes them
    instead of duplicating the logic.
    
    Auth uses a new ``requires_access_dag_run_clear_bulk`` dependency
    modelled on ``requires_access_dag_run_bulk`` — same per-Dag team
    caching, same wildcard-then-400 contract.
    
    * Simplify clearDagRuns dedup with an ordered dict and rename handler
    
    * Regenerate OpenAPI spec and UI client for clearDagRuns rename
    
    * Factor out shared bulk dag-run authorization request builder
    
    * Restore per-dag team-name dedup in bulk dag-run authorization
    
    * Patch resolver in service module for clear-endpoint resolver test
    
    * Test bulk clearDagRuns rejects unauthorized dag_ids in request body
---
 .../api_fastapi/core_api/datamodels/dag_run.py     |  26 ++-
 .../core_api/openapi/v2-rest-api-generated.yaml    | 112 ++++++++++-
 .../api_fastapi/core_api/routes/public/__init__.py |   3 +-
 .../api_fastapi/core_api/routes/public/dag_run.py  | 175 ++++++++++-------
 .../src/airflow/api_fastapi/core_api/security.py   |  75 ++++---
 .../core_api/services/public/dag_run.py            | 106 +++++++++-
 .../src/airflow/ui/openapi-gen/queries/common.ts   |   1 +
 .../src/airflow/ui/openapi-gen/queries/queries.ts  |  18 +-
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts |  60 +++++-
 .../ui/openapi-gen/requests/services.gen.ts        |  30 ++-
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |  58 +++++-
 .../core_api/routes/public/test_dag_run.py         | 216 ++++++++++++++++++++-
 .../src/airflowctl/api/datamodels/generated.py     |  38 +++-
 13 files changed, 798 insertions(+), 120 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 8373847328d..2cfc4a00017 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -57,8 +57,8 @@ class BulkDAGRunBody(StrictBaseModel):
     dag_id: str | None = None
 
 
-class DAGRunClearBody(StrictBaseModel):
-    """Dag Run serializer for clear endpoint body."""
+class BaseDAGRunClear(StrictBaseModel):
+    """Shared options for the single-run and bulk Dag Run clear endpoints."""
 
     dry_run: bool = True
     only_failed: bool = False
@@ -68,25 +68,31 @@ class DAGRunClearBody(StrictBaseModel):
     )
     run_on_latest_version: bool | None = Field(
         default=None,
-        description="(Experimental) Run on the latest bundle version of the 
Dag after clearing the Dag Run. "
+        description="(Experimental) Run on the latest bundle version of the 
Dag after clearing. "
         "If not specified, falls back to the DAG-level 
``rerun_with_latest_version`` parameter, "
         "then the ``[core] rerun_with_latest_version`` config option, "
-        "and finally ``False`` (the historical default for clear/rerun).",
-    )
-    note: str | None = Field(
-        default=None,
-        max_length=1000,
+        "and finally ``False``.",
     )
+    note: str | None = Field(default=None, max_length=1000)
 
     @model_validator(mode="before")
     @classmethod
-    def validate_model(cls, data: Any) -> Any:
-        """Validate clear Dag run form."""
+    def validate_only_new_only_failed_mutually_exclusive(cls, data: Any) -> 
Any:
         if data.get("only_new") and data.get("only_failed"):
             raise ValueError("only_new and only_failed are mutually exclusive")
         return data
 
 
+class DAGRunClearBody(BaseDAGRunClear):
+    """Dag Run serializer for clear endpoint body."""
+
+
+class BulkDAGRunClearBody(BaseDAGRunClear):
+    """Request body for the bulk clear Dag Runs endpoint."""
+
+    dag_runs: list[BulkDAGRunBody] = Field(min_length=1)
+
+
 class DAGRunResponse(BaseModel):
     """Dag Run serializer for responses."""
 
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 9cfa32f2411..be17e2fd70e 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -2980,6 +2980,69 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
+  /api/v2/dags/{dag_id}/clearDagRuns:
+    post:
+      tags:
+      - DagRun
+      summary: Clear Dag Runs
+      description: Clear multiple Dag Runs in a single request.
+      operationId: clear_dag_runs
+      security:
+      - OAuth2PasswordBearer: []
+      - HTTPBearer: []
+      parameters:
+      - name: dag_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Id
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/BulkDAGRunClearBody'
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                anyOf:
+                - $ref: 
'#/components/schemas/ClearTaskInstanceCollectionResponse'
+                - $ref: '#/components/schemas/DAGRunCollectionResponse'
+                title: Response Clear Dag Runs
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '400':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Bad Request
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
   /api/v2/dagSources/{dag_id}:
     get:
       tags:
@@ -11883,6 +11946,49 @@ components:
       - dag_run_id
       title: BulkDAGRunBody
       description: Request body for bulk delete operations on Dag Runs.
+    BulkDAGRunClearBody:
+      properties:
+        dry_run:
+          type: boolean
+          title: Dry Run
+          default: true
+        only_failed:
+          type: boolean
+          title: Only Failed
+          default: false
+        only_new:
+          type: boolean
+          title: Only New
+          description: Only queue newly added tasks in the latest Dag version 
without
+            clearing existing tasks.
+          default: false
+        run_on_latest_version:
+          anyOf:
+          - type: boolean
+          - type: 'null'
+          title: Run On Latest Version
+          description: (Experimental) Run on the latest bundle version of the 
Dag
+            after clearing. If not specified, falls back to the DAG-level 
``rerun_with_latest_version``
+            parameter, then the ``[core] rerun_with_latest_version`` config 
option,
+            and finally ``False``.
+        note:
+          anyOf:
+          - type: string
+            maxLength: 1000
+          - type: 'null'
+          title: Note
+        dag_runs:
+          items:
+            $ref: '#/components/schemas/BulkDAGRunBody'
+          type: array
+          minItems: 1
+          title: Dag Runs
+      additionalProperties: false
+      type: object
+      required:
+      - dag_runs
+      title: BulkDAGRunClearBody
+      description: Request body for the bulk clear Dag Runs endpoint.
     BulkDeleteAction_BulkDAGRunBody_:
       properties:
         action:
@@ -13093,9 +13199,9 @@ components:
           - type: 'null'
           title: Run On Latest Version
           description: (Experimental) Run on the latest bundle version of the 
Dag
-            after clearing the Dag Run. If not specified, falls back to the 
DAG-level
-            ``rerun_with_latest_version`` parameter, then the ``[core] 
rerun_with_latest_version``
-            config option, and finally ``False`` (the historical default for 
clear/rerun).
+            after clearing. If not specified, falls back to the DAG-level 
``rerun_with_latest_version``
+            parameter, then the ``[core] rerun_with_latest_version`` config 
option,
+            and finally ``False``.
         note:
           anyOf:
           - type: string
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py
index 4e6ebba1178..e305f0ac248 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py
@@ -28,7 +28,7 @@ from airflow.api_fastapi.core_api.routes.public.backfills 
import backfills_route
 from airflow.api_fastapi.core_api.routes.public.config import config_router
 from airflow.api_fastapi.core_api.routes.public.connections import 
connections_router
 from airflow.api_fastapi.core_api.routes.public.dag_parsing import 
dag_parsing_router
-from airflow.api_fastapi.core_api.routes.public.dag_run import dag_run_router
+from airflow.api_fastapi.core_api.routes.public.dag_run import 
dag_run_at_dag_router, dag_run_router
 from airflow.api_fastapi.core_api.routes.public.dag_sources import 
dag_sources_router
 from airflow.api_fastapi.core_api.routes.public.dag_stats import 
dag_stats_router
 from airflow.api_fastapi.core_api.routes.public.dag_tags import dag_tags_router
@@ -69,6 +69,7 @@ authenticated_router.include_router(assets_router)
 authenticated_router.include_router(backfills_router)
 authenticated_router.include_router(connections_router)
 authenticated_router.include_router(dag_run_router)
+authenticated_router.include_router(dag_run_at_dag_router)
 authenticated_router.include_router(dag_sources_router)
 authenticated_router.include_router(dag_stats_router)
 authenticated_router.include_router(config_router)
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index aa82b14fefd..18af252c495 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -47,7 +47,6 @@ from airflow.api_fastapi.common.db.dag_runs import (
     attach_dag_versions_to_runs,
     eager_load_dag_run_for_list,
 )
-from airflow.api_fastapi.common.db.task_instances import 
eager_load_TI_and_TIH_for_validation
 from airflow.api_fastapi.common.parameters import (
     FilterOptionEnum,
     FilterParam,
@@ -79,6 +78,7 @@ from airflow.api_fastapi.core_api.datamodels.assets import 
AssetEventCollectionR
 from airflow.api_fastapi.core_api.datamodels.common import BulkBody, 
BulkResponse
 from airflow.api_fastapi.core_api.datamodels.dag_run import (
     BulkDAGRunBody,
+    BulkDAGRunClearBody,
     DAGRunClearBody,
     DAGRunCollectionResponse,
     DagRunMutableStates,
@@ -99,22 +99,28 @@ from airflow.api_fastapi.core_api.security import (
     requires_access_asset,
     requires_access_dag,
     requires_access_dag_run_bulk,
+    requires_access_dag_run_clear_bulk,
+)
+from airflow.api_fastapi.core_api.services.public.dag_run import (
+    BulkDagRunService,
+    DagRunWaiter,
+    dry_run_clear_dag_run,
+    get_dag_run_and_dag_for_clear,
+    perform_clear_dag_run,
 )
-from airflow.api_fastapi.core_api.services.public.common import 
resolve_run_on_latest_version
-from airflow.api_fastapi.core_api.services.public.dag_run import 
BulkDagRunService, DagRunWaiter
 from airflow.api_fastapi.logging.decorators import action_logging
 from airflow.exceptions import ParamValidationError
 from airflow.listeners.listener import get_listener_manager
 from airflow.models import DagModel, DagRun
 from airflow.models.asset import AssetEvent
 from airflow.models.dag_version import DagVersion
-from airflow.models.taskinstance import TaskInstance
-from airflow.utils.state import DagRunState, TaskInstanceState
+from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
 log = structlog.get_logger(__name__)
 
 dag_run_router = AirflowRouter(tags=["DagRun"], 
prefix="/dags/{dag_id}/dagRuns")
+dag_run_at_dag_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}")
 
 
 @dag_run_router.get(
@@ -321,81 +327,116 @@ def clear_dag_run(
     session: SessionDep,
     user: GetUserDep,
 ) -> ClearTaskInstanceCollectionResponse | DAGRunResponse:
-    dag_run = session.scalar(
-        select(DagRun).filter_by(dag_id=dag_id, 
run_id=dag_run_id).options(joinedload(DagRun.dag_model))
+    dag_run, dag = get_dag_run_and_dag_for_clear(
+        session=session, dag_bag=dag_bag, dag_id=dag_id, dag_run_id=dag_run_id
     )
-    if dag_run is None:
-        raise HTTPException(
-            status.HTTP_404_NOT_FOUND,
-            f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` 
was not found",
+
+    if body.dry_run:
+        task_instances = dry_run_clear_dag_run(
+            session=session,
+            dag_bag=dag_bag,
+            dag_id=dag_id,
+            dag_run_id=dag_run_id,
+            only_failed=body.only_failed,
+            only_new=body.only_new,
+        )
+        return ClearTaskInstanceCollectionResponse(
+            task_instances=task_instances,
+            total_entries=len(task_instances),
         )
 
-    dag = dag_bag.get_dag_for_run(dag_run, session=session)
+    return perform_clear_dag_run(
+        session=session,
+        dag=dag,
+        dag_run=dag_run,
+        dag_id=dag_id,
+        only_failed=body.only_failed,
+        only_new=body.only_new,
+        run_on_latest_version=body.run_on_latest_version,
+        note=body.note,
+        user=user,
+    )
 
-    if not dag:
-        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} 
was not found")
 
-    resolved_run_on_latest = 
resolve_run_on_latest_version(body.run_on_latest_version, dag_id, session)
+@dag_run_at_dag_router.post(
+    "/clearDagRuns",
+    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, 
status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_dag_run_clear_bulk()), 
Depends(action_logging())],
+)
+def clear_dag_runs(
+    dag_id: str,
+    body: BulkDAGRunClearBody,
+    dag_bag: DagBagDep,
+    session: SessionDep,
+    user: GetUserDep,
+) -> ClearTaskInstanceCollectionResponse | DAGRunCollectionResponse:
+    """Clear multiple Dag Runs in a single request."""
+    url_dag_id_is_wildcard = dag_id == "~"
+
+    # No ordered set type in Python, using a dict with throwaway values as 
replacement.
+    runs_to_clear: dict[tuple[str, str], None] = {}
+    for run in body.dag_runs:
+        if url_dag_id_is_wildcard:
+            if not run.dag_id or run.dag_id == "~":
+                raise HTTPException(
+                    status.HTTP_400_BAD_REQUEST,
+                    f"When the URL dag_id is '~', every entry must provide a 
concrete dag_id "
+                    f"(missing on dag_run_id: {run.dag_run_id!r}).",
+                )
+            run_to_clear = (run.dag_id, run.dag_run_id)
+        else:
+            entity_dag_id = run.dag_id or dag_id
+            if entity_dag_id != dag_id:
+                raise HTTPException(
+                    status.HTTP_400_BAD_REQUEST,
+                    f"Entry dag_id {entity_dag_id!r} does not match the URL 
dag_id {dag_id!r}.",
+                )
+            run_to_clear = (dag_id, run.dag_run_id)
+        runs_to_clear[run_to_clear] = None
 
     if body.dry_run:
-        if body.only_new:
-            # Determine "new" tasks by TI existence: a task is new when the 
latest Dag
-            # version contains it but the current run has no TaskInstance row 
for it yet.
-            # This is more reliable than the version-comparison approach used 
by
-            # dag.clear(only_new=True, dry_run=True) which returns an empty 
set when
-            # created_dag_version_id is None (e.g. LocalDagBundle).
-            latest_dag = get_latest_version_of_dag(dag_bag, dag_id, session)
-            existing_task_ids = set(
-                session.scalars(
-                    select(TaskInstance.task_id).where(
-                        TaskInstance.dag_id == dag_id,
-                        TaskInstance.run_id == dag_run_id,
-                    )
-                ).all()
+        affected: list[TaskInstanceResponse | NewTaskResponse] = []
+        for run_dag_id, run_id in runs_to_clear:
+            get_dag_run_and_dag_for_clear(
+                session=session, dag_bag=dag_bag, dag_id=run_dag_id, 
dag_run_id=run_id
             )
-            new_task_ids = sorted(set(latest_dag.task_ids) - existing_task_ids)
-            task_instances: list[TaskInstanceResponse | NewTaskResponse] = [
-                NewTaskResponse(task_id=task_id, task_display_name=task_id) 
for task_id in new_task_ids
-            ]
-        else:
-            # Query task instances directly with proper eager loading so that 
all
-            # relationships required by TaskInstanceResponse (dag_run, 
dag_model,
-            # dag_version, rendered_task_instance_fields) are populated.
-            # dag.clear(dry_run=True) returns raw ORM objects without these 
joins.
-            ti_query = 
eager_load_TI_and_TIH_for_validation(select(TaskInstance))
-            ti_query = ti_query.where(
-                TaskInstance.dag_id == dag_id,
-                TaskInstance.run_id == dag_run_id,
-            )
-            if body.only_failed:
-                ti_query = ti_query.where(
-                    TaskInstance.state.in_([TaskInstanceState.FAILED, 
TaskInstanceState.UPSTREAM_FAILED])
+            affected.extend(
+                dry_run_clear_dag_run(
+                    session=session,
+                    dag_bag=dag_bag,
+                    dag_id=run_dag_id,
+                    dag_run_id=run_id,
+                    only_failed=body.only_failed,
+                    only_new=body.only_new,
                 )
-            task_instances = list(session.scalars(ti_query))
-
+            )
         return ClearTaskInstanceCollectionResponse(
-            task_instances=task_instances,
-            total_entries=len(task_instances),
+            task_instances=affected,
+            total_entries=len(affected),
         )
 
-    dag.clear(
-        run_id=dag_run_id,
-        task_ids=None,
-        only_new=body.only_new,
-        only_failed=body.only_failed,
-        run_on_latest_version=resolved_run_on_latest,
-        session=session,
+    cleared_runs: list[DagRun] = []
+    for run_dag_id, run_id in runs_to_clear:
+        dag_run, dag = get_dag_run_and_dag_for_clear(
+            session=session, dag_bag=dag_bag, dag_id=run_dag_id, 
dag_run_id=run_id
+        )
+        cleared_runs.append(
+            perform_clear_dag_run(
+                session=session,
+                dag=dag,
+                dag_run=dag_run,
+                dag_id=run_dag_id,
+                only_failed=body.only_failed,
+                only_new=body.only_new,
+                run_on_latest_version=body.run_on_latest_version,
+                note=body.note,
+                user=user,
+            )
+        )
+    return DAGRunCollectionResponse(
+        dag_runs=cleared_runs,
+        total_entries=len(cleared_runs),
     )
-    dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id == 
dag_run.id))
-    if not dag_run_cleared:
-        raise HTTPException(status.HTTP_404_NOT_FOUND, "Dag run not found 
after clearing")
-    if body.note is not None:
-        if dag_run_cleared.dag_run_note is None:
-            dag_run_cleared.note = (body.note, user.get_id())
-        else:
-            dag_run_cleared.dag_run_note.content = body.note
-            dag_run_cleared.dag_run_note.user_id = user.get_id()
-    return dag_run_cleared
 
 
 @dag_run_router.get(
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/security.py 
b/airflow-core/src/airflow/api_fastapi/core_api/security.py
index bb3413b7f05..154929af064 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/security.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/security.py
@@ -63,7 +63,7 @@ from airflow.api_fastapi.core_api.datamodels.common import (
     BulkUpdateAction,
 )
 from airflow.api_fastapi.core_api.datamodels.connections import ConnectionBody
-from airflow.api_fastapi.core_api.datamodels.dag_run import BulkDAGRunBody
+from airflow.api_fastapi.core_api.datamodels.dag_run import BulkDAGRunBody, 
BulkDAGRunClearBody
 from airflow.api_fastapi.core_api.datamodels.pools import PoolBody
 from airflow.api_fastapi.core_api.datamodels.variables import VariableBody
 from airflow.configuration import conf
@@ -730,46 +730,73 @@ def requires_access_variable_bulk() -> 
Callable[[BulkBody[VariableBody], BaseUse
     return inner
 
 
+def _build_dag_run_access_requests(
+    entity_methods: list[tuple[str, ResourceMethod]],
+) -> list[IsAuthorizedDagRequest]:
+    """
+    Build per-entity DagRun authorization requests for a batched access check.
+
+    ``entity_methods`` is a list of ``(dag_id, method)`` pairs with 
unresolvable
+    entries (no dag_id or the ``~`` wildcard) already filtered out by the 
caller.
+    The team for each dag is resolved once and shared across that dag's 
requests.
+    """
+    resolved_dag_ids = {dag_id for dag_id, _ in entity_methods}
+    dag_id_to_team = {dag_id: DagModel.get_team_name(dag_id) for dag_id in 
resolved_dag_ids}
+    return [
+        {
+            "method": method,
+            "access_entity": DagAccessEntity.RUN,
+            "details": DagDetails(id=dag_id, 
team_name=dag_id_to_team.get(dag_id)),
+        }
+        for dag_id, method in entity_methods
+    ]
+
+
 def requires_access_dag_run_bulk() -> Callable[[BulkBody[BulkDAGRunBody], 
BaseUser, str], None]:
     def inner(
         request: BulkBody[BulkDAGRunBody],
         user: GetUserDep,
         dag_id: str,
     ) -> None:
-        resolved_dag_ids: set[str] = set()
-        for action in request.actions:
-            for entity in action.entities:
-                if isinstance(entity, str):
-                    entity_dag_id: str | None = dag_id
-                else:
-                    entity_dag_id = entity.dag_id or dag_id
-                if entity_dag_id and entity_dag_id != "~":
-                    resolved_dag_ids.add(entity_dag_id)
-
-        dag_id_to_team = {d: DagModel.get_team_name(d) for d in 
resolved_dag_ids}
-
-        requests: list[IsAuthorizedDagRequest] = []
+        entity_methods: list[tuple[str, ResourceMethod]] = []
         for action in request.actions:
             methods = _get_resource_methods_from_bulk_request(action)
             for entity in action.entities:
                 if isinstance(entity, str):
-                    entity_dag_id = dag_id
+                    entity_dag_id: str | None = dag_id
                 else:
                     entity_dag_id = entity.dag_id or dag_id
                 # Entities that can't be resolved are surfaced as 400 in the 
service's BulkResponse.
                 if not entity_dag_id or entity_dag_id == "~":
                     continue
                 for method in methods:
-                    requests.append(
-                        {
-                            "method": method,
-                            "access_entity": DagAccessEntity.RUN,
-                            "details": DagDetails(
-                                id=entity_dag_id, 
team_name=dag_id_to_team.get(entity_dag_id)
-                            ),
-                        }
-                    )
+                    entity_methods.append((entity_dag_id, method))
+
+        requests = _build_dag_run_access_requests(entity_methods)
+        _requires_access(
+            is_authorized_callback=lambda: 
get_auth_manager().batch_is_authorized_dag(
+                requests=requests,
+                user=user,
+            )
+        )
+
+    return inner
+
+
+def requires_access_dag_run_clear_bulk() -> Callable[[BulkDAGRunClearBody, 
BaseUser, str], None]:
+    def inner(
+        body: BulkDAGRunClearBody,
+        user: GetUserDep,
+        dag_id: str,
+    ) -> None:
+        entity_methods: list[tuple[str, ResourceMethod]] = []
+        for run in body.dag_runs:
+            entity_dag_id = run.dag_id or dag_id
+            if not entity_dag_id or entity_dag_id == "~":
+                continue
+            entity_methods.append((entity_dag_id, "PUT"))
 
+        requests = _build_dag_run_access_requests(entity_methods)
         _requires_access(
             is_authorized_callback=lambda: 
get_auth_manager().batch_is_authorized_dag(
                 requests=requests,
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py 
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
index 4c5a5b090b6..3c9d8cd7eba 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
@@ -25,10 +25,13 @@ from typing import TYPE_CHECKING, Any
 
 import attrs
 import structlog
-from fastapi import status
+from fastapi import HTTPException, status
 from sqlalchemy import select, tuple_
-from sqlalchemy.orm import Session
+from sqlalchemy.orm import Session, joinedload
 
+from airflow.api_fastapi.auth.managers.models.base_user import BaseUser
+from airflow.api_fastapi.common.dagbag import DagBagDep, 
get_latest_version_of_dag
+from airflow.api_fastapi.common.db.task_instances import 
eager_load_TI_and_TIH_for_validation
 from airflow.api_fastapi.core_api.datamodels.common import (
     BulkActionNotOnExistence,
     BulkActionResponse,
@@ -38,18 +41,113 @@ from airflow.api_fastapi.core_api.datamodels.common import 
(
     BulkUpdateAction,
 )
 from airflow.api_fastapi.core_api.datamodels.dag_run import BulkDAGRunBody, 
DagRunMutableStates
-from airflow.api_fastapi.core_api.services.public.common import BulkService
+from airflow.api_fastapi.core_api.datamodels.task_instances import 
NewTaskResponse
+from airflow.api_fastapi.core_api.services.public.common import BulkService, 
resolve_run_on_latest_version
 from airflow.models.dagrun import DagRun
+from airflow.models.taskinstance import TaskInstance
 from airflow.models.xcom import XCOM_RETURN_KEY, XComModel
 from airflow.utils.session import create_session_async
-from airflow.utils.state import State
+from airflow.utils.state import State, TaskInstanceState
 
 if TYPE_CHECKING:
     from collections.abc import AsyncGenerator, Iterator
 
+    from airflow.serialization.definitions.dag import SerializedDAG
+
 log = structlog.get_logger(__name__)
 
 
+def get_dag_run_and_dag_for_clear(
+    *,
+    session: Session,
+    dag_bag: DagBagDep,
+    dag_id: str,
+    dag_run_id: str,
+) -> tuple[DagRun, SerializedDAG]:
+    dag_run = session.scalar(
+        select(DagRun).filter_by(dag_id=dag_id, 
run_id=dag_run_id).options(joinedload(DagRun.dag_model))
+    )
+    if dag_run is None:
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND,
+            f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` 
was not found",
+        )
+    dag = dag_bag.get_dag_for_run(dag_run, session=session)
+    if not dag:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} 
was not found")
+    return dag_run, dag
+
+
+def dry_run_clear_dag_run(
+    *,
+    session: Session,
+    dag_bag: DagBagDep,
+    dag_id: str,
+    dag_run_id: str,
+    only_failed: bool,
+    only_new: bool,
+) -> list[Any]:
+    if only_new:
+        # ``dag.clear(only_new=True, dry_run=True)`` returns nothing when
+        # ``created_dag_version_id`` is None (e.g. LocalDagBundle), so derive 
new
+        # tasks from TI existence instead.
+        latest_dag = get_latest_version_of_dag(dag_bag, dag_id, session)
+        existing_task_ids = set(
+            session.scalars(
+                select(TaskInstance.task_id).where(
+                    TaskInstance.dag_id == dag_id,
+                    TaskInstance.run_id == dag_run_id,
+                )
+            ).all()
+        )
+        new_task_ids = sorted(set(latest_dag.task_ids) - existing_task_ids)
+        return [NewTaskResponse(task_id=task_id, task_display_name=task_id) 
for task_id in new_task_ids]
+
+    ti_query = eager_load_TI_and_TIH_for_validation(select(TaskInstance))
+    ti_query = ti_query.where(
+        TaskInstance.dag_id == dag_id,
+        TaskInstance.run_id == dag_run_id,
+    )
+    if only_failed:
+        ti_query = ti_query.where(
+            TaskInstance.state.in_([TaskInstanceState.FAILED, 
TaskInstanceState.UPSTREAM_FAILED])
+        )
+    return list(session.scalars(ti_query))
+
+
+def perform_clear_dag_run(
+    *,
+    session: Session,
+    dag: SerializedDAG,
+    dag_run: DagRun,
+    dag_id: str,
+    only_failed: bool,
+    only_new: bool,
+    run_on_latest_version: bool | None,
+    note: str | None,
+    user: BaseUser,
+) -> DagRun:
+    resolved_run_on_latest = 
resolve_run_on_latest_version(run_on_latest_version, dag_id, session)
+    dag.clear(
+        run_id=dag_run.run_id,
+        task_ids=None,
+        only_new=only_new,
+        only_failed=only_failed,
+        run_on_latest_version=resolved_run_on_latest,
+        session=session,
+    )
+    dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id == 
dag_run.id))
+    if not dag_run_cleared:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, "Dag run not found 
after clearing")
+    if note is not None:
+        if dag_run_cleared.dag_run_note is None:
+            dag_run_cleared.note = (note, user.get_id())
+        else:
+            dag_run_cleared.dag_run_note.content = note
+            dag_run_cleared.dag_run_note.user_id = user.get_id()
+    return dag_run_cleared
+
+
 @attrs.define
 class DagRunWaiter:
     """Wait for the specified dag run to finish, and collect info from it."""
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
index 632d3111576..fb0ab311c5a 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
@@ -1037,6 +1037,7 @@ export type 
ConnectionServiceCreateDefaultConnectionsMutationResult = Awaited<Re
 export type DagRunServiceTriggerDagRunMutationResult = 
Awaited<ReturnType<typeof DagRunService.triggerDagRun>>;
 export type DagRunServiceClearDagRunMutationResult = Awaited<ReturnType<typeof 
DagRunService.clearDagRun>>;
 export type DagRunServiceGetListDagRunsBatchMutationResult = 
Awaited<ReturnType<typeof DagRunService.getListDagRunsBatch>>;
+export type DagRunServiceClearDagRunsMutationResult = 
Awaited<ReturnType<typeof DagRunService.clearDagRuns>>;
 export type DagServiceFavoriteDagMutationResult = Awaited<ReturnType<typeof 
DagService.favoriteDag>>;
 export type DagServiceUnfavoriteDagMutationResult = Awaited<ReturnType<typeof 
DagService.unfavoriteDag>>;
 export type TaskInstanceServiceGetTaskInstancesBatchMutationResult = 
Awaited<ReturnType<typeof TaskInstanceService.getTaskInstancesBatch>>;
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index 01e3d78e69c..41118b64c74 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -2,7 +2,7 @@
 
 import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from 
"@tanstack/react-query";
 import { AssetService, AssetStateService, AuthLinksService, BackfillService, 
CalendarService, ConfigService, ConnectionService, DagParsingService, 
DagRunService, DagService, DagSourceService, DagStatsService, 
DagVersionService, DagWarningService, DashboardService, DeadlinesService, 
DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, 
GanttService, GridService, ImportErrorService, JobService, LoginService, 
MonitorService, PartitionedDagRunService, PluginService, P [...]
-import { AssetStateBody, BackfillPostBody, BulkBody_BulkDAGRunBody_, 
BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_, 
BulkBody_VariableBody_, ClearTaskInstancesBody, ConnectionBody, 
CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody, 
DAGRunsBatchBody, DagRunState, DagWarningType, GenerateTokenBody, 
MaterializeAssetBody, PatchTaskInstanceBody, PoolBody, PoolPatchBody, 
TaskInstancesBatchBody, TaskStateBody, TaskStatePatchBody, TriggerDAGRunPo [...]
+import { AssetStateBody, BackfillPostBody, BulkBody_BulkDAGRunBody_, 
BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_, 
BulkBody_VariableBody_, BulkDAGRunClearBody, ClearTaskInstancesBody, 
ConnectionBody, CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, 
DAGRunPatchBody, DAGRunsBatchBody, DagRunState, DagWarningType, 
GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody, PoolBody, 
PoolPatchBody, TaskInstancesBatchBody, TaskStateBody, TaskStatePatch [...]
 import * as Common from "./common";
 /**
 * Get Assets
@@ -2210,6 +2210,22 @@ export const useDagRunServiceGetListDagRunsBatch = 
<TData = Common.DagRunService
   requestBody: DAGRunsBatchBody;
 }, TContext>({ mutationFn: ({ dagId, requestBody }) => 
DagRunService.getListDagRunsBatch({ dagId, requestBody }) as unknown as 
Promise<TData>, ...options });
 /**
+* Clear Dag Runs
+* Clear multiple Dag Runs in a single request.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.requestBody
+* @returns unknown Successful Response
+* @throws ApiError
+*/
+export const useDagRunServiceClearDagRuns = <TData = 
Common.DagRunServiceClearDagRunsMutationResult, TError = unknown, TContext = 
unknown>(options?: Omit<UseMutationOptions<TData, TError, {
+  dagId: string;
+  requestBody: BulkDAGRunClearBody;
+}, TContext>, "mutationFn">) => useMutation<TData, TError, {
+  dagId: string;
+  requestBody: BulkDAGRunClearBody;
+}, TContext>({ mutationFn: ({ dagId, requestBody }) => 
DagRunService.clearDagRuns({ dagId, requestBody }) as unknown as 
Promise<TData>, ...options });
+/**
 * Favorite Dag
 * Mark the Dag as favorite.
 * @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index d3ab8bafd6c..53482dbff5c 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -971,6 +971,64 @@ export const $BulkDAGRunBody = {
     description: 'Request body for bulk delete operations on Dag Runs.'
 } as const;
 
+export const $BulkDAGRunClearBody = {
+    properties: {
+        dry_run: {
+            type: 'boolean',
+            title: 'Dry Run',
+            default: true
+        },
+        only_failed: {
+            type: 'boolean',
+            title: 'Only Failed',
+            default: false
+        },
+        only_new: {
+            type: 'boolean',
+            title: 'Only New',
+            description: 'Only queue newly added tasks in the latest Dag 
version without clearing existing tasks.',
+            default: false
+        },
+        run_on_latest_version: {
+            anyOf: [
+                {
+                    type: 'boolean'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Run On Latest Version',
+            description: '(Experimental) Run on the latest bundle version of 
the Dag after clearing. If not specified, falls back to the DAG-level 
``rerun_with_latest_version`` parameter, then the ``[core] 
rerun_with_latest_version`` config option, and finally ``False``.'
+        },
+        note: {
+            anyOf: [
+                {
+                    type: 'string',
+                    maxLength: 1000
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Note'
+        },
+        dag_runs: {
+            items: {
+                '$ref': '#/components/schemas/BulkDAGRunBody'
+            },
+            type: 'array',
+            minItems: 1,
+            title: 'Dag Runs'
+        }
+    },
+    additionalProperties: false,
+    type: 'object',
+    required: ['dag_runs'],
+    title: 'BulkDAGRunClearBody',
+    description: 'Request body for the bulk clear Dag Runs endpoint.'
+} as const;
+
 export const $BulkDeleteAction_BulkDAGRunBody_ = {
     properties: {
         action: {
@@ -2774,7 +2832,7 @@ export const $DAGRunClearBody = {
                 }
             ],
             title: 'Run On Latest Version',
-            description: '(Experimental) Run on the latest bundle version of 
the Dag after clearing the Dag Run. If not specified, falls back to the 
DAG-level ``rerun_with_latest_version`` parameter, then the ``[core] 
rerun_with_latest_version`` config option, and finally ``False`` (the 
historical default for clear/rerun).'
+            description: '(Experimental) Run on the latest bundle version of 
the Dag after clearing. If not specified, falls back to the DAG-level 
``rerun_with_latest_version`` parameter, then the ``[core] 
rerun_with_latest_version`` config option, and finally ``False``.'
         },
         note: {
             anyOf: [
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index 74502a154c8..04b43773dbe 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -3,7 +3,7 @@
 import type { CancelablePromise } from './core/CancelablePromise';
 import { OpenAPI } from './core/OpenAPI';
 import { request as __request } from './core/request';
-import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, 
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, 
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, 
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, 
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, 
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, 
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, 
Dele [...]
+import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, 
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, 
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, 
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, 
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, 
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, 
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, 
Dele [...]
 
 export class AssetService {
     /**
@@ -1233,6 +1233,34 @@ export class DagRunService {
         });
     }
     
+    /**
+     * Clear Dag Runs
+     * Clear multiple Dag Runs in a single request.
+     * @param data The data for the request.
+     * @param data.dagId
+     * @param data.requestBody
+     * @returns unknown Successful Response
+     * @throws ApiError
+     */
+    public static clearDagRuns(data: ClearDagRunsData): 
CancelablePromise<ClearDagRunsResponse> {
+        return __request(OpenAPI, {
+            method: 'POST',
+            url: '/api/v2/dags/{dag_id}/clearDagRuns',
+            path: {
+                dag_id: data.dagId
+            },
+            body: data.requestBody,
+            mediaType: 'application/json',
+            errors: {
+                400: 'Bad Request',
+                401: 'Unauthorized',
+                403: 'Forbidden',
+                404: 'Not Found',
+                422: 'Validation Error'
+            }
+        });
+    }
+    
     /**
      * Get Dag Run Stats
      * Get duration statistics for a DAG based on its historical completed 
runs.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 2000ec81e8f..91c11c794e0 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -301,6 +301,24 @@ export type BulkDAGRunBody = {
     dag_id?: string | null;
 };
 
+/**
+ * Request body for the bulk clear Dag Runs endpoint.
+ */
+export type BulkDAGRunClearBody = {
+    dry_run?: boolean;
+    only_failed?: boolean;
+    /**
+     * Only queue newly added tasks in the latest Dag version without clearing 
existing tasks.
+     */
+    only_new?: boolean;
+    /**
+     * (Experimental) Run on the latest bundle version of the Dag after 
clearing. If not specified, falls back to the DAG-level 
``rerun_with_latest_version`` parameter, then the ``[core] 
rerun_with_latest_version`` config option, and finally ``False``.
+     */
+    run_on_latest_version?: boolean | null;
+    note?: string | null;
+    dag_runs: Array<BulkDAGRunBody>;
+};
+
 export type BulkDeleteAction_BulkDAGRunBody_ = {
     /**
      * The action to be performed on the entities.
@@ -748,7 +766,7 @@ export type DAGRunClearBody = {
      */
     only_new?: boolean;
     /**
-     * (Experimental) Run on the latest bundle version of the Dag after 
clearing the Dag Run. If not specified, falls back to the DAG-level 
``rerun_with_latest_version`` parameter, then the ``[core] 
rerun_with_latest_version`` config option, and finally ``False`` (the 
historical default for clear/rerun).
+     * (Experimental) Run on the latest bundle version of the Dag after 
clearing. If not specified, falls back to the DAG-level 
``rerun_with_latest_version`` parameter, then the ``[core] 
rerun_with_latest_version`` config option, and finally ``False``.
      */
     run_on_latest_version?: boolean | null;
     note?: string | null;
@@ -2956,6 +2974,13 @@ export type GetListDagRunsBatchData = {
 
 export type GetListDagRunsBatchResponse = DAGRunCollectionResponse;
 
+export type ClearDagRunsData = {
+    dagId: string;
+    requestBody: BulkDAGRunClearBody;
+};
+
+export type ClearDagRunsResponse = ClearTaskInstanceCollectionResponse | 
DAGRunCollectionResponse;
+
 export type GetDagRunStatsData = {
     dagId: string;
     dagRunId: string;
@@ -5478,6 +5503,37 @@ export type $OpenApiTs = {
             };
         };
     };
+    '/api/v2/dags/{dag_id}/clearDagRuns': {
+        post: {
+            req: ClearDagRunsData;
+            res: {
+                /**
+                 * Successful Response
+                 */
+                200: ClearTaskInstanceCollectionResponse | 
DAGRunCollectionResponse;
+                /**
+                 * Bad Request
+                 */
+                400: HTTPExceptionResponse;
+                /**
+                 * Unauthorized
+                 */
+                401: HTTPExceptionResponse;
+                /**
+                 * Forbidden
+                 */
+                403: HTTPExceptionResponse;
+                /**
+                 * Not Found
+                 */
+                404: HTTPExceptionResponse;
+                /**
+                 * Validation Error
+                 */
+                422: HTTPValidationError;
+            };
+        };
+    };
     '/ui/dags/{dag_id}/dagRuns/{dag_run_id}/stats': {
         get: {
             req: GetDagRunStatsData;
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 318544014e3..f570e5ca3a7 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -1882,6 +1882,218 @@ class TestClearDagRun:
         assert response.status_code == 422
 
 
+class TestBulkClearDagRuns:
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_bulk_clear_specific_dag(self, test_client, session):
+        """Specific dag_id in URL, dag_run_id in body — clears both runs and 
queues them."""
+        response = test_client.post(
+            f"/dags/{DAG1_ID}/clearDagRuns",
+            json={
+                "dry_run": False,
+                "dag_runs": [{"dag_run_id": DAG1_RUN1_ID}, {"dag_run_id": 
DAG1_RUN2_ID}],
+            },
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert body["total_entries"] == 2
+        returned_run_ids = sorted(run["dag_run_id"] for run in 
body["dag_runs"])
+        assert returned_run_ids == sorted([DAG1_RUN1_ID, DAG1_RUN2_ID])
+        for run in body["dag_runs"]:
+            assert run["state"] == "queued"
+            assert run["dag_id"] == DAG1_ID
+
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_bulk_clear_wildcard_across_dags(self, test_client, session):
+        """``~`` URL with per-entity dag_id — clears runs across Dags in one 
call."""
+        response = test_client.post(
+            "/dags/~/clearDagRuns",
+            json={
+                "dry_run": False,
+                "dag_runs": [
+                    {"dag_id": DAG1_ID, "dag_run_id": DAG1_RUN1_ID},
+                    {"dag_id": DAG2_ID, "dag_run_id": DAG2_RUN1_ID},
+                ],
+            },
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert body["total_entries"] == 2
+        pairs = sorted((run["dag_id"], run["dag_run_id"]) for run in 
body["dag_runs"])
+        assert pairs == sorted([(DAG1_ID, DAG1_RUN1_ID), (DAG2_ID, 
DAG2_RUN1_ID)])
+        for run in body["dag_runs"]:
+            assert run["state"] == "queued"
+
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_bulk_clear_dry_run_collects_affected_tis_across_runs(self, 
test_client, session):
+        """Dry-run returns the union of affected TIs across the listed runs 
without mutating state."""
+        response = test_client.post(
+            f"/dags/{DAG1_ID}/clearDagRuns",
+            json={
+                "dry_run": True,
+                "dag_runs": [{"dag_run_id": DAG1_RUN1_ID}, {"dag_run_id": 
DAG1_RUN2_ID}],
+            },
+        )
+        assert response.status_code == 200
+        body = response.json()
+        # Both DAG1 runs have two task instances each.
+        assert body["total_entries"] == 4
+        run_ids_in_response = {ti["dag_run_id"] for ti in 
body["task_instances"]}
+        assert run_ids_in_response == {DAG1_RUN1_ID, DAG1_RUN2_ID}
+        # No state changes — dry_run never writes.
+        dag_run = session.scalar(
+            select(DagRun).where(DagRun.dag_id == DAG1_ID, DagRun.run_id == 
DAG1_RUN1_ID)
+        )
+        assert dag_run.state == DAG1_RUN1_STATE
+
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_bulk_clear_dry_run_only_failed_filters(self, test_client):
+        """``only_failed=True`` shrinks the dry-run preview to failed TIs 
only."""
+        response = test_client.post(
+            f"/dags/{DAG1_ID}/clearDagRuns",
+            json={
+                "dry_run": True,
+                "only_failed": True,
+                "dag_runs": [{"dag_run_id": DAG1_RUN2_ID}],
+            },
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert all(ti["state"] == "failed" for ti in body["task_instances"])
+        assert body["total_entries"] == 1
+
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_bulk_clear_applies_note_to_each_run(self, test_client, session):
+        """``note`` in the body is applied to every cleared run in the same 
transaction."""
+        response = test_client.post(
+            f"/dags/{DAG1_ID}/clearDagRuns",
+            json={
+                "dry_run": False,
+                "note": "bulk cleared by test",
+                "dag_runs": [{"dag_run_id": DAG1_RUN1_ID}, {"dag_run_id": 
DAG1_RUN2_ID}],
+            },
+        )
+        assert response.status_code == 200
+        for run_id in (DAG1_RUN1_ID, DAG1_RUN2_ID):
+            dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == 
DAG1_ID, DagRun.run_id == run_id))
+            assert dag_run.note == "bulk cleared by test"
+
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_bulk_clear_wildcard_rejects_missing_dag_id(self, test_client):
+        """``~`` URL requires every entry to carry a concrete dag_id; 400 
otherwise."""
+        response = test_client.post(
+            "/dags/~/clearDagRuns",
+            json={
+                "dry_run": False,
+                "dag_runs": [{"dag_run_id": DAG1_RUN1_ID}],
+            },
+        )
+        assert response.status_code == 400
+        assert DAG1_RUN1_ID in response.json()["detail"]
+
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_bulk_clear_specific_url_rejects_mismatched_dag_id(self, 
test_client):
+        """When the URL has a specific dag_id, mismatched per-entity dag_id is 
rejected."""
+        response = test_client.post(
+            f"/dags/{DAG1_ID}/clearDagRuns",
+            json={
+                "dry_run": False,
+                "dag_runs": [{"dag_id": DAG2_ID, "dag_run_id": DAG2_RUN1_ID}],
+            },
+        )
+        assert response.status_code == 400
+
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_bulk_clear_missing_run_returns_404(self, test_client):
+        response = test_client.post(
+            f"/dags/{DAG1_ID}/clearDagRuns",
+            json={
+                "dry_run": False,
+                "dag_runs": [{"dag_run_id": "does_not_exist"}],
+            },
+        )
+        assert response.status_code == 404
+
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_bulk_clear_rejects_only_new_with_only_failed(self, test_client):
+        """``only_new`` and ``only_failed`` are mutually exclusive at the body 
validator level."""
+        response = test_client.post(
+            f"/dags/{DAG1_ID}/clearDagRuns",
+            json={
+                "dry_run": True,
+                "only_new": True,
+                "only_failed": True,
+                "dag_runs": [{"dag_run_id": DAG1_RUN1_ID}],
+            },
+        )
+        assert response.status_code == 422
+
+    def test_bulk_clear_unauthenticated_returns_401(self, 
unauthenticated_test_client):
+        response = unauthenticated_test_client.post(
+            f"/dags/{DAG1_ID}/clearDagRuns",
+            json={"dry_run": False, "dag_runs": [{"dag_run_id": 
DAG1_RUN1_ID}]},
+        )
+        assert response.status_code == 401
+
+    def test_bulk_clear_unauthorized_returns_403(self, 
unauthorized_test_client):
+        response = unauthorized_test_client.post(
+            f"/dags/{DAG1_ID}/clearDagRuns",
+            json={"dry_run": False, "dag_runs": [{"dag_run_id": 
DAG1_RUN1_ID}]},
+        )
+        assert response.status_code == 403
+
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_bulk_clear_rejects_unauthorized_dag_ids_from_request_body(self, 
test_client, session):
+        """A 403 at the route level if any entry references a Dag the user 
can't access; nothing is cleared."""
+        restricted_bundle_name = "restricted-bundle-clear"
+        restricted_team_name = "restricted-team-clear"
+        restricted_bundle = DagBundleModel(name=restricted_bundle_name)
+        restricted_team = Team(name=restricted_team_name)
+        restricted_bundle.teams.append(restricted_team)
+        session.add_all([restricted_bundle, restricted_team])
+        session.flush()
+        # Restrict DAG2 by attaching it to a team-scoped bundle the limited 
user has no access to.
+        session.execute(
+            update(DagModel).where(DagModel.dag_id == 
DAG2_ID).values(bundle_name=restricted_bundle_name)
+        )
+        session.commit()
+
+        states_before = {
+            run_id: session.scalar(select(DagRun.state).where(DagRun.run_id == 
run_id))
+            for run_id in (DAG1_RUN1_ID, DAG2_RUN1_ID)
+        }
+
+        auth_manager = test_client.app.state.auth_manager
+        token = auth_manager._get_token_signer().generate(
+            auth_manager.serialize_user(
+                SimpleAuthManagerUser(username="limited-user", role="user", 
teams=[]),
+            )
+        )
+        with (
+            mock.patch("airflow.models.revoked_token.RevokedToken.is_revoked", 
return_value=False),
+            TestClient(
+                test_client.app,
+                headers={"Authorization": f"Bearer {token}"},
+                base_url=str(test_client.base_url),
+            ) as limited_test_client,
+        ):
+            response = limited_test_client.post(
+                "/dags/~/clearDagRuns",
+                json={
+                    "dry_run": False,
+                    "dag_runs": [
+                        {"dag_id": DAG1_ID, "dag_run_id": DAG1_RUN1_ID},
+                        {"dag_id": DAG2_ID, "dag_run_id": DAG2_RUN1_ID},
+                    ],
+                },
+            )
+
+        assert response.status_code == 403
+        # The batched auth check rejects the whole request, so the authorized 
Dag's run is not cleared either.
+        session.expire_all()
+        for run_id, state_before in states_before.items():
+            assert session.scalar(select(DagRun.state).where(DagRun.run_id == 
run_id)) == state_before
+
+
 class TestClearDagRunOnlyNew:
     """Integration tests for only_new=True using a real two-version DAG.
 
@@ -2568,12 +2780,12 @@ class TestResolveRunOnLatestVersion:
     def test_clear_endpoint_invokes_resolver_when_field_omitted(self, 
test_client):
         """Clearing without run_on_latest_version triggers the server-side 
resolver."""
         with mock.patch(
-            
"airflow.api_fastapi.core_api.routes.public.dag_run.resolve_run_on_latest_version",
+            
"airflow.api_fastapi.core_api.services.public.dag_run.resolve_run_on_latest_version",
             return_value=False,
         ) as mock_resolver:
             response = test_client.post(
                 f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear",
-                json={"dry_run": True},
+                json={"dry_run": False},
             )
         assert response.status_code == 200
         mock_resolver.assert_called_once()
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py 
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index c019b4381f3..7b9e95e5528 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -120,6 +120,38 @@ class BulkDAGRunBody(BaseModel):
     dag_id: Annotated[str | None, Field(title="Dag Id")] = None
 
 
+class Note(RootModel[str]):
+    root: Annotated[str, Field(max_length=1000, title="Note")]
+
+
+class BulkDAGRunClearBody(BaseModel):
+    """
+    Request body for the bulk clear Dag Runs endpoint.
+    """
+
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    dry_run: Annotated[bool | None, Field(title="Dry Run")] = True
+    only_failed: Annotated[bool | None, Field(title="Only Failed")] = False
+    only_new: Annotated[
+        bool | None,
+        Field(
+            description="Only queue newly added tasks in the latest Dag 
version without clearing existing tasks.",
+            title="Only New",
+        ),
+    ] = False
+    run_on_latest_version: Annotated[
+        bool | None,
+        Field(
+            description="(Experimental) Run on the latest bundle version of 
the Dag after clearing. If not specified, falls back to the DAG-level 
``rerun_with_latest_version`` parameter, then the ``[core] 
rerun_with_latest_version`` config option, and finally ``False``.",
+            title="Run On Latest Version",
+        ),
+    ] = None
+    note: Annotated[Note | None, Field(title="Note")] = None
+    dag_runs: Annotated[list[BulkDAGRunBody], Field(min_length=1, title="Dag 
Runs")]
+
+
 class BulkDeleteActionBulkDAGRunBody(BaseModel):
     model_config = ConfigDict(
         extra="forbid",
@@ -157,10 +189,6 @@ class BulkResponse(BaseModel):
     ] = None
 
 
-class Note(RootModel[str]):
-    root: Annotated[str, Field(max_length=1000, title="Note")]
-
-
 class BulkUpdateActionBulkDAGRunBody(BaseModel):
     model_config = ConfigDict(
         extra="forbid",
@@ -344,7 +372,7 @@ class DAGRunClearBody(BaseModel):
     run_on_latest_version: Annotated[
         bool | None,
         Field(
-            description="(Experimental) Run on the latest bundle version of 
the Dag after clearing the Dag Run. If not specified, falls back to the 
DAG-level ``rerun_with_latest_version`` parameter, then the ``[core] 
rerun_with_latest_version`` config option, and finally ``False`` (the 
historical default for clear/rerun).",
+            description="(Experimental) Run on the latest bundle version of 
the Dag after clearing. If not specified, falls back to the DAG-level 
``rerun_with_latest_version`` parameter, then the ``[core] 
rerun_with_latest_version`` config option, and finally ``False``.",
             title="Run On Latest Version",
         ),
     ] = None

Reply via email to