This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9288949db28 Add bulk delete Dag Runs (#67095)
9288949db28 is described below

commit 9288949db28e811bb0fbeb88ebd8d509fb159e61
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Thu May 21 22:14:30 2026 +0200

    Add bulk delete Dag Runs (#67095)
    
    * Add bulk delete endpoint for Dag Runs
    
    Restores feature parity with Airflow 2.x where DagRunModelView exposed
    collective Delete on the Dag Runs list view. Adds:
    
    - PATCH /dags/{dag_id}/dagRuns — bulk endpoint structured like the
      existing bulk task-instances endpoint. Only ``delete`` is supported in
      this PR; ``create`` and ``update`` are wired to return 405 in the
      BulkResponse so future PRs can fill them in without changing the
      route surface.
    - BulkDagRunService with deletable-state enforcement (matches the
      single-run delete: only QUEUED / SUCCESS / FAILED can be deleted),
      per-Dag authorization caching for the wildcard path
      /dags/~/dagRuns, and ``action_on_non_existence: fail | skip``
      semantics.
    - Row selection + a Delete bulk action on the runs list page,
      mirroring how Task Instances does it.
    
    Bulk Mark-as and Bulk Clear are intentionally out of scope and will
    follow in separate PRs.
    
    The grid view stays single-select; multi-select on the grid was not
    available in 2.x either, and the runs list page is the natural target
    for bulk operations on a filtered set (e.g. state=failed).
    
    closes: #52439
    
    * Address self-review on bulk delete Dag Runs
    
    - Switch BulkDagRunService._fetch_dag_runs to tuple_(dag_id, run_id).in_()
      to avoid a Cartesian over-fetch when /dags/~/dagRuns is called with
      pairs spanning multiple Dags. Matches BulkTaskInstanceService.
    - Narrow _check_dag_authorization's method type to Literal["DELETE"];
      this PR only does delete, no point in exposing PUT/POST/GET on the
      signature.
    - Add a wildcard test that exercises the per-Dag authorization path
      (limited user accepted for one Dag, rejected with 403 in the
      BulkResponse for a team-restricted Dag).
    
    * Apply Brent's review feedback from the closed twin PR
    
    Pulls in the substantive UI feedback @bbovenzi left on #66554 so this
    PR lands without re-litigating the same comments. The closed PR was
    broader (clear / mark / delete) but the structural feedback applies
    verbatim to delete-only:
    
    - Move all DagRuns-related files into ``pages/DagRuns/``, mirroring
      ``pages/TaskInstances/``. Adds a small ``index.ts`` re-export so
      ``router.tsx`` keeps the same import path.
    - Rename ``useBulkDagRuns`` to ``useBulkDeleteDagRuns`` so the hook
      name matches the single button it serves (one-hook-one-button
      symmetry — when we add bulk update/clear we'll add sibling hooks).
    - Stop hand-rolling pending/error state with ``useState<unknown>``;
      read ``error`` straight off ``useMutation``'s return.
    - Surface ALL per-entity errors from ``BulkResponse.delete.errors``
      instead of just the first; render each as its own ``Alert`` row.
    - Only invalidate the dag-runs / task-instances queries when at least
      one entry actually succeeded — a 200 with all-errors should not
      churn the table.
    - Keep the dialog open when per-entity errors come back so the user
      can read what failed; ``reset()`` clears them on close.
    
    * Address inline review: mirror task-instance bulk delete patterns
    
    Backend:
    - Inline ``_resolve_dag_id``, ``_result_key`` and ``_fetch_dag_runs`` —
      each had exactly one caller. Memory rule added so future PRs avoid
      premature helpers.
    - Drop the deletable-state restriction. Bulk task-instance delete has no
      such restriction; bulk Dag-run delete shouldn't either.
    - Emit one 404 error per missing entity when ``action_on_non_existence``
      is ``fail`` instead of collating them into a single error, and stop
      early-returning so matched runs still get deleted. The invariant
      ``len(success) + len(errors) == len(requested entities)`` now holds.
    - Distinguish the two ways a wildcard can leak into ``dag_id`` (path
      vs body) in the 400 message, mirroring
      ``BulkTaskInstanceService._categorize_entities``.
    
    UI:
    - Mirror ``useBulkTaskInstances`` exactly: bring back ``useState`` for
      the error, the shared ``handleActionResult`` helper, single-error
      surfacing via ``ErrorAlert``, and the ``bulkAction(requestBody)``
      shape so the consumer constructs the full ``BulkBody``. Brent's prior
      review of the closed twin PR pushed past this pattern, but until TI
      is updated we want both hooks symmetrical — a follow-up can improve
      both at once.
    - Inline the affected-runs column array into ``BulkDeleteDagRunsButton``
      and delete the standalone ``bulkDagRunsColumns.tsx`` file (single
      caller).
    
    * Move bulk Dag Run authorization to a dedicated route dependency
    
    Adds ``requires_access_dag_run_bulk`` in ``core_api/security.py``
    following the ``requires_access_connection_bulk`` pattern. The
    dependency reads the parsed ``BulkBody[BulkDAGRunBody]``, resolves
    each entity's ``dag_id`` (body wins, falling back to the path),
    collects team mappings per Dag, and uses ``batch_is_authorized_dag``
    to enforce auth before the route handler runs.
    
    The route now declares only this dependency plus ``action_logging``;
    the per-entity auth check is no longer duplicated inside
    ``BulkDagRunService``. Unauthorized requests fail with a single
    route-level 403 instead of returning 200 with per-entity 403s in the
    ``BulkResponse``, matching how connections / pools / variables behave.
    
    * Mirror single-run DELETE state restriction on bulk delete
    
    Single ``DELETE /dags/{dag_id}/dagRuns/{dag_run_id}`` rejects RUNNING
    runs with 409; bulk delete now does the same — a RUNNING entity yields
    a per-entity 409 in ``BulkResponse.errors`` and the matched non-running
    entities still get deleted.
    
    Also renames ``DAGRunPatchStates`` to ``DagRunMutableStates`` since it
    now gates both PATCH (mark-as) and DELETE — same set of states (QUEUED,
    SUCCESS, FAILED), broader meaning. Propagated through the route handlers,
    the bulk service, and the UI components that import the generated type.
    
    * Invalidate dependent queries after bulk delete Dag Run
    
    Bulk delete only invalidated the top-level dag-runs and task-instances
    lists. Two more cache sets stay stale otherwise:
    
    - Per-attempt TI caches (log / extra links / try details), keyed by
      TI identity. When the user navigates back to a TI try detail via
      the browser back button after deleting its run, react-query serves
      the cached log instead of letting the request hit and return 404.
    
    - The grid view query set for each affected Dag — the grid renders
      one bar per Dag Run, so bulk delete literally removes bars.
    
    The per-attempt set mirrors the addition ``useBulkTaskInstances``
    already gained in #67212. The grid invalidation is specific to this
    hook because deleting Dag Runs (unlike deleting TIs) changes what the
    grid bars themselves represent.
    
    The affected ``dag_id`` set is captured in ``bulkAction`` from the
    request body and read in ``onSuccess``, same lifecycle as
    ``useBulkClearTaskInstances``'s ``byDagRun`` grouping.
    
    * Restore resolve_run_on_latest_version import dropped during rebase
    
    Rebasing the branch onto the latest ``apache/main`` with the
    ``-X theirs`` strategy used the incoming side for every conflict in
    ``routes/public/dag_run.py``. That dropped the
    ``resolve_run_on_latest_version`` import that #63884 added — the
    function call at line 336 was preserved but the import line wasn't,
    so the module failed to load and every test in ``test_dag_run.py``
    crashed at collection (Static checks, mypy, and all DB-core test
    matrices on CI).
    
    This commit only adds the missing import back; no other change.
---
 .../api_fastapi/core_api/datamodels/dag_run.py     |  13 +-
 .../core_api/openapi/v2-rest-api-generated.yaml    | 328 +++++++++++++++------
 .../api_fastapi/core_api/routes/public/dag_run.py  |  28 +-
 .../src/airflow/api_fastapi/core_api/security.py   |  52 ++++
 .../core_api/services/public/dag_run.py            | 115 +++++++-
 .../src/airflow/ui/openapi-gen/queries/common.ts   |  17 +-
 .../ui/openapi-gen/queries/ensureQueryData.ts      |  26 +-
 .../src/airflow/ui/openapi-gen/queries/prefetch.ts |  26 +-
 .../src/airflow/ui/openapi-gen/queries/queries.ts  |  76 +++--
 .../src/airflow/ui/openapi-gen/queries/suspense.ts |  26 +-
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts | 170 ++++++++++-
 .../ui/openapi-gen/requests/services.gen.ts        |  98 +++---
 .../airflow/ui/openapi-gen/requests/types.gen.ts   | 154 +++++++---
 .../src/components/MarkAs/Run/MarkRunAsButton.tsx  |   4 +-
 .../src/components/MarkAs/Run/MarkRunAsDialog.tsx  |   4 +-
 .../src/airflow/ui/src/components/MarkAs/utils.ts  |   4 +-
 .../src/pages/DagRuns/BulkDeleteDagRunsButton.tsx  | 176 +++++++++++
 .../ui/src/pages/{ => DagRuns}/DagRuns.test.tsx    |   0
 .../airflow/ui/src/pages/{ => DagRuns}/DagRuns.tsx |  79 ++++-
 .../ui/src/pages/{ => DagRuns}/DagRunsFilters.tsx  |   0
 .../ui/src/pages/{ => DagRuns}/DeleteRunButton.tsx |   0
 .../MarkAs/utils.ts => pages/DagRuns/index.ts}     |   4 +-
 .../src/airflow/ui/src/pages/Run/Header.tsx        |   2 +-
 .../airflow/ui/src/queries/useBulkDeleteDagRuns.ts | 114 +++++++
 .../core_api/routes/public/test_dag_run.py         | 274 ++++++++++++++++-
 .../src/airflowctl/api/datamodels/generated.py     |  93 +++++-
 26 files changed, 1602 insertions(+), 281 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
index b1e2500203e..97b37fdaae5 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -35,8 +35,8 @@ if TYPE_CHECKING:
     from airflow.serialization.definitions.dag import SerializedDAG
 
 
-class DAGRunPatchStates(str, Enum):
-    """Enum for Dag Run states when updating a Dag Run."""
+class DagRunMutableStates(str, Enum):
+    """Dag Run states from which the run may be mutated (patched, deleted)."""
 
     QUEUED = DagRunState.QUEUED
     SUCCESS = DagRunState.SUCCESS
@@ -46,10 +46,17 @@ class DAGRunPatchStates(str, Enum):
 class DAGRunPatchBody(StrictBaseModel):
     """Dag Run Serializer for PATCH requests."""
 
-    state: DAGRunPatchStates | None = None
+    state: DagRunMutableStates | None = None
     note: str | None = Field(None, max_length=1000)
 
 
+class BulkDAGRunBody(StrictBaseModel):
+    """Request body for bulk delete operations on Dag Runs."""
+
+    dag_run_id: str
+    dag_id: str | None = None
+
+
 class DAGRunClearBody(StrictBaseModel):
     """Dag Run serializer for clear endpoint body."""
 
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 00a9c60d858..23f2622b13b 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -2052,67 +2052,13 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
-  /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents:
-    get:
-      tags:
-      - DagRun
-      summary: Get Upstream Asset Events
-      description: If dag run is asset-triggered, return the asset events that 
triggered
-        it.
-      operationId: get_upstream_asset_events
-      security:
-      - OAuth2PasswordBearer: []
-      - HTTPBearer: []
-      parameters:
-      - name: dag_id
-        in: path
-        required: true
-        schema:
-          type: string
-          title: Dag Id
-      - name: dag_run_id
-        in: path
-        required: true
-        schema:
-          type: string
-          title: Dag Run Id
-      responses:
-        '200':
-          description: Successful Response
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/AssetEventCollectionResponse'
-        '401':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPExceptionResponse'
-          description: Unauthorized
-        '403':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPExceptionResponse'
-          description: Forbidden
-        '404':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPExceptionResponse'
-          description: Not Found
-        '422':
-          description: Validation Error
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPValidationError'
-  /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear:
-    post:
+  /api/v2/dags/{dag_id}/dagRuns:
+    patch:
       tags:
       - DagRun
-      summary: Clear Dag Run
-      operationId: clear_dag_run
+      summary: Bulk Dag Runs
+      description: Bulk delete Dag Runs.
+      operationId: bulk_dag_runs
       security:
       - OAuth2PasswordBearer: []
       - HTTPBearer: []
@@ -2123,28 +2069,19 @@ paths:
         schema:
           type: string
           title: Dag Id
-      - name: dag_run_id
-        in: path
-        required: true
-        schema:
-          type: string
-          title: Dag Run Id
       requestBody:
         required: true
         content:
           application/json:
             schema:
-              $ref: '#/components/schemas/DAGRunClearBody'
+              $ref: '#/components/schemas/BulkBody_BulkDAGRunBody_'
       responses:
         '200':
           description: Successful Response
           content:
             application/json:
               schema:
-                anyOf:
-                - $ref: 
'#/components/schemas/ClearTaskInstanceCollectionResponse'
-                - $ref: '#/components/schemas/DAGRunResponse'
-                title: Response Clear Dag Run
+                $ref: '#/components/schemas/BulkResponse'
         '401':
           content:
             application/json:
@@ -2157,19 +2094,12 @@ paths:
               schema:
                 $ref: '#/components/schemas/HTTPExceptionResponse'
           description: Forbidden
-        '404':
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/HTTPExceptionResponse'
-          description: Not Found
         '422':
           description: Validation Error
           content:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
-  /api/v2/dags/{dag_id}/dagRuns:
     get:
       tags:
       - DagRun
@@ -2792,6 +2722,123 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
+  /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents:
+    get:
+      tags:
+      - DagRun
+      summary: Get Upstream Asset Events
+      description: If dag run is asset-triggered, return the asset events that 
triggered
+        it.
+      operationId: get_upstream_asset_events
+      security:
+      - OAuth2PasswordBearer: []
+      - HTTPBearer: []
+      parameters:
+      - name: dag_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Id
+      - name: dag_run_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Run Id
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/AssetEventCollectionResponse'
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
+  /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear:
+    post:
+      tags:
+      - DagRun
+      summary: Clear Dag Run
+      operationId: clear_dag_run
+      security:
+      - OAuth2PasswordBearer: []
+      - HTTPBearer: []
+      parameters:
+      - name: dag_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Id
+      - name: dag_run_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Run Id
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/DAGRunClearBody'
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                anyOf:
+                - $ref: 
'#/components/schemas/ClearTaskInstanceCollectionResponse'
+                - $ref: '#/components/schemas/DAGRunResponse'
+                title: Response Clear Dag Run
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
   /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait:
     get:
       tags:
@@ -11560,6 +11607,21 @@ components:
 
         This structure helps users understand which key actions succeeded and 
which
         failed.'
+    BulkBody_BulkDAGRunBody_:
+      properties:
+        actions:
+          items:
+            oneOf:
+            - $ref: '#/components/schemas/BulkCreateAction_BulkDAGRunBody_'
+            - $ref: '#/components/schemas/BulkUpdateAction_BulkDAGRunBody_'
+            - $ref: '#/components/schemas/BulkDeleteAction_BulkDAGRunBody_'
+          type: array
+          title: Actions
+      additionalProperties: false
+      type: object
+      required:
+      - actions
+      title: BulkBody[BulkDAGRunBody]
     BulkBody_BulkTaskInstanceBody_:
       properties:
         actions:
@@ -11620,6 +11682,28 @@ components:
       required:
       - actions
       title: BulkBody[VariableBody]
+    BulkCreateAction_BulkDAGRunBody_:
+      properties:
+        action:
+          type: string
+          const: create
+          title: Action
+          description: The action to be performed on the entities.
+        entities:
+          items:
+            $ref: '#/components/schemas/BulkDAGRunBody'
+          type: array
+          title: Entities
+          description: A list of entities to be created.
+        action_on_existence:
+          $ref: '#/components/schemas/BulkActionOnExistence'
+          default: fail
+      additionalProperties: false
+      type: object
+      required:
+      - action
+      - entities
+      title: BulkCreateAction[BulkDAGRunBody]
     BulkCreateAction_BulkTaskInstanceBody_:
       properties:
         action:
@@ -11708,6 +11792,46 @@ components:
       - action
       - entities
       title: BulkCreateAction[VariableBody]
+    BulkDAGRunBody:
+      properties:
+        dag_run_id:
+          type: string
+          title: Dag Run Id
+        dag_id:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Dag Id
+      additionalProperties: false
+      type: object
+      required:
+      - dag_run_id
+      title: BulkDAGRunBody
+      description: Request body for bulk delete operations on Dag Runs.
+    BulkDeleteAction_BulkDAGRunBody_:
+      properties:
+        action:
+          type: string
+          const: delete
+          title: Action
+          description: The action to be performed on the entities.
+        entities:
+          items:
+            anyOf:
+            - type: string
+            - $ref: '#/components/schemas/BulkDAGRunBody'
+          type: array
+          title: Entities
+          description: A list of entity id/key or entity objects to be deleted.
+        action_on_non_existence:
+          $ref: '#/components/schemas/BulkActionNotOnExistence'
+          default: fail
+      additionalProperties: false
+      type: object
+      required:
+      - action
+      - entities
+      title: BulkDeleteAction[BulkDAGRunBody]
     BulkDeleteAction_BulkTaskInstanceBody_:
       properties:
         action:
@@ -11889,6 +12013,38 @@ components:
       - task_id
       title: BulkTaskInstanceBody
       description: Request body for bulk update, and delete task instances.
+    BulkUpdateAction_BulkDAGRunBody_:
+      properties:
+        action:
+          type: string
+          const: update
+          title: Action
+          description: The action to be performed on the entities.
+        entities:
+          items:
+            $ref: '#/components/schemas/BulkDAGRunBody'
+          type: array
+          title: Entities
+          description: A list of entities to be updated.
+        update_mask:
+          anyOf:
+          - items:
+              type: string
+            type: array
+          - type: 'null'
+          title: Update Mask
+          description: A list of field names to update for each entity.Only 
these
+            fields will be applied from the request body to the database 
model.Any
+            extra fields provided will be ignored.
+        action_on_non_existence:
+          $ref: '#/components/schemas/BulkActionNotOnExistence'
+          default: fail
+      additionalProperties: false
+      type: object
+      required:
+      - action
+      - entities
+      title: BulkUpdateAction[BulkDAGRunBody]
     BulkUpdateAction_BulkTaskInstanceBody_:
       properties:
         action:
@@ -12920,7 +13076,7 @@ components:
       properties:
         state:
           anyOf:
-          - $ref: '#/components/schemas/DAGRunPatchStates'
+          - $ref: '#/components/schemas/DagRunMutableStates'
           - type: 'null'
         note:
           anyOf:
@@ -12932,14 +13088,6 @@ components:
       type: object
       title: DAGRunPatchBody
       description: Dag Run Serializer for PATCH requests.
-    DAGRunPatchStates:
-      type: string
-      enum:
-      - queued
-      - success
-      - failed
-      title: DAGRunPatchStates
-      description: Enum for Dag Run states when updating a Dag Run.
     DAGRunResponse:
       properties:
         dag_run_id:
@@ -13400,6 +13548,14 @@ components:
       - partition_key
       title: DagRunAssetReference
       description: DagRun serializer for asset responses.
+    DagRunMutableStates:
+      type: string
+      enum:
+      - queued
+      - success
+      - failed
+      title: DagRunMutableStates
+      description: Dag Run states from which the run may be mutated (patched, 
deleted).
     DagRunState:
       type: string
       enum:
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 4577a516089..a5c76550be5 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -76,11 +76,13 @@ from airflow.api_fastapi.common.router import AirflowRouter
 from airflow.api_fastapi.common.types import Mimetype
 from airflow.api_fastapi.core_api.base import OrmClause
 from airflow.api_fastapi.core_api.datamodels.assets import 
AssetEventCollectionResponse
+from airflow.api_fastapi.core_api.datamodels.common import BulkBody, 
BulkResponse
 from airflow.api_fastapi.core_api.datamodels.dag_run import (
+    BulkDAGRunBody,
     DAGRunClearBody,
     DAGRunCollectionResponse,
+    DagRunMutableStates,
     DAGRunPatchBody,
-    DAGRunPatchStates,
     DAGRunResponse,
     DAGRunsBatchBody,
     TriggerDAGRunPostBody,
@@ -96,9 +98,10 @@ from airflow.api_fastapi.core_api.security import (
     ReadableDagRunsFilterDep,
     requires_access_asset,
     requires_access_dag,
+    requires_access_dag_run_bulk,
 )
 from airflow.api_fastapi.core_api.services.public.common import 
resolve_run_on_latest_version
-from airflow.api_fastapi.core_api.services.public.dag_run import DagRunWaiter
+from airflow.api_fastapi.core_api.services.public.dag_run import 
BulkDagRunService, DagRunWaiter
 from airflow.api_fastapi.logging.decorators import action_logging
 from airflow.exceptions import ParamValidationError
 from airflow.listeners.listener import get_listener_manager
@@ -152,7 +155,7 @@ def get_dag_run(dag_id: str, dag_run_id: str, session: 
SessionDep) -> DAGRunResp
 def delete_dag_run(dag_id: str, dag_run_id: str, session: SessionDep):
     """Delete a Dag Run entry."""
     dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, 
run_id=dag_run_id))
-    deletable_states = {s.value for s in DAGRunPatchStates}
+    deletable_states = {s.value for s in DagRunMutableStates}
 
     if dag_run is None:
         raise HTTPException(
@@ -220,7 +223,7 @@ def patch_dag_run(
     for attr_name, attr_value_raw in data.items():
         if attr_name == "state":
             attr_value = getattr(patch_body, "state")
-            if attr_value == DAGRunPatchStates.SUCCESS:
+            if attr_value == DagRunMutableStates.SUCCESS:
                 set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, 
commit=True, session=session)
                 try:
                     
get_listener_manager().hook.on_dag_run_success(dag_run=dag_run, msg="")
@@ -229,10 +232,10 @@ def patch_dag_run(
 
             # TODO AIP-103: https://github.com/apache/airflow/issues/66755
             # Handle clearing states for all task instances in a dagrun when 
cleared
-            elif attr_value == DAGRunPatchStates.QUEUED:
+            elif attr_value == DagRunMutableStates.QUEUED:
                 set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, 
commit=True, session=session)
                 # Not notifying on queued - only notifying on RUNNING, this is 
happening in scheduler
-            elif attr_value == DAGRunPatchStates.FAILED:
+            elif attr_value == DagRunMutableStates.FAILED:
                 set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, 
commit=True, session=session)
                 try:
                     
get_listener_manager().hook.on_dag_run_failed(dag_run=dag_run, msg="")
@@ -253,6 +256,19 @@ def patch_dag_run(
     return final_dag_run
 
 
+@dag_run_router.patch(
+    "",
+    dependencies=[Depends(requires_access_dag_run_bulk()), 
Depends(action_logging())],
+)
+def bulk_dag_runs(
+    request: BulkBody[BulkDAGRunBody],
+    session: SessionDep,
+    dag_id: str,
+) -> BulkResponse:
+    """Bulk delete Dag Runs."""
+    return BulkDagRunService(session=session, request=request, 
dag_id=dag_id).handle_request()
+
+
 @dag_run_router.get(
     "/{dag_run_id}/upstreamAssetEvents",
     responses=create_openapi_http_exception_doc(
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/security.py 
b/airflow-core/src/airflow/api_fastapi/core_api/security.py
index a54425326d3..f139ebc8ce7 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/security.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/security.py
@@ -37,6 +37,7 @@ from airflow.api_fastapi.auth.managers.base_auth_manager 
import (
 from airflow.api_fastapi.auth.managers.models.base_user import BaseUser
 from airflow.api_fastapi.auth.managers.models.batch_apis import (
     IsAuthorizedConnectionRequest,
+    IsAuthorizedDagRequest,
     IsAuthorizedPoolRequest,
     IsAuthorizedVariableRequest,
 )
@@ -62,6 +63,7 @@ from airflow.api_fastapi.core_api.datamodels.common import (
     BulkUpdateAction,
 )
 from airflow.api_fastapi.core_api.datamodels.connections import ConnectionBody
+from airflow.api_fastapi.core_api.datamodels.dag_run import BulkDAGRunBody
 from airflow.api_fastapi.core_api.datamodels.pools import PoolBody
 from airflow.api_fastapi.core_api.datamodels.variables import VariableBody
 from airflow.configuration import conf
@@ -725,6 +727,56 @@ def requires_access_variable_bulk() -> 
Callable[[BulkBody[VariableBody], BaseUse
     return inner
 
 
+def requires_access_dag_run_bulk() -> Callable[[BulkBody[BulkDAGRunBody], 
BaseUser, str], None]:
+    def inner(
+        request: BulkBody[BulkDAGRunBody],
+        user: GetUserDep,
+        dag_id: str,
+    ) -> None:
+        resolved_dag_ids: set[str] = set()
+        for action in request.actions:
+            for entity in action.entities:
+                if isinstance(entity, str):
+                    entity_dag_id: str | None = dag_id
+                else:
+                    entity_dag_id = entity.dag_id or dag_id
+                if entity_dag_id and entity_dag_id != "~":
+                    resolved_dag_ids.add(entity_dag_id)
+
+        dag_id_to_team = {d: DagModel.get_team_name(d) for d in 
resolved_dag_ids}
+
+        requests: list[IsAuthorizedDagRequest] = []
+        for action in request.actions:
+            methods = _get_resource_methods_from_bulk_request(action)
+            for entity in action.entities:
+                if isinstance(entity, str):
+                    entity_dag_id = dag_id
+                else:
+                    entity_dag_id = entity.dag_id or dag_id
+                # Entities that can't be resolved are surfaced as 400 in the 
service's BulkResponse.
+                if not entity_dag_id or entity_dag_id == "~":
+                    continue
+                for method in methods:
+                    requests.append(
+                        {
+                            "method": method,
+                            "access_entity": DagAccessEntity.RUN,
+                            "details": DagDetails(
+                                id=entity_dag_id, 
team_name=dag_id_to_team.get(entity_dag_id)
+                            ),
+                        }
+                    )
+
+        _requires_access(
+            is_authorized_callback=lambda: 
get_auth_manager().batch_is_authorized_dag(
+                requests=requests,
+                user=user,
+            )
+        )
+
+    return inner
+
+
 def requires_access_asset(method: ResourceMethod) -> Callable[[Request, 
BaseUser], None]:
     def inner(
         request: Request,
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py 
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
index e7d7cb98c93..4c5a5b090b6 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
@@ -24,8 +24,21 @@ import operator
 from typing import TYPE_CHECKING, Any
 
 import attrs
-from sqlalchemy import select
+import structlog
+from fastapi import status
+from sqlalchemy import select, tuple_
+from sqlalchemy.orm import Session
 
+from airflow.api_fastapi.core_api.datamodels.common import (
+    BulkActionNotOnExistence,
+    BulkActionResponse,
+    BulkBody,
+    BulkCreateAction,
+    BulkDeleteAction,
+    BulkUpdateAction,
+)
+from airflow.api_fastapi.core_api.datamodels.dag_run import BulkDAGRunBody, 
DagRunMutableStates
+from airflow.api_fastapi.core_api.services.public.common import BulkService
 from airflow.models.dagrun import DagRun
 from airflow.models.xcom import XCOM_RETURN_KEY, XComModel
 from airflow.utils.session import create_session_async
@@ -34,6 +47,8 @@ from airflow.utils.state import State
 if TYPE_CHECKING:
     from collections.abc import AsyncGenerator, Iterator
 
+log = structlog.get_logger(__name__)
+
 
 @attrs.define
 class DagRunWaiter:
@@ -86,3 +101,101 @@ class DagRunWaiter:
             await asyncio.sleep(self.interval)
             yield await self._serialize_response(dag_run := await 
self._get_dag_run())
             yield "\n"
+
+
+class BulkDagRunService(BulkService[BulkDAGRunBody]):
+    """Service for handling bulk operations on Dag Runs."""
+
+    def __init__(
+        self,
+        session: Session,
+        request: BulkBody[BulkDAGRunBody],
+        dag_id: str,
+    ):
+        super().__init__(session, request)
+        self.dag_id = dag_id
+
+    def handle_bulk_create(
+        self, action: BulkCreateAction[BulkDAGRunBody], results: 
BulkActionResponse
+    ) -> None:
+        results.errors.append(
+            {
+                "error": "Dag Runs bulk create is not supported. Use the 
trigger Dag Run endpoint instead.",
+                "status_code": status.HTTP_405_METHOD_NOT_ALLOWED,
+            }
+        )
+
+    def handle_bulk_update(
+        self, action: BulkUpdateAction[BulkDAGRunBody], results: 
BulkActionResponse
+    ) -> None:
+        results.errors.append(
+            {
+                "error": "Dag Runs bulk update is not supported yet. Use the 
patch Dag Run endpoint per run instead.",
+                "status_code": status.HTTP_405_METHOD_NOT_ALLOWED,
+            }
+        )
+
+    def handle_bulk_delete(
+        self, action: BulkDeleteAction[BulkDAGRunBody], results: 
BulkActionResponse
+    ) -> None:
+        """Bulk delete Dag Runs."""
+        keys: set[tuple[str, str]] = set()
+
+        for entity in action.entities:
+            if isinstance(entity, str):
+                dag_id, dag_run_id = self.dag_id, entity
+            else:
+                dag_id = entity.dag_id or self.dag_id
+                dag_run_id = entity.dag_run_id
+
+            if dag_id == "~" or dag_run_id == "~":
+                if isinstance(entity, str):
+                    error_msg = (
+                        "When using wildcard in path, dag_id must be specified 
in BulkDAGRunBody"
+                        f" object, not as string for dag_run_id: {entity}"
+                    )
+                else:
+                    error_msg = (
+                        "When using wildcard in path, dag_id must be specified 
in request body for"
+                        f" dag_run_id: {entity.dag_run_id}"
+                    )
+                results.errors.append(
+                    {"error": error_msg, "status_code": 
status.HTTP_400_BAD_REQUEST},
+                )
+                continue
+
+            keys.add((dag_id, dag_run_id))
+
+        if not keys:
+            return
+
+        dag_runs = self.session.scalars(
+            select(DagRun).where(tuple_(DagRun.dag_id, 
DagRun.run_id).in_(list(keys)))
+        ).all()
+        dag_run_map = {(dr.dag_id, dr.run_id): dr for dr in dag_runs}
+        not_found = keys - dag_run_map.keys()
+
+        if action.action_on_non_existence == BulkActionNotOnExistence.FAIL:
+            for dag_id, run_id in sorted(not_found):
+                results.errors.append(
+                    {
+                        "error": (f"The DagRun with dag_id: `{dag_id}` and 
run_id: `{run_id}` was not found"),
+                        "status_code": status.HTTP_404_NOT_FOUND,
+                    }
+                )
+
+        deletable_states = {s.value for s in DagRunMutableStates}
+        for (dag_id, run_id), dag_run in dag_run_map.items():
+            if dag_run.state not in deletable_states:
+                results.errors.append(
+                    {
+                        "error": (
+                            f"The DagRun with dag_id: `{dag_id}` and run_id: 
`{run_id}` "
+                            f"cannot be deleted in {dag_run.state} state"
+                        ),
+                        "status_code": status.HTTP_409_CONFLICT,
+                    }
+                )
+                continue
+            self.session.delete(dag_run)
+            results.success.append(f"{dag_id}.{run_id}")
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
index 812a41d9d08..f8e3dbe9af6 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
@@ -138,13 +138,6 @@ export const UseDagRunServiceGetDagRunKeyFn = ({ dagId, 
dagRunId }: {
   dagId: string;
   dagRunId: string;
 }, queryKey?: Array<unknown>) => [useDagRunServiceGetDagRunKey, ...(queryKey 
?? [{ dagId, dagRunId }])];
-export type DagRunServiceGetUpstreamAssetEventsDefaultResponse = 
Awaited<ReturnType<typeof DagRunService.getUpstreamAssetEvents>>;
-export type DagRunServiceGetUpstreamAssetEventsQueryResult<TData = 
DagRunServiceGetUpstreamAssetEventsDefaultResponse, TError = unknown> = 
UseQueryResult<TData, TError>;
-export const useDagRunServiceGetUpstreamAssetEventsKey = 
"DagRunServiceGetUpstreamAssetEvents";
-export const UseDagRunServiceGetUpstreamAssetEventsKeyFn = ({ dagId, dagRunId 
}: {
-  dagId: string;
-  dagRunId: string;
-}, queryKey?: Array<unknown>) => [useDagRunServiceGetUpstreamAssetEventsKey, 
...(queryKey ?? [{ dagId, dagRunId }])];
 export type DagRunServiceGetDagRunsDefaultResponse = Awaited<ReturnType<typeof 
DagRunService.getDagRuns>>;
 export type DagRunServiceGetDagRunsQueryResult<TData = 
DagRunServiceGetDagRunsDefaultResponse, TError = unknown> = 
UseQueryResult<TData, TError>;
 export const useDagRunServiceGetDagRunsKey = "DagRunServiceGetDagRuns";
@@ -193,6 +186,13 @@ export const UseDagRunServiceGetDagRunsKeyFn = ({ 
bundleVersion, confContains, c
   updatedAtLt?: string;
   updatedAtLte?: string;
 }, queryKey?: Array<unknown>) => [useDagRunServiceGetDagRunsKey, ...(queryKey 
?? [{ bundleVersion, confContains, consumingAssetPattern, cursor, dagId, 
dagIdPattern, dagIdPrefixPattern, dagVersion, durationGt, durationGte, 
durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, 
logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, 
partitionKeyPattern, partitionKeyPrefixPattern, runAfterGt, runAfterGte, 
runAfterLt, runAfterLte, runIdPattern, r [...]
+export type DagRunServiceGetUpstreamAssetEventsDefaultResponse = 
Awaited<ReturnType<typeof DagRunService.getUpstreamAssetEvents>>;
+export type DagRunServiceGetUpstreamAssetEventsQueryResult<TData = 
DagRunServiceGetUpstreamAssetEventsDefaultResponse, TError = unknown> = 
UseQueryResult<TData, TError>;
+export const useDagRunServiceGetUpstreamAssetEventsKey = 
"DagRunServiceGetUpstreamAssetEvents";
+export const UseDagRunServiceGetUpstreamAssetEventsKeyFn = ({ dagId, dagRunId 
}: {
+  dagId: string;
+  dagRunId: string;
+}, queryKey?: Array<unknown>) => [useDagRunServiceGetUpstreamAssetEventsKey, 
...(queryKey ?? [{ dagId, dagRunId }])];
 export type DagRunServiceWaitDagRunUntilFinishedDefaultResponse = 
Awaited<ReturnType<typeof DagRunService.waitDagRunUntilFinished>>;
 export type DagRunServiceWaitDagRunUntilFinishedQueryResult<TData = 
DagRunServiceWaitDagRunUntilFinishedDefaultResponse, TError = unknown> = 
UseQueryResult<TData, TError>;
 export const useDagRunServiceWaitDagRunUntilFinishedKey = 
"DagRunServiceWaitDagRunUntilFinished";
@@ -1034,8 +1034,8 @@ export type 
BackfillServiceCreateBackfillDryRunMutationResult = Awaited<ReturnTy
 export type ConnectionServicePostConnectionMutationResult = 
Awaited<ReturnType<typeof ConnectionService.postConnection>>;
 export type ConnectionServiceTestConnectionMutationResult = 
Awaited<ReturnType<typeof ConnectionService.testConnection>>;
 export type ConnectionServiceCreateDefaultConnectionsMutationResult = 
Awaited<ReturnType<typeof ConnectionService.createDefaultConnections>>;
-export type DagRunServiceClearDagRunMutationResult = Awaited<ReturnType<typeof 
DagRunService.clearDagRun>>;
 export type DagRunServiceTriggerDagRunMutationResult = 
Awaited<ReturnType<typeof DagRunService.triggerDagRun>>;
+export type DagRunServiceClearDagRunMutationResult = Awaited<ReturnType<typeof 
DagRunService.clearDagRun>>;
 export type DagRunServiceGetListDagRunsBatchMutationResult = 
Awaited<ReturnType<typeof DagRunService.getListDagRunsBatch>>;
 export type DagServiceFavoriteDagMutationResult = Awaited<ReturnType<typeof 
DagService.favoriteDag>>;
 export type DagServiceUnfavoriteDagMutationResult = Awaited<ReturnType<typeof 
DagService.unfavoriteDag>>;
@@ -1054,6 +1054,7 @@ export type DagParsingServiceReparseDagFileMutationResult 
= Awaited<ReturnType<t
 export type ConnectionServicePatchConnectionMutationResult = 
Awaited<ReturnType<typeof ConnectionService.patchConnection>>;
 export type ConnectionServiceBulkConnectionsMutationResult = 
Awaited<ReturnType<typeof ConnectionService.bulkConnections>>;
 export type DagRunServicePatchDagRunMutationResult = Awaited<ReturnType<typeof 
DagRunService.patchDagRun>>;
+export type DagRunServiceBulkDagRunsMutationResult = Awaited<ReturnType<typeof 
DagRunService.bulkDagRuns>>;
 export type DagServicePatchDagsMutationResult = Awaited<ReturnType<typeof 
DagService.patchDags>>;
 export type DagServicePatchDagMutationResult = Awaited<ReturnType<typeof 
DagService.patchDag>>;
 export type TaskInstanceServicePatchTaskInstanceMutationResult = 
Awaited<ReturnType<typeof TaskInstanceService.patchTaskInstance>>;
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
index 32d74fed6e9..f124d321a1f 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
@@ -264,19 +264,6 @@ export const ensureUseDagRunServiceGetDagRunData = 
(queryClient: QueryClient, {
   dagRunId: string;
 }) => queryClient.ensureQueryData({ queryKey: 
Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }), queryFn: () => 
DagRunService.getDagRun({ dagId, dagRunId }) });
 /**
-* Get Upstream Asset Events
-* If dag run is asset-triggered, return the asset events that triggered it.
-* @param data The data for the request.
-* @param data.dagId
-* @param data.dagRunId
-* @returns AssetEventCollectionResponse Successful Response
-* @throws ApiError
-*/
-export const ensureUseDagRunServiceGetUpstreamAssetEventsData = (queryClient: 
QueryClient, { dagId, dagRunId }: {
-  dagId: string;
-  dagRunId: string;
-}) => queryClient.ensureQueryData({ queryKey: 
Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn({ dagId, dagRunId }), 
queryFn: () => DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }) });
-/**
 * Get Dag Runs
 * Get all Dag Runs.
 *
@@ -391,6 +378,19 @@ export const ensureUseDagRunServiceGetDagRunsData = 
(queryClient: QueryClient, {
   updatedAtLte?: string;
 }) => queryClient.ensureQueryData({ queryKey: 
Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, 
consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern, 
dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, 
endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, 
logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, 
partitionKeyPrefixPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, 
runIdPattern [...]
 /**
+* Get Upstream Asset Events
+* If dag run is asset-triggered, return the asset events that triggered it.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @returns AssetEventCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const ensureUseDagRunServiceGetUpstreamAssetEventsData = (queryClient: 
QueryClient, { dagId, dagRunId }: {
+  dagId: string;
+  dagRunId: string;
+}) => queryClient.ensureQueryData({ queryKey: 
Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn({ dagId, dagRunId }), 
queryFn: () => DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }) });
+/**
 * Experimental: Wait for a dag run to complete, and return task results if 
requested.
 * 🚧 This is an experimental endpoint and may change or be removed without 
notice.Successful response are streamed as newline-delimited JSON (NDJSON). 
Each line is a JSON object representing the Dag run state.
 * @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
index ae012539160..5bdcfe667b2 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -264,19 +264,6 @@ export const prefetchUseDagRunServiceGetDagRun = 
(queryClient: QueryClient, { da
   dagRunId: string;
 }) => queryClient.prefetchQuery({ queryKey: 
Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }), queryFn: () => 
DagRunService.getDagRun({ dagId, dagRunId }) });
 /**
-* Get Upstream Asset Events
-* If dag run is asset-triggered, return the asset events that triggered it.
-* @param data The data for the request.
-* @param data.dagId
-* @param data.dagRunId
-* @returns AssetEventCollectionResponse Successful Response
-* @throws ApiError
-*/
-export const prefetchUseDagRunServiceGetUpstreamAssetEvents = (queryClient: 
QueryClient, { dagId, dagRunId }: {
-  dagId: string;
-  dagRunId: string;
-}) => queryClient.prefetchQuery({ queryKey: 
Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn({ dagId, dagRunId }), 
queryFn: () => DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }) });
-/**
 * Get Dag Runs
 * Get all Dag Runs.
 *
@@ -391,6 +378,19 @@ export const prefetchUseDagRunServiceGetDagRuns = 
(queryClient: QueryClient, { b
   updatedAtLte?: string;
 }) => queryClient.prefetchQuery({ queryKey: 
Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, 
consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern, 
dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, 
endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, 
logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, 
partitionKeyPrefixPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, 
runIdPattern,  [...]
 /**
+* Get Upstream Asset Events
+* If dag run is asset-triggered, return the asset events that triggered it.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @returns AssetEventCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const prefetchUseDagRunServiceGetUpstreamAssetEvents = (queryClient: 
QueryClient, { dagId, dagRunId }: {
+  dagId: string;
+  dagRunId: string;
+}) => queryClient.prefetchQuery({ queryKey: 
Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn({ dagId, dagRunId }), 
queryFn: () => DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }) });
+/**
 * Experimental: Wait for a dag run to complete, and return task results if 
requested.
 * 🚧 This is an experimental endpoint and may change or be removed without 
notice.Successful response are streamed as newline-delimited JSON (NDJSON). 
Each line is a JSON object representing the Dag run state.
 * @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index 17ee1009ae0..8c0976ec328 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -2,7 +2,7 @@
 
 import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from 
"@tanstack/react-query";
 import { AssetService, AssetStateService, AuthLinksService, BackfillService, 
CalendarService, ConfigService, ConnectionService, DagParsingService, 
DagRunService, DagService, DagSourceService, DagStatsService, 
DagVersionService, DagWarningService, DashboardService, DeadlinesService, 
DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, 
GanttService, GridService, ImportErrorService, JobService, LoginService, 
MonitorService, PartitionedDagRunService, PluginService, P [...]
-import { AssetStateBody, BackfillPostBody, BulkBody_BulkTaskInstanceBody_, 
BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, 
ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, 
DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, 
DagWarningType, GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody, 
PoolBody, PoolPatchBody, TaskInstancesBatchBody, TaskStateBody, 
TriggerDAGRunPostBody, UpdateHITLDetailPayload, VariableBody, [...]
+import { AssetStateBody, BackfillPostBody, BulkBody_BulkDAGRunBody_, 
BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_, 
BulkBody_VariableBody_, ClearTaskInstancesBody, ConnectionBody, 
CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody, 
DAGRunsBatchBody, DagRunState, DagWarningType, GenerateTokenBody, 
MaterializeAssetBody, PatchTaskInstanceBody, PoolBody, PoolPatchBody, 
TaskInstancesBatchBody, TaskStateBody, TriggerDAGRunPostBody, UpdateHITLDe [...]
 import * as Common from "./common";
 /**
 * Get Assets
@@ -264,19 +264,6 @@ export const useDagRunServiceGetDagRun = <TData = 
Common.DagRunServiceGetDagRunD
   dagRunId: string;
 }, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: 
Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }, queryKey), queryFn: 
() => DagRunService.getDagRun({ dagId, dagRunId }) as TData, ...options });
 /**
-* Get Upstream Asset Events
-* If dag run is asset-triggered, return the asset events that triggered it.
-* @param data The data for the request.
-* @param data.dagId
-* @param data.dagRunId
-* @returns AssetEventCollectionResponse Successful Response
-* @throws ApiError
-*/
-export const useDagRunServiceGetUpstreamAssetEvents = <TData = 
Common.DagRunServiceGetUpstreamAssetEventsDefaultResponse, TError = unknown, 
TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId }: {
-  dagId: string;
-  dagRunId: string;
-}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: 
Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn({ dagId, dagRunId }, 
queryKey), queryFn: () => DagRunService.getUpstreamAssetEvents({ dagId, 
dagRunId }) as TData, ...options });
-/**
 * Get Dag Runs
 * Get all Dag Runs.
 *
@@ -391,6 +378,19 @@ export const useDagRunServiceGetDagRuns = <TData = 
Common.DagRunServiceGetDagRun
   updatedAtLte?: string;
 }, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: 
Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, 
consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern, 
dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, 
endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, 
logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, [...]
 /**
+* Get Upstream Asset Events
+* If dag run is asset-triggered, return the asset events that triggered it.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @returns AssetEventCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const useDagRunServiceGetUpstreamAssetEvents = <TData = 
Common.DagRunServiceGetUpstreamAssetEventsDefaultResponse, TError = unknown, 
TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId }: {
+  dagId: string;
+  dagRunId: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: 
Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn({ dagId, dagRunId }, 
queryKey), queryFn: () => DagRunService.getUpstreamAssetEvents({ dagId, 
dagRunId }) as TData, ...options });
+/**
 * Experimental: Wait for a dag run to complete, and return task results if 
requested.
 * 🚧 This is an experimental endpoint and may change or be removed without 
notice.Successful response are streamed as newline-delimited JSON (NDJSON). 
Each line is a JSON object representing the Dag run state.
 * @param data The data for the request.
@@ -2160,6 +2160,22 @@ export const useConnectionServiceTestConnection = <TData 
= Common.ConnectionServ
 */
 export const useConnectionServiceCreateDefaultConnections = <TData = 
Common.ConnectionServiceCreateDefaultConnectionsMutationResult, TError = 
unknown, TContext = unknown>(options?: Omit<UseMutationOptions<TData, TError, 
void, TContext>, "mutationFn">) => useMutation<TData, TError, void, TContext>({ 
mutationFn: () => ConnectionService.createDefaultConnections() as unknown as 
Promise<TData>, ...options });
 /**
+* Trigger Dag Run
+* Trigger a Dag.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.requestBody
+* @returns DAGRunResponse Successful Response
+* @throws ApiError
+*/
+export const useDagRunServiceTriggerDagRun = <TData = 
Common.DagRunServiceTriggerDagRunMutationResult, TError = unknown, TContext = 
unknown>(options?: Omit<UseMutationOptions<TData, TError, {
+  dagId: unknown;
+  requestBody: TriggerDAGRunPostBody;
+}, TContext>, "mutationFn">) => useMutation<TData, TError, {
+  dagId: unknown;
+  requestBody: TriggerDAGRunPostBody;
+}, TContext>({ mutationFn: ({ dagId, requestBody }) => 
DagRunService.triggerDagRun({ dagId, requestBody }) as unknown as 
Promise<TData>, ...options });
+/**
 * Clear Dag Run
 * @param data The data for the request.
 * @param data.dagId
@@ -2178,22 +2194,6 @@ export const useDagRunServiceClearDagRun = <TData = 
Common.DagRunServiceClearDag
   requestBody: DAGRunClearBody;
 }, TContext>({ mutationFn: ({ dagId, dagRunId, requestBody }) => 
DagRunService.clearDagRun({ dagId, dagRunId, requestBody }) as unknown as 
Promise<TData>, ...options });
 /**
-* Trigger Dag Run
-* Trigger a Dag.
-* @param data The data for the request.
-* @param data.dagId
-* @param data.requestBody
-* @returns DAGRunResponse Successful Response
-* @throws ApiError
-*/
-export const useDagRunServiceTriggerDagRun = <TData = 
Common.DagRunServiceTriggerDagRunMutationResult, TError = unknown, TContext = 
unknown>(options?: Omit<UseMutationOptions<TData, TError, {
-  dagId: unknown;
-  requestBody: TriggerDAGRunPostBody;
-}, TContext>, "mutationFn">) => useMutation<TData, TError, {
-  dagId: unknown;
-  requestBody: TriggerDAGRunPostBody;
-}, TContext>({ mutationFn: ({ dagId, requestBody }) => 
DagRunService.triggerDagRun({ dagId, requestBody }) as unknown as 
Promise<TData>, ...options });
-/**
 * Get List Dag Runs Batch
 * Get a list of Dag Runs.
 * @param data The data for the request.
@@ -2482,6 +2482,22 @@ export const useDagRunServicePatchDagRun = <TData = 
Common.DagRunServicePatchDag
   updateMask?: string[];
 }, TContext>({ mutationFn: ({ dagId, dagRunId, requestBody, updateMask }) => 
DagRunService.patchDagRun({ dagId, dagRunId, requestBody, updateMask }) as 
unknown as Promise<TData>, ...options });
 /**
+* Bulk Dag Runs
+* Bulk delete Dag Runs.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.requestBody
+* @returns BulkResponse Successful Response
+* @throws ApiError
+*/
+export const useDagRunServiceBulkDagRuns = <TData = 
Common.DagRunServiceBulkDagRunsMutationResult, TError = unknown, TContext = 
unknown>(options?: Omit<UseMutationOptions<TData, TError, {
+  dagId: string;
+  requestBody: BulkBody_BulkDAGRunBody_;
+}, TContext>, "mutationFn">) => useMutation<TData, TError, {
+  dagId: string;
+  requestBody: BulkBody_BulkDAGRunBody_;
+}, TContext>({ mutationFn: ({ dagId, requestBody }) => 
DagRunService.bulkDagRuns({ dagId, requestBody }) as unknown as Promise<TData>, 
...options });
+/**
 * Patch Dags
 * Patch multiple Dags.
 *
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
index 8cdfee041b6..e11772395f0 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
@@ -264,19 +264,6 @@ export const useDagRunServiceGetDagRunSuspense = <TData = 
Common.DagRunServiceGe
   dagRunId: string;
 }, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: 
Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }, queryKey), queryFn: 
() => DagRunService.getDagRun({ dagId, dagRunId }) as TData, ...options });
 /**
-* Get Upstream Asset Events
-* If dag run is asset-triggered, return the asset events that triggered it.
-* @param data The data for the request.
-* @param data.dagId
-* @param data.dagRunId
-* @returns AssetEventCollectionResponse Successful Response
-* @throws ApiError
-*/
-export const useDagRunServiceGetUpstreamAssetEventsSuspense = <TData = 
Common.DagRunServiceGetUpstreamAssetEventsDefaultResponse, TError = unknown, 
TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId }: {
-  dagId: string;
-  dagRunId: string;
-}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: 
Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn({ dagId, dagRunId }, 
queryKey), queryFn: () => DagRunService.getUpstreamAssetEvents({ dagId, 
dagRunId }) as TData, ...options });
-/**
 * Get Dag Runs
 * Get all Dag Runs.
 *
@@ -391,6 +378,19 @@ export const useDagRunServiceGetDagRunsSuspense = <TData = 
Common.DagRunServiceG
   updatedAtLte?: string;
 }, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: 
Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, 
consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern, 
dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, 
endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, 
logicalDateLt, logicalDateLte, offset, orderBy, partitionKey [...]
 /**
+* Get Upstream Asset Events
+* If dag run is asset-triggered, return the asset events that triggered it.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @returns AssetEventCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const useDagRunServiceGetUpstreamAssetEventsSuspense = <TData = 
Common.DagRunServiceGetUpstreamAssetEventsDefaultResponse, TError = unknown, 
TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId }: {
+  dagId: string;
+  dagRunId: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: 
Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn({ dagId, dagRunId }, 
queryKey), queryFn: () => DagRunService.getUpstreamAssetEvents({ dagId, 
dagRunId }) as TData, ...options });
+/**
 * Experimental: Wait for a dag run to complete, and return task results if 
requested.
 * 🚧 This is an experimental endpoint and may change or be removed without 
notice.Successful response are streamed as newline-delimited JSON (NDJSON). 
Each line is a JSON object representing the Dag run state.
 * @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 108480b3dd1..4a4b95f1830 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -684,6 +684,32 @@ The response includes a list of successful keys and any 
errors encountered durin
 This structure helps users understand which key actions succeeded and which 
failed.`
 } as const;
 
+export const $BulkBody_BulkDAGRunBody_ = {
+    properties: {
+        actions: {
+            items: {
+                oneOf: [
+                    {
+                        '$ref': 
'#/components/schemas/BulkCreateAction_BulkDAGRunBody_'
+                    },
+                    {
+                        '$ref': 
'#/components/schemas/BulkUpdateAction_BulkDAGRunBody_'
+                    },
+                    {
+                        '$ref': 
'#/components/schemas/BulkDeleteAction_BulkDAGRunBody_'
+                    }
+                ]
+            },
+            type: 'array',
+            title: 'Actions'
+        }
+    },
+    additionalProperties: false,
+    type: 'object',
+    required: ['actions'],
+    title: 'BulkBody[BulkDAGRunBody]'
+} as const;
+
 export const $BulkBody_BulkTaskInstanceBody_ = {
     properties: {
         actions: {
@@ -788,6 +814,33 @@ export const $BulkBody_VariableBody_ = {
     title: 'BulkBody[VariableBody]'
 } as const;
 
+export const $BulkCreateAction_BulkDAGRunBody_ = {
+    properties: {
+        action: {
+            type: 'string',
+            const: 'create',
+            title: 'Action',
+            description: 'The action to be performed on the entities.'
+        },
+        entities: {
+            items: {
+                '$ref': '#/components/schemas/BulkDAGRunBody'
+            },
+            type: 'array',
+            title: 'Entities',
+            description: 'A list of entities to be created.'
+        },
+        action_on_existence: {
+            '$ref': '#/components/schemas/BulkActionOnExistence',
+            default: 'fail'
+        }
+    },
+    additionalProperties: false,
+    type: 'object',
+    required: ['action', 'entities'],
+    title: 'BulkCreateAction[BulkDAGRunBody]'
+} as const;
+
 export const $BulkCreateAction_BulkTaskInstanceBody_ = {
     properties: {
         action: {
@@ -896,6 +949,65 @@ export const $BulkCreateAction_VariableBody_ = {
     title: 'BulkCreateAction[VariableBody]'
 } as const;
 
+export const $BulkDAGRunBody = {
+    properties: {
+        dag_run_id: {
+            type: 'string',
+            title: 'Dag Run Id'
+        },
+        dag_id: {
+            anyOf: [
+                {
+                    type: 'string'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Dag Id'
+        }
+    },
+    additionalProperties: false,
+    type: 'object',
+    required: ['dag_run_id'],
+    title: 'BulkDAGRunBody',
+    description: 'Request body for bulk delete operations on Dag Runs.'
+} as const;
+
+export const $BulkDeleteAction_BulkDAGRunBody_ = {
+    properties: {
+        action: {
+            type: 'string',
+            const: 'delete',
+            title: 'Action',
+            description: 'The action to be performed on the entities.'
+        },
+        entities: {
+            items: {
+                anyOf: [
+                    {
+                        type: 'string'
+                    },
+                    {
+                        '$ref': '#/components/schemas/BulkDAGRunBody'
+                    }
+                ]
+            },
+            type: 'array',
+            title: 'Entities',
+            description: 'A list of entity id/key or entity objects to be 
deleted.'
+        },
+        action_on_non_existence: {
+            '$ref': '#/components/schemas/BulkActionNotOnExistence',
+            default: 'fail'
+        }
+    },
+    additionalProperties: false,
+    type: 'object',
+    required: ['action', 'entities'],
+    title: 'BulkDeleteAction[BulkDAGRunBody]'
+} as const;
+
 export const $BulkDeleteAction_BulkTaskInstanceBody_ = {
     properties: {
         action: {
@@ -1166,6 +1278,48 @@ export const $BulkTaskInstanceBody = {
     description: 'Request body for bulk update, and delete task instances.'
 } as const;
 
+export const $BulkUpdateAction_BulkDAGRunBody_ = {
+    properties: {
+        action: {
+            type: 'string',
+            const: 'update',
+            title: 'Action',
+            description: 'The action to be performed on the entities.'
+        },
+        entities: {
+            items: {
+                '$ref': '#/components/schemas/BulkDAGRunBody'
+            },
+            type: 'array',
+            title: 'Entities',
+            description: 'A list of entities to be updated.'
+        },
+        update_mask: {
+            anyOf: [
+                {
+                    items: {
+                        type: 'string'
+                    },
+                    type: 'array'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Update Mask',
+            description: 'A list of field names to update for each entity.Only 
these fields will be applied from the request body to the database model.Any 
extra fields provided will be ignored.'
+        },
+        action_on_non_existence: {
+            '$ref': '#/components/schemas/BulkActionNotOnExistence',
+            default: 'fail'
+        }
+    },
+    additionalProperties: false,
+    type: 'object',
+    required: ['action', 'entities'],
+    title: 'BulkUpdateAction[BulkDAGRunBody]'
+} as const;
+
 export const $BulkUpdateAction_BulkTaskInstanceBody_ = {
     properties: {
         action: {
@@ -2696,7 +2850,7 @@ export const $DAGRunPatchBody = {
         state: {
             anyOf: [
                 {
-                    '$ref': '#/components/schemas/DAGRunPatchStates'
+                    '$ref': '#/components/schemas/DagRunMutableStates'
                 },
                 {
                     type: 'null'
@@ -2722,13 +2876,6 @@ export const $DAGRunPatchBody = {
     description: 'Dag Run Serializer for PATCH requests.'
 } as const;
 
-export const $DAGRunPatchStates = {
-    type: 'string',
-    enum: ['queued', 'success', 'failed'],
-    title: 'DAGRunPatchStates',
-    description: 'Enum for Dag Run states when updating a Dag Run.'
-} as const;
-
 export const $DAGRunResponse = {
     properties: {
         dag_run_id: {
@@ -3488,6 +3635,13 @@ export const $DagRunAssetReference = {
     description: 'DagRun serializer for asset responses.'
 } as const;
 
+export const $DagRunMutableStates = {
+    type: 'string',
+    enum: ['queued', 'success', 'failed'],
+    title: 'DagRunMutableStates',
+    description: 'Dag Run states from which the run may be mutated (patched, 
deleted).'
+} as const;
+
 export const $DagRunState = {
     type: 'string',
     enum: ['queued', 'running', 'success', 'failed'],
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index bc1f4471470..75468184977 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -3,7 +3,7 @@
 import type { CancelablePromise } from './core/CancelablePromise';
 import { OpenAPI } from './core/OpenAPI';
 import { request as __request } from './core/request';
-import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, 
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, 
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, 
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, 
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, 
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, 
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, 
Dele [...]
+import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, 
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, 
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, 
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, 
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, 
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, 
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, 
Dele [...]
 
 export class AssetService {
     /**
@@ -937,54 +937,26 @@ export class DagRunService {
     }
     
     /**
-     * Get Upstream Asset Events
-     * If dag run is asset-triggered, return the asset events that triggered 
it.
+     * Bulk Dag Runs
+     * Bulk delete Dag Runs.
      * @param data The data for the request.
      * @param data.dagId
-     * @param data.dagRunId
-     * @returns AssetEventCollectionResponse Successful Response
-     * @throws ApiError
-     */
-    public static getUpstreamAssetEvents(data: GetUpstreamAssetEventsData): 
CancelablePromise<GetUpstreamAssetEventsResponse> {
-        return __request(OpenAPI, {
-            method: 'GET',
-            url: 
'/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents',
-            path: {
-                dag_id: data.dagId,
-                dag_run_id: data.dagRunId
-            },
-            errors: {
-                401: 'Unauthorized',
-                403: 'Forbidden',
-                404: 'Not Found',
-                422: 'Validation Error'
-            }
-        });
-    }
-    
-    /**
-     * Clear Dag Run
-     * @param data The data for the request.
-     * @param data.dagId
-     * @param data.dagRunId
      * @param data.requestBody
-     * @returns unknown Successful Response
+     * @returns BulkResponse Successful Response
      * @throws ApiError
      */
-    public static clearDagRun(data: ClearDagRunData): 
CancelablePromise<ClearDagRunResponse> {
+    public static bulkDagRuns(data: BulkDagRunsData): 
CancelablePromise<BulkDagRunsResponse> {
         return __request(OpenAPI, {
-            method: 'POST',
-            url: '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear',
+            method: 'PATCH',
+            url: '/api/v2/dags/{dag_id}/dagRuns',
             path: {
-                dag_id: data.dagId,
-                dag_run_id: data.dagRunId
+                dag_id: data.dagId
             },
             body: data.requestBody,
             mediaType: 'application/json',
             errors: {
                 401: 'Unauthorized',
                 403: 'Forbidden',
-                404: 'Not Found',
                 422: 'Validation Error'
             }
         });
@@ -1148,6 +1120,60 @@ export class DagRunService {
         });
     }
     
+    /**
+     * Get Upstream Asset Events
+     * If dag run is asset-triggered, return the asset events that triggered 
it.
+     * @param data The data for the request.
+     * @param data.dagId
+     * @param data.dagRunId
+     * @returns AssetEventCollectionResponse Successful Response
+     * @throws ApiError
+     */
+    public static getUpstreamAssetEvents(data: GetUpstreamAssetEventsData): 
CancelablePromise<GetUpstreamAssetEventsResponse> {
+        return __request(OpenAPI, {
+            method: 'GET',
+            url: 
'/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents',
+            path: {
+                dag_id: data.dagId,
+                dag_run_id: data.dagRunId
+            },
+            errors: {
+                401: 'Unauthorized',
+                403: 'Forbidden',
+                404: 'Not Found',
+                422: 'Validation Error'
+            }
+        });
+    }
+    
+    /**
+     * Clear Dag Run
+     * @param data The data for the request.
+     * @param data.dagId
+     * @param data.dagRunId
+     * @param data.requestBody
+     * @returns unknown Successful Response
+     * @throws ApiError
+     */
+    public static clearDagRun(data: ClearDagRunData): 
CancelablePromise<ClearDagRunResponse> {
+        return __request(OpenAPI, {
+            method: 'POST',
+            url: '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear',
+            path: {
+                dag_id: data.dagId,
+                dag_run_id: data.dagRunId
+            },
+            body: data.requestBody,
+            mediaType: 'application/json',
+            errors: {
+                401: 'Unauthorized',
+                403: 'Forbidden',
+                404: 'Not Found',
+                422: 'Validation Error'
+            }
+        });
+    }
+    
     /**
      * Experimental: Wait for a dag run to complete, and return task results 
if requested.
      * 🚧 This is an experimental endpoint and may change or be removed without 
notice.Successful response are streamed as newline-delimited JSON (NDJSON). 
Each line is a JSON object representing the Dag run state.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 1ea5961b919..70c9fa131c1 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -213,6 +213,10 @@ export type BulkActionResponse = {
     }>;
 };
 
+export type BulkBody_BulkDAGRunBody_ = {
+    actions: Array<(BulkCreateAction_BulkDAGRunBody_ | 
BulkUpdateAction_BulkDAGRunBody_ | BulkDeleteAction_BulkDAGRunBody_)>;
+};
+
 export type BulkBody_BulkTaskInstanceBody_ = {
     actions: Array<(BulkCreateAction_BulkTaskInstanceBody_ | 
BulkUpdateAction_BulkTaskInstanceBody_ | 
BulkDeleteAction_BulkTaskInstanceBody_)>;
 };
@@ -229,6 +233,18 @@ export type BulkBody_VariableBody_ = {
     actions: Array<(BulkCreateAction_VariableBody_ | 
BulkUpdateAction_VariableBody_ | BulkDeleteAction_VariableBody_)>;
 };
 
+export type BulkCreateAction_BulkDAGRunBody_ = {
+    /**
+     * The action to be performed on the entities.
+     */
+    action: "create";
+    /**
+     * A list of entities to be created.
+     */
+    entities: Array<BulkDAGRunBody>;
+    action_on_existence?: BulkActionOnExistence;
+};
+
 export type BulkCreateAction_BulkTaskInstanceBody_ = {
     /**
      * The action to be performed on the entities.
@@ -277,6 +293,26 @@ export type BulkCreateAction_VariableBody_ = {
     action_on_existence?: BulkActionOnExistence;
 };
 
+/**
+ * Request body for bulk delete operations on Dag Runs.
+ */
+export type BulkDAGRunBody = {
+    dag_run_id: string;
+    dag_id?: string | null;
+};
+
+export type BulkDeleteAction_BulkDAGRunBody_ = {
+    /**
+     * The action to be performed on the entities.
+     */
+    action: "delete";
+    /**
+     * A list of entity id/key or entity objects to be deleted.
+     */
+    entities: Array<(string | BulkDAGRunBody)>;
+    action_on_non_existence?: BulkActionNotOnExistence;
+};
+
 export type BulkDeleteAction_BulkTaskInstanceBody_ = {
     /**
      * The action to be performed on the entities.
@@ -363,6 +399,22 @@ export type BulkTaskInstanceBody = {
     dag_run_id?: string | null;
 };
 
+export type BulkUpdateAction_BulkDAGRunBody_ = {
+    /**
+     * The action to be performed on the entities.
+     */
+    action: "update";
+    /**
+     * A list of entities to be updated.
+     */
+    entities: Array<BulkDAGRunBody>;
+    /**
+     * A list of field names to update for each entity.Only these fields will 
be applied from the request body to the database model.Any extra fields 
provided will be ignored.
+     */
+    update_mask?: Array<(string)> | null;
+    action_on_non_existence?: BulkActionNotOnExistence;
+};
+
 export type BulkUpdateAction_BulkTaskInstanceBody_ = {
     /**
      * The action to be performed on the entities.
@@ -731,15 +783,10 @@ export type DAGRunCollectionResponse = {
  * Dag Run Serializer for PATCH requests.
  */
 export type DAGRunPatchBody = {
-    state?: DAGRunPatchStates | null;
+    state?: DagRunMutableStates | null;
     note?: string | null;
 };
 
-/**
- * Enum for Dag Run states when updating a Dag Run.
- */
-export type DAGRunPatchStates = 'queued' | 'success' | 'failed';
-
 /**
  * Dag Run serializer for responses.
  */
@@ -869,6 +916,11 @@ export type DagRunAssetReference = {
     partition_key: string | null;
 };
 
+/**
+ * Dag Run states from which the run may be mutated (patched, deleted).
+ */
+export type DagRunMutableStates = 'queued' | 'success' | 'failed';
+
 /**
  * All possible states that a DagRun can be in.
  *
@@ -2746,20 +2798,12 @@ export type PatchDagRunData = {
 
 export type PatchDagRunResponse = DAGRunResponse;
 
-export type GetUpstreamAssetEventsData = {
+export type BulkDagRunsData = {
     dagId: string;
-    dagRunId: string;
+    requestBody: BulkBody_BulkDAGRunBody_;
 };
 
-export type GetUpstreamAssetEventsResponse = AssetEventCollectionResponse;
-
-export type ClearDagRunData = {
-    dagId: string;
-    dagRunId: string;
-    requestBody: DAGRunClearBody;
-};
-
-export type ClearDagRunResponse = ClearTaskInstanceCollectionResponse | 
DAGRunResponse;
+export type BulkDagRunsResponse = BulkResponse;
 
 export type GetDagRunsData = {
     bundleVersion?: string | null;
@@ -2857,6 +2901,21 @@ export type TriggerDagRunData = {
 
 export type TriggerDagRunResponse = DAGRunResponse;
 
+export type GetUpstreamAssetEventsData = {
+    dagId: string;
+    dagRunId: string;
+};
+
+export type GetUpstreamAssetEventsResponse = AssetEventCollectionResponse;
+
+export type ClearDagRunData = {
+    dagId: string;
+    dagRunId: string;
+    requestBody: DAGRunClearBody;
+};
+
+export type ClearDagRunResponse = ClearTaskInstanceCollectionResponse | 
DAGRunResponse;
+
 export type WaitDagRunUntilFinishedData = {
     dagId: string;
     dagRunId: string;
@@ -5201,14 +5260,35 @@ export type $OpenApiTs = {
             };
         };
     };
-    '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents': {
+    '/api/v2/dags/{dag_id}/dagRuns': {
+        patch: {
+            req: BulkDagRunsData;
+            res: {
+                /**
+                 * Successful Response
+                 */
+                200: BulkResponse;
+                /**
+                 * Unauthorized
+                 */
+                401: HTTPExceptionResponse;
+                /**
+                 * Forbidden
+                 */
+                403: HTTPExceptionResponse;
+                /**
+                 * Validation Error
+                 */
+                422: HTTPValidationError;
+            };
+        };
         get: {
-            req: GetUpstreamAssetEventsData;
+            req: GetDagRunsData;
             res: {
                 /**
                  * Successful Response
                  */
-                200: AssetEventCollectionResponse;
+                200: DAGRunCollectionResponse;
                 /**
                  * Unauthorized
                  */
@@ -5227,15 +5307,17 @@ export type $OpenApiTs = {
                 422: HTTPValidationError;
             };
         };
-    };
-    '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear': {
         post: {
-            req: ClearDagRunData;
+            req: TriggerDagRunData;
             res: {
                 /**
                  * Successful Response
                  */
-                200: ClearTaskInstanceCollectionResponse | DAGRunResponse;
+                200: DAGRunResponse;
+                /**
+                 * Bad Request
+                 */
+                400: HTTPExceptionResponse;
                 /**
                  * Unauthorized
                  */
@@ -5248,6 +5330,10 @@ export type $OpenApiTs = {
                  * Not Found
                  */
                 404: HTTPExceptionResponse;
+                /**
+                 * Conflict
+                 */
+                409: HTTPExceptionResponse;
                 /**
                  * Validation Error
                  */
@@ -5255,14 +5341,14 @@ export type $OpenApiTs = {
             };
         };
     };
-    '/api/v2/dags/{dag_id}/dagRuns': {
+    '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents': {
         get: {
-            req: GetDagRunsData;
+            req: GetUpstreamAssetEventsData;
             res: {
                 /**
                  * Successful Response
                  */
-                200: DAGRunCollectionResponse;
+                200: AssetEventCollectionResponse;
                 /**
                  * Unauthorized
                  */
@@ -5281,17 +5367,15 @@ export type $OpenApiTs = {
                 422: HTTPValidationError;
             };
         };
+    };
+    '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear': {
         post: {
-            req: TriggerDagRunData;
+            req: ClearDagRunData;
             res: {
                 /**
                  * Successful Response
                  */
-                200: DAGRunResponse;
-                /**
-                 * Bad Request
-                 */
-                400: HTTPExceptionResponse;
+                200: ClearTaskInstanceCollectionResponse | DAGRunResponse;
                 /**
                  * Unauthorized
                  */
@@ -5304,10 +5388,6 @@ export type $OpenApiTs = {
                  * Not Found
                  */
                 404: HTTPExceptionResponse;
-                /**
-                 * Conflict
-                 */
-                409: HTTPExceptionResponse;
                 /**
                  * Validation Error
                  */
diff --git 
a/airflow-core/src/airflow/ui/src/components/MarkAs/Run/MarkRunAsButton.tsx 
b/airflow-core/src/airflow/ui/src/components/MarkAs/Run/MarkRunAsButton.tsx
index 975691c9657..7268e837b71 100644
--- a/airflow-core/src/airflow/ui/src/components/MarkAs/Run/MarkRunAsButton.tsx
+++ b/airflow-core/src/airflow/ui/src/components/MarkAs/Run/MarkRunAsButton.tsx
@@ -23,7 +23,7 @@ import { useTranslation } from "react-i18next";
 import { FiX } from "react-icons/fi";
 import { LuCheck } from "react-icons/lu";
 
-import type { DAGRunPatchStates, DAGRunResponse } from 
"openapi/requests/types.gen";
+import type { DagRunMutableStates, DAGRunResponse } from 
"openapi/requests/types.gen";
 import { StateBadge } from "src/components/StateBadge";
 import { IconButton, Menu, Tooltip } from "src/components/ui";
 
@@ -37,7 +37,7 @@ type Props = {
 
 const MarkRunAsButton = ({ dagRun, isHotkeyEnabled = false }: Props) => {
   const { onClose, onOpen, open } = useDisclosure();
-  const [state, setState] = useState<DAGRunPatchStates>("success");
+  const [state, setState] = useState<DagRunMutableStates>("success");
   const { t: translate } = useTranslation();
 
   useHotkeys(
diff --git 
a/airflow-core/src/airflow/ui/src/components/MarkAs/Run/MarkRunAsDialog.tsx 
b/airflow-core/src/airflow/ui/src/components/MarkAs/Run/MarkRunAsDialog.tsx
index bff16fcde0a..0c01cb01981 100644
--- a/airflow-core/src/airflow/ui/src/components/MarkAs/Run/MarkRunAsDialog.tsx
+++ b/airflow-core/src/airflow/ui/src/components/MarkAs/Run/MarkRunAsDialog.tsx
@@ -20,7 +20,7 @@ import { Button, Flex, Heading, VStack } from 
"@chakra-ui/react";
 import { useState } from "react";
 import { useTranslation } from "react-i18next";
 
-import type { DAGRunPatchStates, DAGRunResponse } from 
"openapi/requests/types.gen";
+import type { DagRunMutableStates, DAGRunResponse } from 
"openapi/requests/types.gen";
 import { ActionAccordion } from "src/components/ActionAccordion";
 import { StateBadge } from "src/components/StateBadge";
 import { Dialog } from "src/components/ui";
@@ -30,7 +30,7 @@ type Props = {
   readonly dagRun: DAGRunResponse;
   readonly onClose: () => void;
   readonly open: boolean;
-  readonly state: DAGRunPatchStates;
+  readonly state: DagRunMutableStates;
 };
 
 const MarkRunAsDialog = ({ dagRun, onClose, open, state }: Props) => {
diff --git a/airflow-core/src/airflow/ui/src/components/MarkAs/utils.ts 
b/airflow-core/src/airflow/ui/src/components/MarkAs/utils.ts
index 14245602bfb..5d62fd27b94 100644
--- a/airflow-core/src/airflow/ui/src/components/MarkAs/utils.ts
+++ b/airflow-core/src/airflow/ui/src/components/MarkAs/utils.ts
@@ -16,6 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-import type { DAGRunPatchStates } from "openapi/requests/types.gen";
+import type { DagRunMutableStates } from "openapi/requests/types.gen";
 
-export const allowedStates: Array<DAGRunPatchStates> = ["success", "failed"];
+export const allowedStates: Array<DagRunMutableStates> = ["success", "failed"];
diff --git 
a/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkDeleteDagRunsButton.tsx 
b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkDeleteDagRunsButton.tsx
new file mode 100644
index 00000000000..e1a4b9c5c35
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkDeleteDagRunsButton.tsx
@@ -0,0 +1,176 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Box, Button, Flex, Heading, Text, useDisclosure, VStack } from 
"@chakra-ui/react";
+import type { ColumnDef } from "@tanstack/react-table";
+import type { TFunction } from "i18next";
+import { useTranslation } from "react-i18next";
+import { FiTrash2 } from "react-icons/fi";
+
+import type { DAGRunResponse } from "openapi/requests/types.gen";
+import { DataTable } from "src/components/DataTable";
+import { ErrorAlert } from "src/components/ErrorAlert";
+import { StateBadge } from "src/components/StateBadge";
+import Time from "src/components/Time";
+import { Accordion, Dialog } from "src/components/ui";
+import { useBulkDeleteDagRuns } from "src/queries/useBulkDeleteDagRuns";
+
+type Props = {
+  readonly clearSelections: VoidFunction;
+  readonly selectedDagRuns: Array<DAGRunResponse>;
+};
+
+const getColumns = (translate: TFunction): Array<ColumnDef<DAGRunResponse>> => 
[
+  {
+    accessorKey: "dag_run_id",
+    cell: ({ row: { original } }) => <Text>{original.dag_run_id}</Text>,
+    enableSorting: false,
+    header: translate("dagRunId"),
+  },
+  {
+    accessorKey: "state",
+    cell: ({ row: { original } }) => (
+      <StateBadge 
state={original.state}>{translate(`common:states.${original.state}`)}</StateBadge>
+    ),
+    enableSorting: false,
+    header: translate("state"),
+  },
+  {
+    accessorKey: "run_after",
+    cell: ({ row: { original } }) => <Time datetime={original.run_after} />,
+    enableSorting: false,
+    header: translate("dagRun.runAfter"),
+  },
+];
+
+const BulkDeleteDagRunsButton = ({ clearSelections, selectedDagRuns }: Props) 
=> {
+  const { t: translate } = useTranslation(["common", "dags"]);
+  const { onClose, onOpen, open } = useDisclosure();
+  const { bulkAction, error, isPending } = useBulkDeleteDagRuns({
+    clearSelections,
+    onSuccessConfirm: onClose,
+  });
+
+  const columns = getColumns(translate);
+
+  const byDagId = new Map<string, Array<DAGRunResponse>>();
+
+  for (const dagRun of selectedDagRuns) {
+    const group = byDagId.get(dagRun.dag_id) ?? [];
+
+    group.push(dagRun);
+    byDagId.set(dagRun.dag_id, group);
+  }
+
+  const isGrouped = byDagId.size > 1;
+
+  return (
+    <>
+      <Button colorPalette="danger" onClick={onOpen} size="sm" 
variant="outline">
+        <FiTrash2 />
+        {translate("dags:runAndTaskActions.delete.button", { type: 
translate("dagRun_other") })}
+      </Button>
+
+      <Dialog.Root onOpenChange={onClose} open={open} size="xl">
+        <Dialog.Content backdrop>
+          <Dialog.Header>
+            <VStack align="start" gap={4}>
+              <Heading size="xl">
+                {translate("dags:runAndTaskActions.delete.dialog.title", {
+                  type: translate("dagRun_other"),
+                })}
+              </Heading>
+            </VStack>
+          </Dialog.Header>
+
+          <Dialog.CloseTrigger />
+          <Dialog.Body width="full">
+            <Text color="fg.subtle" fontSize="sm" mb={4}>
+              {translate("dags:runAndTaskActions.delete.dialog.warning", {
+                type: translate("dagRun_other"),
+              })}
+            </Text>
+
+            <Box maxH="400px" overflowY="auto">
+              {isGrouped ? (
+                <Accordion.Root collapsible multiple variant="enclosed">
+                  {[...byDagId.entries()].map(([dagId, dagRuns]) => (
+                    <Accordion.Item key={dagId} value={dagId}>
+                      <Accordion.ItemTrigger>
+                        <Text fontSize="sm" fontWeight="semibold">
+                          {translate("dagId")}: {dagId}{" "}
+                          <Text as="span" color="fg.subtle" 
fontWeight="normal">
+                            ({dagRuns.length})
+                          </Text>
+                        </Text>
+                      </Accordion.ItemTrigger>
+                      <Accordion.ItemContent>
+                        <DataTable
+                          columns={columns}
+                          data={dagRuns}
+                          displayMode="table"
+                          modelName="common:dagRun"
+                          total={dagRuns.length}
+                        />
+                      </Accordion.ItemContent>
+                    </Accordion.Item>
+                  ))}
+                </Accordion.Root>
+              ) : (
+                <DataTable
+                  columns={columns}
+                  data={selectedDagRuns}
+                  displayMode="table"
+                  modelName="common:dagRun"
+                  total={selectedDagRuns.length}
+                />
+              )}
+            </Box>
+
+            <ErrorAlert error={error} />
+            <Flex justifyContent="end" mt={3}>
+              <Button
+                colorPalette="danger"
+                loading={isPending}
+                onClick={() => {
+                  bulkAction({
+                    actions: [
+                      {
+                        action: "delete" as const,
+                        action_on_non_existence: "skip",
+                        entities: selectedDagRuns.map((dagRun) => ({
+                          dag_id: dagRun.dag_id,
+                          dag_run_id: dagRun.dag_run_id,
+                        })),
+                      },
+                    ],
+                  });
+                }}
+              >
+                <FiTrash2 />
+                <Text fontWeight="bold">{translate("modal.confirm")}</Text>
+              </Button>
+            </Flex>
+          </Dialog.Body>
+        </Dialog.Content>
+      </Dialog.Root>
+    </>
+  );
+};
+
+export default BulkDeleteDagRunsButton;
diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns.test.tsx 
b/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.test.tsx
similarity index 100%
rename from airflow-core/src/airflow/ui/src/pages/DagRuns.test.tsx
rename to airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.test.tsx
diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx 
b/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.tsx
similarity index 82%
rename from airflow-core/src/airflow/ui/src/pages/DagRuns.tsx
rename to airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.tsx
index bc1aa2f3d14..217a9e62d98 100644
--- a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.tsx
@@ -27,6 +27,7 @@ import type { DAGRunResponse } from 
"openapi/requests/types.gen";
 import { ClearRunButton } from "src/components/Clear";
 import { DagVersion } from "src/components/DagVersion";
 import { DataTable } from "src/components/DataTable";
+import { useRowSelection, type GetColumnsParams } from 
"src/components/DataTable/useRowSelection";
 import { useTableURLState } from "src/components/DataTable/useTableUrlState";
 import { ErrorAlert } from "src/components/ErrorAlert";
 import { LimitedItemsList } from "src/components/LimitedItemsList";
@@ -37,12 +38,18 @@ import { StateBadge } from "src/components/StateBadge";
 import Time from "src/components/Time";
 import { TruncatedText } from "src/components/TruncatedText";
 import { RouterLink } from "src/components/ui";
+import { ActionBar } from "src/components/ui/ActionBar";
+import { Checkbox } from "src/components/ui/Checkbox";
 import { SearchParamsKeys, type SearchParamsKeysType } from 
"src/constants/searchParams";
 import { useAdvancedSearchArg } from "src/hooks/useAdvancedSearch";
-import { DagRunsFilters } from "src/pages/DagRunsFilters";
-import DeleteRunButton from "src/pages/DeleteRunButton";
 import { renderDuration, useAutoRefresh, isStatePending } from "src/utils";
 
+import BulkDeleteDagRunsButton from "./BulkDeleteDagRunsButton";
+import { DagRunsFilters } from "./DagRunsFilters";
+import DeleteRunButton from "./DeleteRunButton";
+
+const getRowKey = (dagRun: DAGRunResponse) => 
`${dagRun.dag_id}:${dagRun.dag_run_id}`;
+
 type DagRunRow = { row: { original: DAGRunResponse } };
 const {
   BUNDLE_VERSION: BUNDLE_VERSION_PARAM,
@@ -67,7 +74,43 @@ const {
   TRIGGERING_USER_NAME_PATTERN: TRIGGERING_USER_NAME_PATTERN_PARAM,
 }: SearchParamsKeysType = SearchParamsKeys;
 
-const runColumns = (translate: TFunction, dagId?: string): 
Array<ColumnDef<DAGRunResponse>> => [
+type ColumnProps = {
+  readonly dagId?: string;
+  readonly translate: TFunction;
+} & GetColumnsParams;
+
+const runColumns = ({
+  allRowsSelected,
+  dagId,
+  onRowSelect,
+  onSelectAll,
+  selectedRows,
+  translate,
+}: ColumnProps): Array<ColumnDef<DAGRunResponse>> => [
+  {
+    accessorKey: "select",
+    cell: ({ row }) => (
+      <Checkbox
+        borderWidth={1}
+        checked={selectedRows.get(getRowKey(row.original))}
+        colorPalette="brand"
+        onCheckedChange={(event) => onRowSelect(getRowKey(row.original), 
Boolean(event.checked))}
+      />
+    ),
+    enableHiding: false,
+    enableSorting: false,
+    header: () => (
+      <Checkbox
+        borderWidth={1}
+        checked={allRowsSelected}
+        colorPalette="brand"
+        onCheckedChange={(event) => onSelectAll(Boolean(event.checked))}
+      />
+    ),
+    meta: {
+      skeletonWidth: 10,
+    },
+  },
   ...(Boolean(dagId)
     ? []
     : [
@@ -287,11 +330,27 @@ export const DagRuns = () => {
     },
   );
 
-  const columns = runColumns(translate, dagId);
-
   const nextCursor = data?.next_cursor ?? undefined;
   const previousCursor = data?.previous_cursor ?? undefined;
 
+  const { allRowsSelected, clearSelections, handleRowSelect, handleSelectAll, 
selectedRows } =
+    useRowSelection({
+      data: data?.dag_runs,
+      getKey: getRowKey,
+    });
+
+  const selectedDagRuns = (data?.dag_runs ?? []).filter((dagRun) => 
selectedRows.has(getRowKey(dagRun)));
+
+  const columns = runColumns({
+    allRowsSelected,
+    dagId,
+    multiTeam: false,
+    onRowSelect: handleRowSelect,
+    onSelectAll: handleSelectAll,
+    selectedRows,
+    translate,
+  });
+
   return (
     <>
       <DagRunsFilters dagId={dagId} />
@@ -306,6 +365,16 @@ export const DagRuns = () => {
         onStateChange={setTableURLState}
         previousCursor={previousCursor}
       />
+      <ActionBar.Root closeOnInteractOutside={false} 
open={Boolean(selectedRows.size)}>
+        <ActionBar.Content>
+          <ActionBar.SelectionTrigger>
+            {selectedRows.size} {translate("selected")}
+          </ActionBar.SelectionTrigger>
+          <ActionBar.Separator />
+          <BulkDeleteDagRunsButton clearSelections={clearSelections} 
selectedDagRuns={selectedDagRuns} />
+          <ActionBar.CloseTrigger onClick={clearSelections} />
+        </ActionBar.Content>
+      </ActionBar.Root>
     </>
   );
 };
diff --git a/airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx 
b/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRunsFilters.tsx
similarity index 100%
rename from airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx
rename to airflow-core/src/airflow/ui/src/pages/DagRuns/DagRunsFilters.tsx
diff --git a/airflow-core/src/airflow/ui/src/pages/DeleteRunButton.tsx 
b/airflow-core/src/airflow/ui/src/pages/DagRuns/DeleteRunButton.tsx
similarity index 100%
rename from airflow-core/src/airflow/ui/src/pages/DeleteRunButton.tsx
rename to airflow-core/src/airflow/ui/src/pages/DagRuns/DeleteRunButton.tsx
diff --git a/airflow-core/src/airflow/ui/src/components/MarkAs/utils.ts 
b/airflow-core/src/airflow/ui/src/pages/DagRuns/index.ts
similarity index 84%
copy from airflow-core/src/airflow/ui/src/components/MarkAs/utils.ts
copy to airflow-core/src/airflow/ui/src/pages/DagRuns/index.ts
index 14245602bfb..26ed3a4da89 100644
--- a/airflow-core/src/airflow/ui/src/components/MarkAs/utils.ts
+++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/index.ts
@@ -16,6 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-import type { DAGRunPatchStates } from "openapi/requests/types.gen";
-
-export const allowedStates: Array<DAGRunPatchStates> = ["success", "failed"];
+export { DagRuns } from "./DagRuns";
diff --git a/airflow-core/src/airflow/ui/src/pages/Run/Header.tsx 
b/airflow-core/src/airflow/ui/src/pages/Run/Header.tsx
index 3f83146927c..18c0e927426 100644
--- a/airflow-core/src/airflow/ui/src/pages/Run/Header.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/Run/Header.tsx
@@ -33,7 +33,7 @@ import { RunTypeIcon } from "src/components/RunTypeIcon";
 import Time from "src/components/Time";
 import { RouterLink } from "src/components/ui";
 import { SearchParamsKeys } from "src/constants/searchParams";
-import DeleteRunButton from "src/pages/DeleteRunButton";
+import DeleteRunButton from "src/pages/DagRuns/DeleteRunButton";
 import { usePatchDagRun } from "src/queries/usePatchDagRun";
 import { getDuration } from "src/utils";
 
diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkDeleteDagRuns.ts 
b/airflow-core/src/airflow/ui/src/queries/useBulkDeleteDagRuns.ts
new file mode 100644
index 00000000000..d0a3fcff978
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkDeleteDagRuns.ts
@@ -0,0 +1,114 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useQueryClient } from "@tanstack/react-query";
+import { useRef, useState } from "react";
+import { useTranslation } from "react-i18next";
+
+import {
+  useDagRunServiceBulkDagRuns,
+  useDagRunServiceGetDagRunsKey,
+  useTaskInstanceServiceGetTaskInstancesKey,
+} from "openapi/queries";
+import type { BulkActionResponse, BulkBody_BulkDAGRunBody_, BulkResponse } 
from "openapi/requests/types.gen";
+import { toaster } from "src/components/ui";
+
+import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
+
+type Props = {
+  readonly clearSelections: VoidFunction;
+  readonly onSuccessConfirm: VoidFunction;
+};
+
+const handleActionResult = (
+  actionResult: BulkActionResponse,
+  setError: (error: unknown) => void,
+  onSuccess: (count: number, keys: Array<string>) => void,
+) => {
+  const { errors, success } = actionResult;
+
+  if (Array.isArray(errors) && errors.length > 0) {
+    const apiError = errors[0] as { error: string };
+
+    setError({ body: { detail: apiError.error } });
+  } else if (Array.isArray(success) && success.length > 0) {
+    onSuccess(success.length, success);
+  }
+};
+
+export const useBulkDeleteDagRuns = ({ clearSelections, onSuccessConfirm }: 
Props) => {
+  const queryClient = useQueryClient();
+  const [error, setError] = useState<unknown>(undefined);
+  const affectedDagIds = useRef<Set<string>>(new Set());
+  const { t: translate } = useTranslation(["common", "dags"]);
+
+  const onSuccess = async (responseData: BulkResponse) => {
+    await Promise.all([
+      queryClient.invalidateQueries({ queryKey: 
[useDagRunServiceGetDagRunsKey] }),
+      queryClient.invalidateQueries({ queryKey: 
[useTaskInstanceServiceGetTaskInstancesKey] }),
+      ...tiPerAttemptQueryKeys.map((key) => queryClient.invalidateQueries({ 
queryKey: key })),
+      ...[...affectedDagIds.current].flatMap((dagId) =>
+        gridQueryKeys(dagId).map((key) => queryClient.invalidateQueries({ 
queryKey: key })),
+      ),
+    ]);
+
+    if (responseData.delete) {
+      handleActionResult(responseData.delete, setError, (count, keys) => {
+        toaster.create({
+          description: translate("toaster.bulkDelete.success.description", {
+            count,
+            keys: keys.join(", "),
+            resourceName: translate("dagRun_other"),
+          }),
+          title: translate("toaster.bulkDelete.success.title", {
+            resourceName: translate("dagRun_other"),
+          }),
+          type: "success",
+        });
+        clearSelections();
+        onSuccessConfirm();
+      });
+    }
+  };
+
+  const onError = (_error: unknown) => {
+    setError(_error);
+  };
+
+  const { isPending, mutate } = useDagRunServiceBulkDagRuns({
+    onError,
+    onSuccess,
+  });
+
+  const bulkAction = (requestBody: BulkBody_BulkDAGRunBody_) => {
+    setError(undefined);
+    const dagIds = new Set<string>();
+
+    for (const action of requestBody.actions) {
+      for (const entity of action.entities) {
+        if (typeof entity !== "string" && entity.dag_id !== null && 
entity.dag_id !== undefined) {
+          dagIds.add(entity.dag_id);
+        }
+      }
+    }
+    affectedDagIds.current = dagIds;
+    mutate({ dagId: "~", requestBody });
+  };
+
+  return { bulkAction, error, isPending, setError };
+};
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 6dcd2b41219..bc59d204378 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -24,14 +24,18 @@ from unittest import mock
 
 import pytest
 import time_machine
-from sqlalchemy import func, select
+from fastapi.testclient import TestClient
+from sqlalchemy import func, select, update
 
 from airflow._shared.timezones import timezone
+from airflow.api_fastapi.auth.managers.simple.user import SimpleAuthManagerUser
 from airflow.api_fastapi.core_api.datamodels.dag_versions import 
DagVersionResponse
 from airflow.api_fastapi.core_api.services.public.common import 
resolve_run_on_latest_version
 from airflow.models import DagModel, DagRun, Log
 from airflow.models.asset import AssetEvent, AssetModel
+from airflow.models.dagbundle import DagBundleModel
 from airflow.models.taskinstance import TaskInstance
+from airflow.models.team import Team
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.sdk.definitions.asset import Asset
 from airflow.sdk.definitions.param import Param
@@ -2627,3 +2631,271 @@ class TestWaitDagRun:
         assert response.status_code == 200
         data = response.json()
         assert data == {"state": DagRunState.SUCCESS}
+
+
+class TestBulkDagRuns:
+    ENDPOINT_URL = f"/dags/{DAG1_ID}/dagRuns"
+    WILDCARD_ENDPOINT = "/dags/~/dagRuns"
+
+    def test_bulk_delete(self, test_client, session):
+        response = test_client.patch(
+            self.ENDPOINT_URL,
+            json={
+                "actions": [
+                    {
+                        "action": "delete",
+                        "entities": [DAG1_RUN1_ID, DAG1_RUN2_ID],
+                    }
+                ]
+            },
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert sorted(body["delete"]["success"]) == sorted(
+            [f"{DAG1_ID}.{DAG1_RUN1_ID}", f"{DAG1_ID}.{DAG1_RUN2_ID}"]
+        )
+        session.expire_all()
+        remaining = session.scalars(select(DagRun).where(DagRun.dag_id == 
DAG1_ID)).all()
+        assert remaining == []
+
+    def test_bulk_delete_with_entity_object(self, test_client, session):
+        response = test_client.patch(
+            self.ENDPOINT_URL,
+            json={
+                "actions": [
+                    {
+                        "action": "delete",
+                        "entities": [{"dag_run_id": DAG1_RUN1_ID}],
+                    }
+                ]
+            },
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert body["delete"]["success"] == [f"{DAG1_ID}.{DAG1_RUN1_ID}"]
+        session.expire_all()
+        dr = session.scalar(select(DagRun).where(DagRun.run_id == 
DAG1_RUN1_ID))
+        assert dr is None
+
+    def test_bulk_delete_rejects_running_state(self, test_client, dag_maker, 
session):
+        """Mirror the single-run DELETE: a RUNNING Dag Run can't be 
bulk-deleted (409)."""
+        with dag_maker(dag_id="test_running_bulk_dag"):
+            EmptyOperator(task_id="t1")
+        dag_maker.create_dagrun(run_id="running_run", 
state=DagRunState.RUNNING)
+        session.commit()
+
+        response = test_client.patch(
+            "/dags/test_running_bulk_dag/dagRuns",
+            json={
+                "actions": [
+                    {
+                        "action": "delete",
+                        "entities": ["running_run"],
+                    }
+                ]
+            },
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert body["delete"]["success"] == []
+        assert body["delete"]["errors"] == [
+            {
+                "error": (
+                    "The DagRun with dag_id: `test_running_bulk_dag` and 
run_id: `running_run` "
+                    "cannot be deleted in running state"
+                ),
+                "status_code": 409,
+            }
+        ]
+        session.expire_all()
+        assert session.scalar(select(DagRun).where(DagRun.run_id == 
"running_run")) is not None
+
+    def test_bulk_delete_not_found_fails(self, test_client, session):
+        """When FAIL semantics, each missing run is reported individually and 
matched runs still get deleted."""
+        response = test_client.patch(
+            self.ENDPOINT_URL,
+            json={
+                "actions": [
+                    {
+                        "action": "delete",
+                        "entities": [DAG1_RUN1_ID, "non_existent_run", 
"another_missing_run"],
+                    }
+                ]
+            },
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert body["delete"]["success"] == [f"{DAG1_ID}.{DAG1_RUN1_ID}"]
+        errors = body["delete"]["errors"]
+        assert len(errors) == 2
+        assert all(err["status_code"] == 404 for err in errors)
+        assert {err["error"] for err in errors} == {
+            f"The DagRun with dag_id: `{DAG1_ID}` and run_id: 
`another_missing_run` was not found",
+            f"The DagRun with dag_id: `{DAG1_ID}` and run_id: 
`non_existent_run` was not found",
+        }
+        session.expire_all()
+        assert session.scalar(select(DagRun).where(DagRun.run_id == 
DAG1_RUN1_ID)) is None
+
+    def test_bulk_delete_not_found_skip(self, test_client, session):
+        response = test_client.patch(
+            self.ENDPOINT_URL,
+            json={
+                "actions": [
+                    {
+                        "action": "delete",
+                        "action_on_non_existence": "skip",
+                        "entities": [DAG1_RUN1_ID, "non_existent_run"],
+                    }
+                ]
+            },
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert body["delete"]["success"] == [f"{DAG1_ID}.{DAG1_RUN1_ID}"]
+        assert body["delete"]["errors"] == []
+
+    def test_bulk_delete_across_dags_with_wildcard(self, test_client, session):
+        response = test_client.patch(
+            self.WILDCARD_ENDPOINT,
+            json={
+                "actions": [
+                    {
+                        "action": "delete",
+                        "entities": [
+                            {"dag_id": DAG1_ID, "dag_run_id": DAG1_RUN1_ID},
+                            {"dag_id": DAG2_ID, "dag_run_id": DAG2_RUN1_ID},
+                        ],
+                    }
+                ]
+            },
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert sorted(body["delete"]["success"]) == sorted(
+            [f"{DAG1_ID}.{DAG1_RUN1_ID}", f"{DAG2_ID}.{DAG2_RUN1_ID}"]
+        )
+        session.expire_all()
+        assert session.scalar(select(DagRun).where(DagRun.run_id == 
DAG1_RUN1_ID)) is None
+        assert session.scalar(select(DagRun).where(DagRun.run_id == 
DAG2_RUN1_ID)) is None
+
+    def test_bulk_delete_wildcard_requires_dag_id_in_body(self, test_client):
+        response = test_client.patch(
+            self.WILDCARD_ENDPOINT,
+            json={
+                "actions": [
+                    {
+                        "action": "delete",
+                        "entities": [DAG1_RUN1_ID],
+                    }
+                ]
+            },
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert body["delete"]["success"] == []
+        assert len(body["delete"]["errors"]) == 1
+        assert body["delete"]["errors"][0]["status_code"] == 400
+
+    def test_bulk_create_not_supported(self, test_client):
+        response = test_client.patch(
+            self.ENDPOINT_URL,
+            json={
+                "actions": [
+                    {
+                        "action": "create",
+                        "entities": [{"dag_run_id": "brand_new_run"}],
+                    }
+                ]
+            },
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert body["create"]["success"] == []
+        assert len(body["create"]["errors"]) == 1
+        assert body["create"]["errors"][0]["status_code"] == 405
+
+    def test_bulk_update_not_supported(self, test_client):
+        response = test_client.patch(
+            self.ENDPOINT_URL,
+            json={
+                "actions": [
+                    {
+                        "action": "update",
+                        "entities": [{"dag_run_id": DAG1_RUN1_ID}],
+                    }
+                ]
+            },
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert body["update"]["success"] == []
+        assert len(body["update"]["errors"]) == 1
+        assert body["update"]["errors"][0]["status_code"] == 405
+
+    def test_bulk_delete_rejects_unauthorized_dag_ids_from_request_body(self, 
test_client, session):
+        """A 403 at the route level if any entity references a Dag the user 
can't access."""
+        restricted_bundle_name = "restricted-bundle-delete"
+        restricted_team_name = "restricted-team-delete"
+        restricted_bundle = DagBundleModel(name=restricted_bundle_name)
+        restricted_team = Team(name=restricted_team_name)
+        restricted_bundle.teams.append(restricted_team)
+        session.add_all([restricted_bundle, restricted_team])
+        session.flush()
+        # Restrict DAG2 by attaching it to a team-scoped bundle the limited 
user has no access to.
+        session.execute(
+            update(DagModel).where(DagModel.dag_id == 
DAG2_ID).values(bundle_name=restricted_bundle_name)
+        )
+        session.commit()
+
+        auth_manager = test_client.app.state.auth_manager
+        token = auth_manager._get_token_signer().generate(
+            auth_manager.serialize_user(
+                SimpleAuthManagerUser(username="limited-user", role="user", 
teams=[]),
+            )
+        )
+        with (
+            mock.patch("airflow.models.revoked_token.RevokedToken.is_revoked", 
return_value=False),
+            TestClient(
+                test_client.app,
+                headers={"Authorization": f"Bearer {token}"},
+                base_url=str(test_client.base_url),
+            ) as limited_test_client,
+        ):
+            response = limited_test_client.patch(
+                self.WILDCARD_ENDPOINT,
+                json={
+                    "actions": [
+                        {
+                            "action": "delete",
+                            "entities": [
+                                {"dag_id": DAG1_ID, "dag_run_id": 
DAG1_RUN1_ID},
+                                {"dag_id": DAG2_ID, "dag_run_id": 
DAG2_RUN1_ID},
+                            ],
+                        }
+                    ]
+                },
+            )
+
+        assert response.status_code == 403
+        session.expire_all()
+        assert session.scalar(select(DagRun).where(DagRun.run_id == 
DAG1_RUN1_ID)) is not None
+        assert session.scalar(select(DagRun).where(DagRun.run_id == 
DAG2_RUN1_ID)) is not None
+
+    def test_bulk_should_respond_401(self, unauthenticated_test_client):
+        response = unauthenticated_test_client.patch(self.ENDPOINT_URL, 
json={"actions": []})
+        assert response.status_code == 401
+
+    def test_bulk_should_respond_403(self, unauthorized_test_client):
+        """An authenticated user with no Dag permissions gets a 403 at the 
route level."""
+        response = unauthorized_test_client.patch(
+            self.ENDPOINT_URL,
+            json={
+                "actions": [
+                    {
+                        "action": "delete",
+                        "entities": [DAG1_RUN1_ID],
+                    }
+                ]
+            },
+        )
+        assert response.status_code == 403
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py 
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 2990901b71f..f05fa65cf56 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -129,6 +129,32 @@ class BulkActionResponse(BaseModel):
     ] = []
 
 
+class BulkDAGRunBody(BaseModel):
+    """
+    Request body for bulk delete operations on Dag Runs.
+    """
+
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    dag_run_id: Annotated[str, Field(title="Dag Run Id")]
+    dag_id: Annotated[str | None, Field(title="Dag Id")] = None
+
+
+class BulkDeleteActionBulkDAGRunBody(BaseModel):
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    action: Annotated[
+        Literal["delete"], Field(description="The action to be performed on 
the entities.", title="Action")
+    ]
+    entities: Annotated[
+        list[str | BulkDAGRunBody],
+        Field(description="A list of entity id/key or entity objects to be 
deleted.", title="Entities"),
+    ]
+    action_on_non_existence: BulkActionNotOnExistence | None = "fail"
+
+
 class BulkResponse(BaseModel):
     """
     Serializer for responses to bulk entity operations.
@@ -156,6 +182,26 @@ class Note(RootModel[str]):
     root: Annotated[str, Field(max_length=1000, title="Note")]
 
 
+class BulkUpdateActionBulkDAGRunBody(BaseModel):
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    action: Annotated[
+        Literal["update"], Field(description="The action to be performed on 
the entities.", title="Action")
+    ]
+    entities: Annotated[
+        list[BulkDAGRunBody], Field(description="A list of entities to be 
updated.", title="Entities")
+    ]
+    update_mask: Annotated[
+        list[str] | None,
+        Field(
+            description="A list of field names to update for each entity.Only 
these fields will be applied from the request body to the database model.Any 
extra fields provided will be ignored.",
+            title="Update Mask",
+        ),
+    ] = None
+    action_on_non_existence: BulkActionNotOnExistence | None = "fail"
+
+
 class TaskIds(RootModel[list]):
     root: Annotated[list, Field(max_length=2, min_length=2)]
 
@@ -325,16 +371,6 @@ class DAGRunClearBody(BaseModel):
     ] = None
 
 
-class DAGRunPatchStates(str, Enum):
-    """
-    Enum for Dag Run states when updating a Dag Run.
-    """
-
-    QUEUED = "queued"
-    SUCCESS = "success"
-    FAILED = "failed"
-
-
 class DAGSourceResponse(BaseModel):
     """
     Dag Source serializer for responses.
@@ -385,6 +421,16 @@ class DagRunAssetReference(BaseModel):
     partition_key: Annotated[str | None, Field(title="Partition Key")] = None
 
 
+class DagRunMutableStates(str, Enum):
+    """
+    Dag Run states from which the run may be mutated (patched, deleted).
+    """
+
+    QUEUED = "queued"
+    SUCCESS = "success"
+    FAILED = "failed"
+
+
 class DagRunState(str, Enum):
     """
     All possible states that a DagRun can be in.
@@ -1231,6 +1277,19 @@ class BackfillResponse(BaseModel):
     dag_display_name: Annotated[str, Field(title="Dag Display Name")]
 
 
+class BulkCreateActionBulkDAGRunBody(BaseModel):
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    action: Annotated[
+        Literal["create"], Field(description="The action to be performed on 
the entities.", title="Action")
+    ]
+    entities: Annotated[
+        list[BulkDAGRunBody], Field(description="A list of entities to be 
created.", title="Entities")
+    ]
+    action_on_existence: BulkActionOnExistence | None = "fail"
+
+
 class BulkCreateActionConnectionBody(BaseModel):
     model_config = ConfigDict(
         extra="forbid",
@@ -1553,7 +1612,7 @@ class DAGRunPatchBody(BaseModel):
     model_config = ConfigDict(
         extra="forbid",
     )
-    state: DAGRunPatchStates | None = None
+    state: DagRunMutableStates | None = None
     note: Annotated[Note | None, Field(title="Note")] = None
 
 
@@ -1967,6 +2026,18 @@ class BackfillCollectionResponse(BaseModel):
     total_entries: Annotated[int, Field(title="Total Entries")]
 
 
+class BulkBodyBulkDAGRunBody(BaseModel):
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    actions: Annotated[
+        list[
+            BulkCreateActionBulkDAGRunBody | BulkUpdateActionBulkDAGRunBody | 
BulkDeleteActionBulkDAGRunBody
+        ],
+        Field(title="Actions"),
+    ]
+
+
 class BulkBodyConnectionBody(BaseModel):
     model_config = ConfigDict(
         extra="forbid",

Reply via email to