This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7f54b63eb9d UI: Add bulk Clear on the Dag Runs list page (#67564)
7f54b63eb9d is described below

commit 7f54b63eb9d3211dee72443c204e1975e7060635
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Thu May 28 13:53:27 2026 +0200

    UI: Add bulk Clear on the Dag Runs list page (#67564)
    
    Re-introduces collective Clear on the Dag Runs list page — the
    Airflow 2.x ``DagRunModelView`` action that the Airflow 3 UI did not
    yet replicate (#63854).
    
    The button sits next to the bulk Delete shipped in #67095 and opens a
    dialog mirroring the existing single-run Clear: a segmented control
    (``Clear existing tasks`` / ``Clear only failed tasks`` / ``Queue up
    new tasks``), an affected-tasks preview grouped by run, and an optional
    note.
    
    No backend change is required — the dialog fans out the existing
    ``POST /dags/{dag_id}/dagRuns/{dag_run_id}/clear`` endpoint over the
    selected runs with ``Promise.allSettled``, then patches the note via
    ``PATCH /dags/{dag_id}/dagRuns/{dag_run_id}`` on the runs that
    succeeded. Per-run outcomes are surfaced via the partial-failure UX
    landed in #67284: successful rows are deselected, failures stay in the
    selection and appear as inline errors so the user can retry just the
    remaining set.
    
    Bulk Mark as success / failed on Dag Runs (the other half of #63854)
    is intentionally out of scope here.
---
 .../src/pages/DagRuns/BulkClearDagRunsButton.tsx   | 120 ++++++++++++++
 .../src/airflow/ui/src/pages/DagRuns/DagRuns.tsx   |   2 +
 .../airflow/ui/src/queries/useBulkClearDagRuns.ts  | 181 +++++++++++++++++++++
 .../ui/src/queries/useBulkClearDagRunsDryRun.ts    |  82 ++++++++++
 4 files changed, 385 insertions(+)

diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkClearDagRunsButton.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkClearDagRunsButton.tsx
new file mode 100644
index 00000000000..084a739e483
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkClearDagRunsButton.tsx
@@ -0,0 +1,120 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Button, Flex, Heading, VStack, useDisclosure } from 
"@chakra-ui/react";
+import { useState } from "react";
+import { useTranslation } from "react-i18next";
+import { CgRedo } from "react-icons/cg";
+
+import type { DAGRunResponse } from "openapi/requests/types.gen";
+import { ActionAccordion } from "src/components/ActionAccordion";
+import { ActionErrors } from "src/components/ActionErrors";
+import { Dialog } from "src/components/ui";
+import SegmentedControl from "src/components/ui/SegmentedControl";
+import { useBulkClearDagRuns } from "src/queries/useBulkClearDagRuns";
+import { useBulkClearDagRunsDryRun } from 
"src/queries/useBulkClearDagRunsDryRun";
+
+/** Props accepted by `BulkClearDagRunsButton`. */
+type Props = {
+  /** Forwarded to `useBulkClearDagRuns`, which calls it with the row keys of the runs it cleared. */
+  readonly deselectKeys: (keys: Array<string>) => void;
+  /** Dag runs passed to both the dry-run preview and the clear request. */
+  readonly selectedDagRuns: Array<DAGRunResponse>;
+};
+
+/**
+ * Button that opens a dialog to clear every Dag run in `selectedDagRuns`.
+ *
+ * The dialog shows a segmented control for the clear mode, a dry-run preview of
+ * the affected task instances (grouped by run id), and an optional note.
+ * Confirming calls `bulkClear` with the current selection and options; errors
+ * reported by the hook are rendered inline through `ActionErrors`.
+ */
+const BulkClearDagRunsButton = ({ deselectKeys, selectedDagRuns }: Props) => {
+  const { t: translate } = useTranslation(["common", "dags"]);
+  const { onClose, onOpen, open } = useDisclosure();
+  const [selectedOptions, setSelectedOptions] = useState<Array<string>>(["existingTasks"]);
+  const [note, setNote] = useState<string | null>(null);
+  const { bulkClear, data, isPending, reset } = useBulkClearDagRuns({
+    deselectKeys,
+    onSuccessConfirm: onClose,
+  });
+
+  // Discard everything tied to the previous attempt (note and per-run errors) so
+  // a reopened dialog never shows results that belong to an earlier submission.
+  const handleClose = () => {
+    setNote(null);
+    reset();
+    onClose();
+  };
+
+  const onlyFailed = selectedOptions.includes("onlyFailed");
+  const onlyNew = selectedOptions.includes("newTasks");
+
+  // The preview queries are only enabled while the dialog is open.
+  const { data: affectedTasks, isFetching } = useBulkClearDagRunsDryRun(open, selectedDagRuns, {
+    onlyFailed,
+    onlyNew,
+  });
+
+  return (
+    <>
+      <Button onClick={onOpen} size="sm" variant="outline">
+        <CgRedo />
+        {translate("dags:runAndTaskActions.clear.button", { type: translate("dagRun_other") })}
+      </Button>
+
+      <Dialog.Root onOpenChange={handleClose} open={open} size="xl">
+        <Dialog.Content backdrop>
+          <Dialog.Header>
+            <VStack align="start" gap={4}>
+              <Heading size="xl">
+                {translate("dags:runAndTaskActions.clear.title", { type: translate("dagRun_other") })}
+              </Heading>
+            </VStack>
+          </Dialog.Header>
+
+          <Dialog.CloseTrigger />
+          <Dialog.Body width="full">
+            <Flex justifyContent="center" mb={4}>
+              <SegmentedControl
+                defaultValues={["existingTasks"]}
+                onChange={setSelectedOptions}
+                options={[
+                  {
+                    label: translate("dags:runAndTaskActions.options.existingTasks"),
+                    value: "existingTasks",
+                  },
+                  {
+                    label: translate("dags:runAndTaskActions.options.onlyFailed"),
+                    value: "onlyFailed",
+                  },
+                  {
+                    label: translate("dags:runAndTaskActions.options.queueNew"),
+                    value: "newTasks",
+                  },
+                ]}
+              />
+            </Flex>
+            <ActionAccordion affectedTasks={affectedTasks} groupByRunId note={note} setNote={setNote} />
+            <ActionErrors actionResponse={data?.clear} error={undefined} />
+            <Flex justifyContent="end" mt={3}>
+              <Button
+                disabled={affectedTasks.total_entries === 0}
+                loading={isPending || isFetching}
+                onClick={() => {
+                  void bulkClear(selectedDagRuns, { note, onlyFailed, onlyNew });
+                }}
+              >
+                <CgRedo />
+                {translate("modal.confirm")}
+              </Button>
+            </Flex>
+          </Dialog.Body>
+        </Dialog.Content>
+      </Dialog.Root>
+    </>
+  );
+};
+
+export default BulkClearDagRunsButton;
diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.tsx
index 5552fcb8ada..797d91d2afe 100644
--- a/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.tsx
@@ -44,6 +44,7 @@ import { SearchParamsKeys, type SearchParamsKeysType } from "src/constants/searc
 import { useAdvancedSearchArg } from "src/hooks/useAdvancedSearch";
 import { renderDuration, useAutoRefresh, isStatePending } from "src/utils";
 
+import BulkClearDagRunsButton from "./BulkClearDagRunsButton";
 import BulkDeleteDagRunsButton from "./BulkDeleteDagRunsButton";
 import { DagRunsFilters } from "./DagRunsFilters";
 import DeleteRunButton from "./DeleteRunButton";
@@ -373,6 +374,7 @@ export const DagRuns = () => {
             {selectedRows.size} {translate("selected")}
           </ActionBar.SelectionTrigger>
           <ActionBar.Separator />
+          <BulkClearDagRunsButton deselectKeys={deselectKeys} 
selectedDagRuns={selectedDagRuns} />
           <BulkDeleteDagRunsButton deselectKeys={deselectKeys} 
selectedDagRuns={selectedDagRuns} />
           <ActionBar.CloseTrigger onClick={clearSelections} />
         </ActionBar.Content>
diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRuns.ts b/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRuns.ts
new file mode 100644
index 00000000000..33288985fd9
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRuns.ts
@@ -0,0 +1,181 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useQueryClient } from "@tanstack/react-query";
+import { useState } from "react";
+import { useTranslation } from "react-i18next";
+
+import {
+  UseDagRunServiceGetDagRunKeyFn,
+  useDagRunServiceGetDagRunsKey,
+  UseGanttServiceGetGanttDataKeyFn,
+  useTaskInstanceServiceGetMappedTaskInstanceKey,
+  useTaskInstanceServiceGetTaskInstanceKey,
+  useTaskInstanceServiceGetTaskInstancesKey,
+} from "openapi/queries";
+import { DagRunService } from "openapi/requests/services.gen";
+import type { BulkActionResponse, DAGRunResponse } from 
"openapi/requests/types.gen";
+import { toaster } from "src/components/ui";
+
+import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
+import { useBulkClearDagRunsDryRunKey } from "./useBulkClearDagRunsDryRun";
+import { useClearDagRunDryRunKey } from "./useClearDagRunDryRun";
+
+/** Callbacks consumed by `useBulkClearDagRuns`. */
+type Props = {
+  /** Called with the `{dag_id}.{dag_run_id}` keys of the runs that were cleared successfully. */
+  readonly deselectKeys: (keys: Array<string>) => void;
+  /** Called after a bulk clear that recorded no errors at all. */
+  readonly onSuccessConfirm: VoidFunction;
+};
+
+/** Options applied to every run in a single `bulkClear` call. */
+export type BulkClearDagRunsOptions = {
+  /** Note patched onto each successfully cleared run; `null` skips the note update entirely. */
+  note: string | null;
+  /** Sent as `only_failed` in the clear request body. */
+  onlyFailed: boolean;
+  /** Sent as `only_new` in the clear request body. */
+  onlyNew: boolean;
+};
+
+// Builds the row key in the ``{dag_id}.{run_id}`` form used by bulk-endpoint
+// success keys, so the value can be handed to ``deselectKeys`` as-is.
+const getRowKey = ({ dag_id: dagId, dag_run_id: runId }: DAGRunResponse) => `${dagId}.${runId}`;
+
+/**
+ * Turns a rejection reason into a message for the inline error list.
+ *
+ * Precedence: a ``body.detail`` payload (a string as-is, anything else
+ * JSON-encoded), then ``Error.message``, then a JSON rendering of a plain
+ * object, and finally ``String(reason)``.
+ *
+ * ``body.detail`` is inspected first because a rejection may be an ``Error``
+ * instance that also carries the response body; testing ``instanceof Error``
+ * first would hide the server-provided detail behind a generic message.
+ */
+const formatError = (reason: unknown): string => {
+  if (typeof reason === "object" && reason !== null && "body" in reason) {
+    const { body } = reason as { body?: { detail?: unknown } | null };
+
+    if (body?.detail !== undefined) {
+      return typeof body.detail === "string" ? body.detail : JSON.stringify(body.detail);
+    }
+  }
+  if (reason instanceof Error) {
+    return reason.message;
+  }
+  if (typeof reason === "object" && reason !== null) {
+    // ``String(obj)`` yields "[object Object]", which tells the user nothing.
+    try {
+      const serialized: string | undefined = JSON.stringify(reason);
+
+      return serialized ?? String(reason);
+    } catch {
+      // Circular structures cannot be serialized; fall back to the default rendering.
+      return String(reason);
+    }
+  }
+
+  return String(reason);
+};
+
+/**
+ * Clears several Dag runs by calling the single-run clear endpoint once per run.
+ *
+ * Returns:
+ * - ``bulkClear(dagRuns, options)``: clears every run with ``Promise.allSettled``,
+ *   patches ``options.note`` onto the runs that cleared (when it is not ``null``
+ *   and differs from the run's current note), invalidates the related queries,
+ *   toasts and deselects the successes, and calls ``onSuccessConfirm`` only when
+ *   no error was recorded.
+ * - ``data``: ``{ clear: { errors, success } }`` from the last call, or ``undefined``.
+ * - ``isPending``: ``true`` while a ``bulkClear`` call is running.
+ * - ``reset``: clears ``data``.
+ */
+export const useBulkClearDagRuns = ({ deselectKeys, onSuccessConfirm }: Props) => {
+  const queryClient = useQueryClient();
+  const [data, setData] = useState<{ clear: BulkActionResponse } | undefined>(undefined);
+  const [isPending, setIsPending] = useState(false);
+  const { t: translate } = useTranslation(["common", "dags"]);
+
+  const reset = () => {
+    setData(undefined);
+  };
+
+  // List-level keys are invalidated once; per-Dag and per-run keys are added for
+  // every Dag / run that was part of the request.
+  const invalidateQueries = async (dagRuns: ReadonlyArray<DAGRunResponse>) => {
+    const dagIds = new Set(dagRuns.map((dagRun) => dagRun.dag_id));
+    const keys = [
+      [useDagRunServiceGetDagRunsKey],
+      [useTaskInstanceServiceGetTaskInstancesKey],
+      [useTaskInstanceServiceGetTaskInstanceKey],
+      [useTaskInstanceServiceGetMappedTaskInstanceKey],
+      [useBulkClearDagRunsDryRunKey],
+      ...tiPerAttemptQueryKeys,
+      ...[...dagIds].flatMap((dagId) => [...gridQueryKeys(dagId), [useClearDagRunDryRunKey, dagId]]),
+      ...dagRuns.flatMap((dagRun) => [
+        UseDagRunServiceGetDagRunKeyFn({ dagId: dagRun.dag_id, dagRunId: dagRun.dag_run_id }),
+        UseGanttServiceGetGanttDataKeyFn({ dagId: dagRun.dag_id, runId: dagRun.dag_run_id }),
+      ]),
+    ];
+
+    await Promise.all(keys.map((queryKey) => queryClient.invalidateQueries({ queryKey })));
+  };
+
+  const bulkClear = async (dagRuns: Array<DAGRunResponse>, options: BulkClearDagRunsOptions) => {
+    const succeeded: Array<DAGRunResponse> = [];
+    const errors: Array<Record<string, unknown>> = [];
+
+    reset();
+    setIsPending(true);
+
+    // ``finally`` guarantees the pending flag is released even if something
+    // after the fan-out throws; otherwise the confirm button would spin forever.
+    try {
+      const settled = await Promise.allSettled(
+        dagRuns.map((dagRun) =>
+          DagRunService.clearDagRun({
+            dagId: dagRun.dag_id,
+            dagRunId: dagRun.dag_run_id,
+            requestBody: {
+              dry_run: false,
+              only_failed: options.onlyFailed,
+              only_new: options.onlyNew,
+            },
+          }).then(() => dagRun),
+        ),
+      );
+
+      // ``settled`` keeps the order of ``dagRuns``, so ``index`` identifies the failing run.
+      settled.forEach((outcome, index) => {
+        if (outcome.status === "fulfilled") {
+          succeeded.push(outcome.value);
+        } else {
+          const dagRun = dagRuns[index];
+
+          errors.push({
+            error: dagRun
+              ? `${getRowKey(dagRun)}: ${formatError(outcome.reason)}`
+              : formatError(outcome.reason),
+          });
+        }
+      });
+
+      if (succeeded.length > 0 && options.note !== null) {
+        // Skip runs whose note already matches to avoid a pointless PATCH.
+        const noteTargets = succeeded.filter((dagRun) => dagRun.note !== options.note);
+        const noteSettled = await Promise.allSettled(
+          noteTargets.map((dagRun) =>
+            DagRunService.patchDagRun({
+              dagId: dagRun.dag_id,
+              dagRunId: dagRun.dag_run_id,
+              requestBody: { note: options.note },
+            }).then(() => dagRun),
+          ),
+        );
+
+        // Name the run in the message so the user can tell which note was not saved.
+        noteSettled.forEach((outcome, index) => {
+          if (outcome.status === "rejected") {
+            const dagRun = noteTargets[index];
+            const label = dagRun ? `note (${getRowKey(dagRun)})` : "note";
+
+            errors.push({ error: `${label}: ${formatError(outcome.reason)}` });
+          }
+        });
+      }
+
+      await invalidateQueries(dagRuns);
+
+      if (succeeded.length > 0) {
+        toaster.create({
+          description: translate("toaster.bulkClear.success.description", {
+            count: succeeded.length,
+            keys: succeeded.map((dagRun) => dagRun.dag_run_id).join(", "),
+            resourceName: translate("dagRun_other"),
+          }),
+          title: translate("toaster.bulkClear.success.title", {
+            resourceName: translate("dagRun_other"),
+          }),
+          type: "success",
+        });
+        deselectKeys(succeeded.map(getRowKey));
+      }
+
+      setData({ clear: { errors, success: succeeded.map(getRowKey) } });
+    } finally {
+      setIsPending(false);
+    }
+
+    // Per-run failures keep the dialog open so the user can see what failed;
+    // the consumer renders ``data.clear.errors``.
+    if (errors.length === 0) {
+      onSuccessConfirm();
+    }
+  };
+
+  return { bulkClear, data, isPending, reset };
+};
diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRunsDryRun.ts b/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRunsDryRun.ts
new file mode 100644
index 00000000000..fbdf858bcc5
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRunsDryRun.ts
@@ -0,0 +1,82 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useQueries } from "@tanstack/react-query";
+
+import { DagRunService } from "openapi/requests/services.gen";
+import type {
+  ClearTaskInstanceCollectionResponse,
+  DAGRunResponse,
+  TaskInstanceCollectionResponse,
+  TaskInstanceResponse,
+} from "openapi/requests/types.gen";
+
+/** Clear-mode flags forwarded to each dry-run request. */
+type Options = {
+  /** Sent as `only_failed` in the request body and included in the query key. */
+  onlyFailed: boolean;
+  /** Sent as `only_new` in the request body and included in the query key. */
+  onlyNew: boolean;
+};
+
+// Shared value returned by the hook when no task instances were collected.
+const EMPTY: TaskInstanceCollectionResponse = { task_instances: [], 
total_entries: 0 };
+
+// First segment of every dry-run query key built by the hook below.
+export const useBulkClearDagRunsDryRunKey = "bulkClearDagRunsDryRun";
+
+/**
+ * Runs one dry-run clear query per selected Dag run and merges the results.
+ *
+ * @param enabled - Passed to every query as its ``enabled`` flag.
+ * @param selectedDagRuns - Runs to preview; each gets its own query.
+ * @param options - ``only_failed`` / ``only_new`` flags sent with every request.
+ * @returns ``data`` with the concatenated task instances (or the shared empty
+ *   value when there are none) and ``isFetching``, true while any query is fetching.
+ */
+export const useBulkClearDagRunsDryRun = (
+  enabled: boolean,
+  selectedDagRuns: Array<DAGRunResponse>,
+  options: Options,
+) => {
+  const { onlyFailed, onlyNew } = options;
+
+  const results = useQueries({
+    queries: selectedDagRuns.map(({ dag_id: dagId, dag_run_id: dagRunId }) => ({
+      enabled,
+      queryFn: () =>
+        DagRunService.clearDagRun({
+          dagId,
+          dagRunId,
+          requestBody: {
+            dry_run: true,
+            only_failed: onlyFailed,
+            only_new: onlyNew,
+          },
+        }) as Promise<ClearTaskInstanceCollectionResponse>,
+      queryKey: [
+        useBulkClearDagRunsDryRunKey,
+        dagId,
+        dagRunId,
+        { only_failed: onlyFailed, only_new: onlyNew },
+      ],
+      refetchOnMount: "always" as const,
+    })),
+  });
+
+  // Single pass over the per-run results: collect the fetching flag and the rows.
+  let isFetching = false;
+  const taskInstances: ClearTaskInstanceCollectionResponse["task_instances"] = [];
+
+  for (const result of results) {
+    if (result.isFetching) {
+      isFetching = true;
+    }
+    taskInstances.push(...(result.data?.task_instances ?? []));
+  }
+
+  if (taskInstances.length === 0) {
+    return { data: EMPTY, isFetching };
+  }
+
+  // Every query targets a distinct ``(dag_id, dag_run_id)``, so the merged list
+  // has no duplicates. A given call is homogeneous (``only_new=true`` yields
+  // ``NewTaskResponse`` placeholders, ``false`` yields real
+  // ``TaskInstanceResponse``), which is why the cast below is safe even though
+  // the OpenAPI type widens to a union.
+  const data: TaskInstanceCollectionResponse = {
+    task_instances: taskInstances as Array<TaskInstanceResponse>,
+    total_entries: taskInstances.length,
+  };
+
+  return { data, isFetching };
+};

Reply via email to