This is an automated email from the ASF dual-hosted git repository.
shubhamraj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 43945831cef Clear, Mark Success/Fail and delete multiple Task
Instances (#64141)
43945831cef is described below
commit 43945831cef8091a7cf63be1d16a98a499554359
Author: Shubham Raj <[email protected]>
AuthorDate: Fri Mar 27 00:45:04 2026 +0530
Clear, Mark Success/Fail and delete multiple Task Instances (#64141)
---
.../core_api/datamodels/task_instances.py | 1 +
.../core_api/openapi/v2-rest-api-generated.yaml | 6 +
.../core_api/routes/public/task_instances.py | 8 +
.../core_api/services/public/task_instances.py | 13 +-
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 12 ++
.../airflow/ui/openapi-gen/requests/types.gen.ts | 1 +
.../airflow/ui/public/i18n/locales/en/common.json | 15 ++
.../components/ActionAccordion/ActionAccordion.tsx | 87 +++++++--
.../TaskInstance/ClearGroupTaskInstanceDialog.tsx | 3 +-
.../TaskInstances/BulkClearTaskInstancesButton.tsx | 157 ++++++++++++++++
.../BulkDeleteTaskInstancesButton.tsx | 153 +++++++++++++++
.../BulkMarkTaskInstancesAsButton.tsx | 208 +++++++++++++++++++++
.../ui/src/pages/TaskInstances/TaskInstances.tsx | 85 ++++++++-
.../airflow/ui/src/queries/useBulkClearDryRun.ts | 113 +++++++++++
.../ui/src/queries/useBulkClearTaskInstances.ts | 118 ++++++++++++
.../airflow/ui/src/queries/useBulkMarkAsDryRun.ts | 113 +++++++++++
.../airflow/ui/src/queries/useBulkTaskInstances.ts | 105 +++++++++++
.../core_api/routes/public/test_task_instances.py | 140 ++++++++++++++
.../src/airflowctl/api/datamodels/generated.py | 1 +
19 files changed, 1316 insertions(+), 23 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
index 397389994a0..c8e7ab2378d 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
@@ -168,6 +168,7 @@ class ClearTaskInstancesBody(StrictBaseModel):
"clearing the task instances.",
)
prevent_running_task: bool = False
+ note: Annotated[str, StringConstraints(max_length=1000)] | None = None
@model_validator(mode="before")
@classmethod
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 17c4f87a6e4..128d0f2bd05 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -9904,6 +9904,12 @@ components:
type: boolean
title: Prevent Running Task
default: false
+ note:
+ anyOf:
+ - type: string
+ maxLength: 1000
+ - type: 'null'
+ title: Note
additionalProperties: false
type: object
title: ClearTaskInstancesBody
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
index 97fa930b1c1..9488c0b7591 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -721,6 +721,7 @@ def post_clear_task_instances(
dag_bag: DagBagDep,
body: ClearTaskInstancesBody,
session: SessionDep,
+ user: GetUserDep,
) -> TaskInstanceCollectionResponse:
"""Clear task instances."""
dag = get_latest_version_of_dag(dag_bag, dag_id, session)
@@ -837,6 +838,13 @@ def post_clear_task_instances(
except AirflowClearRunningTaskException as e:
raise HTTPException(status.HTTP_409_CONFLICT, str(e)) from e
+ if body.note is not None:
+ _patch_task_instance_note(
+ task_instance_body=body,
+ tis=task_instances,
+ user=user,
+ )
+
return TaskInstanceCollectionResponse(
task_instances=[TaskInstanceResponse.model_validate(ti) for ti in
task_instances],
total_entries=len(task_instances),
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
index 1c3d7c62913..8ab426bbef6 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
+++
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
@@ -37,7 +37,11 @@ from airflow.api_fastapi.core_api.datamodels.common import (
BulkDeleteAction,
BulkUpdateAction,
)
-from airflow.api_fastapi.core_api.datamodels.task_instances import
BulkTaskInstanceBody, PatchTaskInstanceBody
+from airflow.api_fastapi.core_api.datamodels.task_instances import (
+ BulkTaskInstanceBody,
+ ClearTaskInstancesBody,
+ PatchTaskInstanceBody,
+)
from airflow.api_fastapi.core_api.security import GetUserDep
from airflow.api_fastapi.core_api.services.public.common import BulkService
from airflow.listeners.listener import get_listener_manager
@@ -139,7 +143,7 @@ def _patch_task_instance_state(
def _patch_task_instance_note(
- task_instance_body: BulkTaskInstanceBody | PatchTaskInstanceBody,
+ task_instance_body: BulkTaskInstanceBody | ClearTaskInstancesBody |
PatchTaskInstanceBody,
tis: list[TI],
user: GetUserDep,
update_mask: list[str] | None = Query(None),
@@ -275,6 +279,7 @@ class
BulkTaskInstanceService(BulkService[BulkTaskInstanceBody]):
dag_bag=self.dag_bag,
body=entity,
session=self.session,
+ map_index=map_index,
update_mask=update_mask,
)
@@ -318,12 +323,12 @@ class
BulkTaskInstanceService(BulkService[BulkTaskInstanceBody]):
try:
specific_entity_map = {
- (entity.dag_id, entity.dag_run_id, entity.task_id,
entity.map_index): entity
+ self._extract_task_identifiers(entity): entity
for entity in action.entities
if entity.map_index is not None
}
all_map_entity_map = {
- (entity.dag_id, entity.dag_run_id, entity.task_id): entity
+ self._extract_task_identifiers(entity)[:3]: entity
for entity in action.entities
if entity.map_index is None
}
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index f47aa107aea..0195c0b564a 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1383,6 +1383,18 @@ export const $ClearTaskInstancesBody = {
type: 'boolean',
title: 'Prevent Running Task',
default: false
+ },
+ note: {
+ anyOf: [
+ {
+ type: 'string',
+ maxLength: 1000
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Note'
}
},
additionalProperties: false,
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 7423c08d42c..9c61f27e71e 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -427,6 +427,7 @@ export type ClearTaskInstancesBody = {
*/
run_on_latest_version?: boolean;
prevent_running_task?: boolean;
+ note?: string | null;
};
/**
diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
index 49037f58815..f0ccd787a26 100644
--- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
+++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
@@ -199,6 +199,7 @@
"users": "Users"
},
"selectLanguage": "Select Language",
+ "selected": "Selected",
"showDetailsPanel": "Show Details Panel",
"signedInAs": "Signed in as",
"source": {
@@ -299,6 +300,13 @@
"utc": "UTC (Coordinated Universal Time)"
},
"toaster": {
+ "bulkClear": {
+ "error": "Bulk Clear {{resourceName}} Request Failed",
+ "success": {
+ "description": "{{count}} {{resourceName}} have been successfully
cleared. Keys: {{keys}}",
+ "title": "Bulk Clear {{resourceName}} Request Submitted"
+ }
+ },
"bulkDelete": {
"error": "Bulk Delete {{resourceName}} Request Failed",
"success": {
@@ -306,6 +314,13 @@
"title": "Bulk Delete {{resourceName}} Request Submitted"
}
},
+ "bulkUpdate": {
+ "error": "Bulk Update {{resourceName}} Request Failed",
+ "success": {
+ "description": "{{count}} {{resourceName}} have been successfully
updated. Keys: {{keys}}",
+ "title": "Bulk Update {{resourceName}} Request Submitted"
+ }
+ },
"create": {
"error": "Create {{resourceName}} Request Failed",
"success": {
diff --git
a/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx
b/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx
index e2f5b274a4c..514d93799d5 100644
---
a/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx
+++
b/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx
@@ -20,7 +20,11 @@ import { Box, Editable, Text, VStack } from
"@chakra-ui/react";
import type { ChangeEvent } from "react";
import { useTranslation } from "react-i18next";
-import type { DAGRunResponse, TaskInstanceCollectionResponse } from
"openapi/requests/types.gen";
+import type {
+ DAGRunResponse,
+ TaskInstanceCollectionResponse,
+ TaskInstanceResponse,
+} from "openapi/requests/types.gen";
import ReactMarkdown from "src/components/ReactMarkdown";
import { Accordion } from "src/components/ui";
@@ -29,17 +33,59 @@ import { getColumns } from "./columns";
type Props = {
readonly affectedTasks?: TaskInstanceCollectionResponse;
+ readonly groupByRunId?: boolean;
readonly note: DAGRunResponse["note"];
readonly setNote: (value: string) => void;
};
+const TasksTable = ({
+ noRowsMessage,
+ tasks,
+}: {
+ readonly noRowsMessage: string;
+ readonly tasks: Array<TaskInstanceResponse>;
+}) => {
+ const { t: translate } = useTranslation();
+ const columns = getColumns(translate);
+
+ return (
+ <DataTable
+ columns={columns}
+ data={tasks}
+ displayMode="table"
+ modelName="common:taskInstance"
+ noRowsMessage={noRowsMessage}
+ total={tasks.length}
+ />
+ );
+};
+
// Table is in memory, pagination and sorting are disabled.
// TODO: Make a front-end only unconnected table component with client side
ordering and pagination
-const ActionAccordion = ({ affectedTasks, note, setNote }: Props) => {
+const ActionAccordion = ({ affectedTasks, groupByRunId = false, note, setNote
}: Props) => {
const showTaskSection = affectedTasks !== undefined;
const { t: translate } = useTranslation();
- const columns = getColumns(translate);
+ // Group task instances by dag_run_id when requested
+ const runGroups = (() => {
+ if (!groupByRunId || !affectedTasks) {
+ return undefined;
+ }
+
+ const map = new Map<string, Array<TaskInstanceResponse>>();
+
+ for (const ti of affectedTasks.task_instances) {
+ const group = map.get(ti.dag_run_id) ?? [];
+
+ group.push(ti);
+ map.set(ti.dag_run_id, group);
+ }
+
+ return map;
+ })();
+
+ // Only group when there are actually multiple run IDs
+ const shouldGroup = groupByRunId && runGroups !== undefined &&
runGroups.size > 1;
return (
<Accordion.Root
@@ -59,14 +105,33 @@ const ActionAccordion = ({ affectedTasks, note, setNote }:
Props) => {
</Accordion.ItemTrigger>
<Accordion.ItemContent>
<Box maxH="400px" overflowY="scroll">
- <DataTable
- columns={columns}
- data={affectedTasks.task_instances}
- displayMode="table"
- modelName="common:taskInstance"
-
noRowsMessage={translate("dags:runAndTaskActions.affectedTasks.noItemsFound")}
- total={affectedTasks.total_entries}
- />
+ {shouldGroup ? (
+ <Accordion.Root collapsible multiple variant="plain">
+ {[...runGroups.entries()].map(([runId, tis]) => (
+ <Accordion.Item key={runId} value={runId}>
+ <Accordion.ItemTrigger px={2} py={1}>
+ <Text fontSize="sm" fontWeight="semibold">
+ {translate("runId")}: {runId}{" "}
+ <Text as="span" color="fg.subtle"
fontWeight="normal">
+ ({tis.length})
+ </Text>
+ </Text>
+ </Accordion.ItemTrigger>
+ <Accordion.ItemContent>
+ <TasksTable
+
noRowsMessage={translate("dags:runAndTaskActions.affectedTasks.noItemsFound")}
+ tasks={tis}
+ />
+ </Accordion.ItemContent>
+ </Accordion.Item>
+ ))}
+ </Accordion.Root>
+ ) : (
+ <TasksTable
+
noRowsMessage={translate("dags:runAndTaskActions.affectedTasks.noItemsFound")}
+ tasks={affectedTasks.task_instances}
+ />
+ )}
</Box>
</Accordion.ItemContent>
</Accordion.Item>
diff --git
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
index 63e1df78526..5dc09c1f421 100644
---
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
+++
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
@@ -57,7 +57,7 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open,
taskInstance }: Pr
const downstream = selectedOptions.includes("downstream");
const [runOnLatestVersion, setRunOnLatestVersion] = useState(false);
- const [note, setNote] = useState<string>("");
+ const [note, setNote] = useState<string | null>(null);
const { data: dagDetails } = useDagServiceGetDagDetails({
dagId,
@@ -186,6 +186,7 @@ export const ClearGroupTaskInstanceDialog = ({ onClose,
open, taskInstance }: Pr
include_future: future,
include_past: past,
include_upstream: upstream,
+ ...(note === null ? {} : { note }),
only_failed: onlyFailed,
run_on_latest_version: runOnLatestVersion,
task_ids: groupTaskIds,
diff --git
a/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkClearTaskInstancesButton.tsx
b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkClearTaskInstancesButton.tsx
new file mode 100644
index 00000000000..13bc6df292b
--- /dev/null
+++
b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkClearTaskInstancesButton.tsx
@@ -0,0 +1,157 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Button, Flex, Heading, VStack, useDisclosure } from
"@chakra-ui/react";
+import { useState } from "react";
+import { useTranslation } from "react-i18next";
+import { CgRedo } from "react-icons/cg";
+
+import type { TaskInstanceResponse } from "openapi/requests/types.gen";
+import { ActionAccordion } from "src/components/ActionAccordion";
+import { ErrorAlert } from "src/components/ErrorAlert";
+import { Checkbox, Dialog } from "src/components/ui";
+import SegmentedControl from "src/components/ui/SegmentedControl";
+import { useBulkClearDryRun } from "src/queries/useBulkClearDryRun";
+import { useBulkClearTaskInstances } from
"src/queries/useBulkClearTaskInstances";
+
+type Props = {
+ readonly clearSelections: VoidFunction;
+ readonly selectedTaskInstances: Array<TaskInstanceResponse>;
+};
+
+const BulkClearTaskInstancesButton = ({ clearSelections, selectedTaskInstances
}: Props) => {
+ const { t: translate } = useTranslation();
+ const { onClose, onOpen, open } = useDisclosure();
+ const [selectedOptions, setSelectedOptions] =
useState<Array<string>>(["downstream"]);
+ const [note, setNote] = useState<string | null>(null);
+ const [preventRunningTask, setPreventRunningTask] = useState(true);
+ const { bulkClear, error, isPending } = useBulkClearTaskInstances({
+ clearSelections,
+ onSuccessConfirm: onClose,
+ });
+
+ const handleClose = () => {
+ setNote(null);
+ onClose();
+ };
+
+ const past = selectedOptions.includes("past");
+ const future = selectedOptions.includes("future");
+ const upstream = selectedOptions.includes("upstream");
+ const downstream = selectedOptions.includes("downstream");
+ const onlyFailed = selectedOptions.includes("onlyFailed");
+
+ const hasLogicalDate = selectedTaskInstances.some((ti) => ti.logical_date
!== null);
+
+ const { data: affectedTasks, isFetching } = useBulkClearDryRun(open,
selectedTaskInstances, {
+ includeDownstream: downstream,
+ includeFuture: future,
+ includeOnlyFailed: onlyFailed,
+ includePast: past,
+ includeUpstream: upstream,
+ });
+
+ return (
+ <>
+ <Button colorPalette="brand" onClick={onOpen} size="sm"
variant="outline">
+ <CgRedo />
+ {translate("dags:runAndTaskActions.clear.button", { type:
translate("taskInstance_other") })}
+ </Button>
+
+ <Dialog.Root onOpenChange={handleClose} open={open} size="xl">
+ <Dialog.Content backdrop>
+ <Dialog.Header>
+ <VStack align="start" gap={4}>
+ <Heading size="xl">
+ {translate("dags:runAndTaskActions.clear.title", {
+ type: translate("taskInstance_other"),
+ })}
+ </Heading>
+ </VStack>
+ </Dialog.Header>
+
+ <Dialog.CloseTrigger />
+ <Dialog.Body width="full">
+ <Flex justifyContent="center" mb={4}>
+ <SegmentedControl
+ defaultValues={["downstream"]}
+ multiple
+ onChange={setSelectedOptions}
+ options={[
+ {
+ disabled: !hasLogicalDate,
+ label: translate("dags:runAndTaskActions.options.past"),
+ value: "past",
+ },
+ {
+ disabled: !hasLogicalDate,
+ label: translate("dags:runAndTaskActions.options.future"),
+ value: "future",
+ },
+ {
+ label:
translate("dags:runAndTaskActions.options.upstream"),
+ value: "upstream",
+ },
+ {
+ label:
translate("dags:runAndTaskActions.options.downstream"),
+ value: "downstream",
+ },
+ {
+ label:
translate("dags:runAndTaskActions.options.onlyFailed"),
+ value: "onlyFailed",
+ },
+ ]}
+ />
+ </Flex>
+ <ActionAccordion affectedTasks={affectedTasks} groupByRunId
note={note} setNote={setNote} />
+ <ErrorAlert error={error} />
+ <Flex alignItems="center" justifyContent="space-between" mt={3}>
+ <Checkbox
+ checked={preventRunningTask}
+ onCheckedChange={(event) =>
setPreventRunningTask(Boolean(event.checked))}
+ >
+
{translate("dags:runAndTaskActions.options.preventRunningTasks")}
+ </Checkbox>
+ <Button
+ colorPalette="brand"
+ disabled={affectedTasks.total_entries === 0}
+ loading={isPending || isFetching}
+ onClick={() => {
+ void bulkClear(selectedTaskInstances, {
+ includeDownstream: downstream,
+ includeFuture: future,
+ includeOnlyFailed: onlyFailed,
+ includePast: past,
+ includeUpstream: upstream,
+ note,
+ preventRunningTask,
+ });
+ }}
+ >
+ <CgRedo />
+ {translate("modal.confirm")}
+ </Button>
+ </Flex>
+ </Dialog.Body>
+ </Dialog.Content>
+ </Dialog.Root>
+ </>
+ );
+};
+
+export default BulkClearTaskInstancesButton;
diff --git
a/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkDeleteTaskInstancesButton.tsx
b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkDeleteTaskInstancesButton.tsx
new file mode 100644
index 00000000000..c3395d3be03
--- /dev/null
+++
b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkDeleteTaskInstancesButton.tsx
@@ -0,0 +1,153 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Box, Button, Flex, Heading, Text, useDisclosure, VStack } from
"@chakra-ui/react";
+import { useTranslation } from "react-i18next";
+import { FiTrash2 } from "react-icons/fi";
+
+import type { TaskInstanceResponse } from "openapi/requests/types.gen";
+import { getColumns } from "src/components/ActionAccordion/columns";
+import { DataTable } from "src/components/DataTable";
+import { ErrorAlert } from "src/components/ErrorAlert";
+import { Accordion, Dialog } from "src/components/ui";
+import { useBulkTaskInstances } from "src/queries/useBulkTaskInstances";
+
+type Props = {
+ readonly clearSelections: VoidFunction;
+ readonly selectedTaskInstances: Array<TaskInstanceResponse>;
+};
+
+const BulkDeleteTaskInstancesButton = ({ clearSelections,
selectedTaskInstances }: Props) => {
+ const { t: translate } = useTranslation();
+ const { onClose, onOpen, open } = useDisclosure();
+ const { bulkAction, error, isPending } = useBulkTaskInstances({
+ clearSelections,
+ onSuccessConfirm: onClose,
+ });
+
+ const columns = getColumns(translate);
+
+ // Group by dag_run_id for display
+ const byRunId = new Map<string, Array<TaskInstanceResponse>>();
+
+ for (const ti of selectedTaskInstances) {
+ const group = byRunId.get(ti.dag_run_id) ?? [];
+
+ group.push(ti);
+ byRunId.set(ti.dag_run_id, group);
+ }
+
+ const isGrouped = byRunId.size > 1;
+
+ return (
+ <>
+ <Button colorPalette="danger" onClick={onOpen} size="sm"
variant="outline">
+ <FiTrash2 />
+ {translate("dags:runAndTaskActions.delete.button", { type:
translate("taskInstance_other") })}
+ </Button>
+
+ <Dialog.Root onOpenChange={onClose} open={open} size="xl">
+ <Dialog.Content backdrop>
+ <Dialog.Header>
+ <VStack align="start" gap={4}>
+ <Heading size="xl">
+ {translate("dags:runAndTaskActions.delete.dialog.title", {
+ type: translate("taskInstance_other"),
+ })}
+ </Heading>
+ </VStack>
+ </Dialog.Header>
+
+ <Dialog.CloseTrigger />
+ <Dialog.Body width="full">
+ <Text color="fg.subtle" fontSize="sm" mb={4}>
+ {translate("dags:runAndTaskActions.delete.dialog.warning", {
+ type: translate("taskInstance_other"),
+ })}
+ </Text>
+
+ <Box maxH="400px" overflowY="auto">
+ {isGrouped ? (
+ <Accordion.Root collapsible multiple variant="enclosed">
+ {[...byRunId.entries()].map(([runId, tis]) => (
+ <Accordion.Item key={runId} value={runId}>
+ <Accordion.ItemTrigger>
+ <Text fontSize="sm" fontWeight="semibold">
+ {translate("runId")}: {runId}{" "}
+ <Text as="span" color="fg.subtle"
fontWeight="normal">
+ ({tis.length})
+ </Text>
+ </Text>
+ </Accordion.ItemTrigger>
+ <Accordion.ItemContent>
+ <DataTable
+ columns={columns}
+ data={tis}
+ displayMode="table"
+ modelName="common:taskInstance"
+ total={tis.length}
+ />
+ </Accordion.ItemContent>
+ </Accordion.Item>
+ ))}
+ </Accordion.Root>
+ ) : (
+ <DataTable
+ columns={columns}
+ data={selectedTaskInstances}
+ displayMode="table"
+ modelName="common:taskInstance"
+ total={selectedTaskInstances.length}
+ />
+ )}
+ </Box>
+
+ <ErrorAlert error={error} />
+ <Flex justifyContent="end" mt={3}>
+ <Button
+ colorPalette="danger"
+ loading={isPending}
+ onClick={() => {
+ bulkAction({
+ actions: [
+ {
+ action: "delete" as const,
+ action_on_non_existence: "skip",
+ entities: selectedTaskInstances.map((ti) => ({
+ dag_id: ti.dag_id,
+ dag_run_id: ti.dag_run_id,
+ map_index: ti.map_index,
+ task_id: ti.task_id,
+ })),
+ },
+ ],
+ });
+ }}
+ >
+ <FiTrash2 />
+ <Text fontWeight="bold">{translate("modal.confirm")}</Text>
+ </Button>
+ </Flex>
+ </Dialog.Body>
+ </Dialog.Content>
+ </Dialog.Root>
+ </>
+ );
+};
+
+export default BulkDeleteTaskInstancesButton;
diff --git
a/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkMarkTaskInstancesAsButton.tsx
b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkMarkTaskInstancesAsButton.tsx
new file mode 100644
index 00000000000..e30b777ad9c
--- /dev/null
+++
b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkMarkTaskInstancesAsButton.tsx
@@ -0,0 +1,208 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Badge, Box, Button, Flex, Heading, HStack, VStack, useDisclosure }
from "@chakra-ui/react";
+import { useState } from "react";
+import { useTranslation } from "react-i18next";
+import { FiX } from "react-icons/fi";
+import { LuCheck } from "react-icons/lu";
+
+import type { TaskInstanceResponse, TaskInstanceState } from
"openapi/requests/types.gen";
+import { ActionAccordion } from "src/components/ActionAccordion";
+import { ErrorAlert } from "src/components/ErrorAlert";
+import { allowedStates } from "src/components/MarkAs/utils";
+import { StateBadge } from "src/components/StateBadge";
+import { Dialog, Menu } from "src/components/ui";
+import SegmentedControl from "src/components/ui/SegmentedControl";
+import { useBulkMarkAsDryRun } from "src/queries/useBulkMarkAsDryRun";
+import { useBulkTaskInstances } from "src/queries/useBulkTaskInstances";
+
+type Props = {
+ readonly clearSelections: VoidFunction;
+ readonly selectedTaskInstances: Array<TaskInstanceResponse>;
+};
+
+const BulkMarkTaskInstancesAsButton = ({ clearSelections,
selectedTaskInstances }: Props) => {
+ const { t: translate } = useTranslation();
+ const { onClose, onOpen, open } = useDisclosure();
+ const [state, setState] = useState<TaskInstanceState>("success");
+ const [selectedOptions, setSelectedOptions] = useState<Array<string>>([]);
+ const [note, setNote] = useState<string | null>(null);
+ const { bulkAction, error, isPending, setError } = useBulkTaskInstances({
+ clearSelections,
+ onSuccessConfirm: onClose,
+ });
+
+ const past = selectedOptions.includes("past");
+ const future = selectedOptions.includes("future");
+ const upstream = selectedOptions.includes("upstream");
+ const downstream = selectedOptions.includes("downstream");
+
+ const hasLogicalDate = selectedTaskInstances.some((ti) => ti.logical_date
!== null);
+
+ const affectedCount = (targetState: TaskInstanceState) =>
+ selectedTaskInstances.filter((ti) => ti.state !== targetState).length;
+
+ const { data: affectedTasks, isFetching } = useBulkMarkAsDryRun(open, {
+ options: {
+ includeDownstream: downstream,
+ includeFuture: future,
+ includePast: past,
+ includeUpstream: upstream,
+ },
+ selectedTaskInstances,
+ targetState: state,
+ });
+
+ const handleOpen = (newState: TaskInstanceState) => {
+ setState(newState);
+ setSelectedOptions([]);
+ setNote(null);
+ setError(undefined);
+ onOpen();
+ };
+
+ const directlyAffected = selectedTaskInstances.filter((ti) => ti.state !==
state);
+
+ return (
+ <Box>
+ <Menu.Root positioning={{ gutter: 0, placement: "top" }}>
+ <Menu.Trigger asChild>
+ <div>
+ <Button colorPalette="brand" size="sm" variant="outline">
+ <HStack gap={1} mx={1}>
+ <LuCheck />
+ <span>/</span>
+ <FiX />
+ </HStack>
+ {translate("dags:runAndTaskActions.markAs.button", { type:
translate("taskInstance_other") })}
+ </Button>
+ </div>
+ </Menu.Trigger>
+ <Menu.Content>
+ {allowedStates.map((menuState) => {
+ const count = affectedCount(menuState);
+
+ return (
+ <Menu.Item
+ disabled={count === 0}
+ key={menuState}
+ onClick={() => {
+ if (count > 0) {
+ handleOpen(menuState);
+ }
+ }}
+ value={menuState}
+ >
+ <HStack justify="space-between" width="full">
+ <StateBadge
state={menuState}>{translate(`common:states.${menuState}`)}</StateBadge>
+ <Badge colorPalette="gray" variant="subtle">
+ {count}
+ </Badge>
+ </HStack>
+ </Menu.Item>
+ );
+ })}
+ </Menu.Content>
+ </Menu.Root>
+
+ <Dialog.Root onOpenChange={onClose} open={open} size="xl">
+ <Dialog.Content backdrop>
+ <Dialog.Header>
+ <VStack align="start" gap={4}>
+ <Heading size="xl">
+ {translate("dags:runAndTaskActions.markAs.title", {
+ state,
+ type: translate("taskInstance_other"),
+ })}{" "}
+ <StateBadge state={state} />
+ </Heading>
+ </VStack>
+ </Dialog.Header>
+
+ <Dialog.CloseTrigger />
+ <Dialog.Body width="full">
+ <Flex justifyContent="center" mb={4}>
+ <SegmentedControl
+ defaultValues={[]}
+ multiple
+ onChange={setSelectedOptions}
+ options={[
+ {
+ disabled: !hasLogicalDate,
+ label: translate("dags:runAndTaskActions.options.past"),
+ value: "past",
+ },
+ {
+ disabled: !hasLogicalDate,
+ label: translate("dags:runAndTaskActions.options.future"),
+ value: "future",
+ },
+ {
+ label:
translate("dags:runAndTaskActions.options.upstream"),
+ value: "upstream",
+ },
+ {
+ label:
translate("dags:runAndTaskActions.options.downstream"),
+ value: "downstream",
+ },
+ ]}
+ />
+ </Flex>
+ <ActionAccordion affectedTasks={affectedTasks} groupByRunId
note={note} setNote={setNote} />
+ <ErrorAlert error={error} />
+ <Flex justifyContent="end" mt={3}>
+ <Button
+ colorPalette="brand"
+ disabled={affectedTasks.total_entries === 0}
+ loading={isPending || isFetching}
+ onClick={() => {
+ bulkAction({
+ actions: [
+ {
+ action: "update" as const,
+ action_on_non_existence: "skip",
+ entities: directlyAffected.map((ti) => ({
+ dag_id: ti.dag_id,
+ dag_run_id: ti.dag_run_id,
+ include_downstream: downstream,
+ include_future: future,
+ include_past: past,
+ include_upstream: upstream,
+ map_index: ti.map_index,
+ new_state: state,
+ note,
+ task_id: ti.task_id,
+ })),
+ update_mask: note === null ? ["new_state"] :
["new_state", "note"],
+ },
+ ],
+ });
+ }}
+ >
+ {translate("modal.confirm")}
+ </Button>
+ </Flex>
+ </Dialog.Body>
+ </Dialog.Content>
+ </Dialog.Root>
+ </Box>
+ );
+};
+
+export default BulkMarkTaskInstancesAsButton;
diff --git
a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx
b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx
index 678f386f116..e066436afd0 100644
--- a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx
@@ -29,21 +29,29 @@ import type { TaskInstanceResponse } from
"openapi/requests/types.gen";
import { ClearTaskInstanceButton } from "src/components/Clear";
import { DagVersion } from "src/components/DagVersion";
import { DataTable } from "src/components/DataTable";
+import { useRowSelection, type GetColumnsParams } from
"src/components/DataTable/useRowSelection";
import { useTableURLState } from "src/components/DataTable/useTableUrlState";
import { ErrorAlert } from "src/components/ErrorAlert";
import { MarkTaskInstanceAsButton } from "src/components/MarkAs";
import { StateBadge } from "src/components/StateBadge";
import Time from "src/components/Time";
import { TruncatedText } from "src/components/TruncatedText";
+import { ActionBar } from "src/components/ui/ActionBar";
+import { Checkbox } from "src/components/ui/Checkbox";
import { SearchParamsKeys, type SearchParamsKeysType } from
"src/constants/searchParams";
import { useAutoRefresh, isStatePending, renderDuration } from "src/utils";
import { getTaskInstanceLink } from "src/utils/links";
+import BulkClearTaskInstancesButton from "./BulkClearTaskInstancesButton";
+import BulkDeleteTaskInstancesButton from "./BulkDeleteTaskInstancesButton";
+import BulkMarkTaskInstancesAsButton from "./BulkMarkTaskInstancesAsButton";
import DeleteTaskInstanceButton from "./DeleteTaskInstanceButton";
import { TaskInstancesFilter } from "./TaskInstancesFilter";
type TaskInstanceRow = { row: { original: TaskInstanceResponse } };
+/** Build the row-selection key of a task instance from its dag id, run id, task id and map index. */
+const getRowKey = (ti: TaskInstanceResponse) => {
+  const { dag_id: dagId, dag_run_id: dagRunId, map_index: mapIndex, task_id: taskId } = ti;
+
+  return `${dagId}:${dagRunId}:${taskId}:${mapIndex}`;
+};
+
const {
DAG_ID_PATTERN: DAG_ID_PATTERN_PARAM,
DAG_VERSION: DAG_VERSION_PARAM,
@@ -63,17 +71,47 @@ const {
TRY_NUMBER: TRY_NUMBER_PARAM,
}: SearchParamsKeysType = SearchParamsKeys;
+// Column options that are not part of row selection; taskInstanceColumns takes
+// these combined with GetColumnsParams as its single argument.
+type ColumnProps = {
+ readonly dagId?: string;
+ readonly runId?: string;
+ readonly taskId?: string;
+ readonly translate: TFunction;
+};
+
const taskInstanceColumns = ({
+ allRowsSelected,
dagId,
+ onRowSelect,
+ onSelectAll,
runId,
+ selectedRows,
taskId,
translate,
-}: {
- dagId?: string;
- runId?: string;
- taskId?: string;
- translate: TFunction;
-}): Array<ColumnDef<TaskInstanceResponse>> => [
+}: ColumnProps & GetColumnsParams): Array<ColumnDef<TaskInstanceResponse>> => [
+ {
+ accessorKey: "select",
+ cell: ({ row }) => (
+ <Checkbox
+ borderWidth={1}
+ checked={selectedRows.get(getRowKey(row.original))}
+ colorPalette="brand"
+ onCheckedChange={(event) => onRowSelect(getRowKey(row.original),
Boolean(event.checked))}
+ />
+ ),
+ enableHiding: false,
+ enableSorting: false,
+ header: () => (
+ <Checkbox
+ borderWidth={1}
+ checked={allRowsSelected}
+ colorPalette="brand"
+ onCheckedChange={(event) => onSelectAll(Boolean(event.checked))}
+ />
+ ),
+ meta: {
+ skeletonWidth: 10,
+ },
+ },
...(Boolean(dagId)
? []
: [
@@ -95,7 +133,6 @@ const taskInstanceColumns = ({
: [
{
accessorKey: "run_after",
- // If we don't show the taskId column, make the dag run a link to
the task instance
cell: ({ row: { original } }: TaskInstanceRow) =>
Boolean(taskId) ? (
<Link asChild color="fg.info" fontWeight="bold">
@@ -292,9 +329,22 @@ export const TaskInstances = () => {
},
);
+ const { allRowsSelected, clearSelections, handleRowSelect, handleSelectAll,
selectedRows } =
+ useRowSelection({
+ data: data?.task_instances,
+ getKey: getRowKey,
+ });
+
+ const selectedTaskInstances = (data?.task_instances ?? []).filter((ti) =>
selectedRows.has(getRowKey(ti)));
+
const columns = taskInstanceColumns({
+ allRowsSelected,
dagId,
+ multiTeam: false,
+ onRowSelect: handleRowSelect,
+ onSelectAll: handleSelectAll,
runId,
+ selectedRows,
taskId: Boolean(groupId) ? undefined : taskId,
translate,
});
@@ -312,6 +362,27 @@ export const TaskInstances = () => {
onStateChange={setTableURLState}
total={data?.total_entries}
/>
+ <ActionBar.Root closeOnInteractOutside={false}
open={Boolean(selectedRows.size)}>
+ <ActionBar.Content>
+ <ActionBar.SelectionTrigger>
+ {selectedRows.size} {translate("selected")}
+ </ActionBar.SelectionTrigger>
+ <ActionBar.Separator />
+ <BulkClearTaskInstancesButton
+ clearSelections={clearSelections}
+ selectedTaskInstances={selectedTaskInstances}
+ />
+ <BulkMarkTaskInstancesAsButton
+ clearSelections={clearSelections}
+ selectedTaskInstances={selectedTaskInstances}
+ />
+ <BulkDeleteTaskInstancesButton
+ clearSelections={clearSelections}
+ selectedTaskInstances={selectedTaskInstances}
+ />
+ <ActionBar.CloseTrigger onClick={clearSelections} />
+ </ActionBar.Content>
+ </ActionBar.Root>
</>
);
};
diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkClearDryRun.ts
b/airflow-core/src/airflow/ui/src/queries/useBulkClearDryRun.ts
new file mode 100644
index 00000000000..e631860f300
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkClearDryRun.ts
@@ -0,0 +1,113 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useQueries } from "@tanstack/react-query";
+import { useMemo } from "react";
+
+import { TaskInstanceService } from "openapi/requests/services.gen";
+import type { TaskInstanceCollectionResponse, TaskInstanceResponse } from
"openapi/requests/types.gen";
+
+// Clear options that useBulkClearDryRun sends in each dry-run request body and
+// also places in the query key.
+type Options = {
+ includeDownstream: boolean;
+ includeFuture: boolean;
+ includeOnlyFailed: boolean;
+ includePast: boolean;
+ includeUpstream: boolean;
+};
+
+// Shared empty response, returned by useBulkClearDryRun when no task instance was merged.
+const EMPTY: TaskInstanceCollectionResponse = { task_instances: [],
total_entries: 0 };
+
+// First element of every query key built by useBulkClearDryRun.
+export const useBulkClearDryRunKey = "bulkClearDryRun";
+
+/**
+ * Preview a bulk clear by calling the clear endpoint with dry_run: true.
+ *
+ * The selection is grouped by (dag_id, dag_run_id) and one query is issued per
+ * group; the task instances returned by all queries are merged and de-duplicated.
+ *
+ * @param enabled - forwarded to every query as its `enabled` flag
+ * @param selectedTaskInstances - task instances to preview the clear for
+ * @param options - clear options sent in each request body
+ * @returns `data` (the merged collection, or the shared EMPTY constant when nothing
+ *   was returned) and `isFetching` (true while any of the queries is fetching)
+ */
+export const useBulkClearDryRun = (
+ enabled: boolean,
+ selectedTaskInstances: Array<TaskInstanceResponse>,
+ options: Options,
+) => {
+ // One group per (dag_id, dag_run_id); recomputed only when the selection changes.
+ const byDagRun = useMemo(() => {
+ const groups = new Map<string, { dagId: string; dagRunId: string; tis:
Array<TaskInstanceResponse> }>();
+
+ for (const ti of selectedTaskInstances) {
+ const key = `${ti.dag_id}::${ti.dag_run_id}`;
+ const group = groups.get(key) ?? { dagId: ti.dag_id, dagRunId:
ti.dag_run_id, tis: [] };
+
+ group.tis.push(ti);
+ groups.set(key, group);
+ }
+
+ return [...groups.values()];
+ }, [selectedTaskInstances]);
+
+ // One dry-run query per group.
+ const results = useQueries({
+ queries: byDagRun.map(({ dagId, dagRunId, tis }) => ({
+ enabled,
+ queryFn: () =>
+ TaskInstanceService.postClearTaskInstances({
+ dagId,
+ requestBody: {
+ dag_run_id: dagRunId,
+ dry_run: true,
+ include_downstream: options.includeDownstream,
+ include_future: options.includeFuture,
+ include_past: options.includePast,
+ include_upstream: options.includeUpstream,
+ only_failed: options.includeOnlyFailed,
+ // map_index >= 0 is sent as a [task_id, map_index] pair, anything else as the bare task_id.
+ task_ids: tis.map((ti) =>
+ ti.map_index >= 0 ? ([ti.task_id, ti.map_index] as [string,
number]) : ti.task_id,
+ ),
+ },
+ }),
+ // The key carries the Dag, the run, every option and the task ids of the group.
+ queryKey: [
+ useBulkClearDryRunKey,
+ dagId,
+ dagRunId,
+ {
+ include_downstream: options.includeDownstream,
+ include_future: options.includeFuture,
+ include_only_failed: options.includeOnlyFailed,
+ include_past: options.includePast,
+ include_upstream: options.includeUpstream,
+ task_ids: tis.map((ti) => `${ti.task_id}:${ti.map_index}`),
+ },
+ ],
+ // TanStack Query option: refetch on every mount, even when cached data exists.
+ refetchOnMount: "always" as const,
+ })),
+ });
+
+ const isFetching = results.some((result) => result.isFetching);
+
+ // Merge all group results, keeping the first occurrence of each
+ // dag_id:dag_run_id:task_id:map_index combination.
+ const data = useMemo<TaskInstanceCollectionResponse>(() => {
+ const seen = new Set<string>();
+ const merged: Array<TaskInstanceResponse> = [];
+
+ for (const result of results) {
+ for (const ti of result.data?.task_instances ?? []) {
+ const key =
`${ti.dag_id}:${ti.dag_run_id}:${ti.task_id}:${ti.map_index}`;
+
+ if (!seen.has(key)) {
+ seen.add(key);
+ merged.push(ti);
+ }
+ }
+ }
+
+ // total_entries is the merged count, not the sum of the per-query totals.
+ return merged.length === 0 ? EMPTY : { task_instances: merged,
total_entries: merged.length };
+ }, [results]);
+
+ return { data, isFetching };
+};
diff --git
a/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts
b/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts
new file mode 100644
index 00000000000..063b1c6d77d
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts
@@ -0,0 +1,118 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useQueryClient } from "@tanstack/react-query";
+import { useState } from "react";
+import { useTranslation } from "react-i18next";
+
+import { useDagRunServiceGetDagRunsKey,
useTaskInstanceServiceGetTaskInstancesKey } from "openapi/queries";
+import { TaskInstanceService } from "openapi/requests/services.gen";
+import type { TaskInstanceResponse } from "openapi/requests/types.gen";
+import { toaster } from "src/components/ui";
+
+// Callbacks supplied by the caller of useBulkClearTaskInstances; both are invoked
+// after a successful bulk clear.
+type Props = {
+ readonly clearSelections: VoidFunction;
+ readonly onSuccessConfirm: VoidFunction;
+};
+
+// Options of a bulk clear; bulkClear copies them into each clear request body.
+export type BulkClearOptions = {
+ includeDownstream: boolean;
+ includeFuture: boolean;
+ includeOnlyFailed: boolean;
+ includePast: boolean;
+ includeUpstream: boolean;
+ note: string | null;
+ // Only sent to the API (as prevent_running_task: true) when this is true.
+ preventRunningTask: boolean;
+};
+
+/**
+ * Hook that clears a selection of task instances which may span several Dag runs.
+ *
+ * The selection is grouped by (dag_id, dag_run_id) and one clear request is sent
+ * per group. All requests are awaited before the outcome is evaluated, and the
+ * task-instance / Dag-run list queries are invalidated whether or not a request
+ * failed, so groups that were cleared are reflected in the table even when
+ * another group was rejected.
+ *
+ * @param clearSelections - called once every group has been cleared successfully
+ * @param onSuccessConfirm - called once every group has been cleared successfully
+ * @returns `bulkClear` to trigger the clear, the last `error`, the `isPending`
+ *   flag and `setError` so the caller can reset the error
+ */
+export const useBulkClearTaskInstances = ({ clearSelections, onSuccessConfirm }: Props) => {
+  const queryClient = useQueryClient();
+  const [error, setError] = useState<unknown>(undefined);
+  const [isPending, setIsPending] = useState(false);
+  const { t: translate } = useTranslation(["common", "dags"]);
+
+  const invalidateQueries = async () => {
+    await Promise.all([
+      queryClient.invalidateQueries({ queryKey: [useTaskInstanceServiceGetTaskInstancesKey] }),
+      queryClient.invalidateQueries({ queryKey: [useDagRunServiceGetDagRunsKey] }),
+    ]);
+  };
+
+  const bulkClear = async (taskInstances: Array<TaskInstanceResponse>, options: BulkClearOptions) => {
+    setError(undefined);
+    setIsPending(true);
+
+    // Group by (dag_id, dag_run_id) — clear endpoint requires a specific dag_id
+    // and dag_run_id scopes the clear to the specific run
+    const byDagRun = new Map<string, { dagId: string; dagRunId: string; tis: Array<TaskInstanceResponse> }>();
+
+    for (const ti of taskInstances) {
+      const key = `${ti.dag_id}::${ti.dag_run_id}`;
+      const group = byDagRun.get(key) ?? { dagId: ti.dag_id, dagRunId: ti.dag_run_id, tis: [] };
+
+      group.tis.push(ti);
+      byDagRun.set(key, group);
+    }
+
+    try {
+      // allSettled instead of all: a rejected group must not hide the fact that
+      // other groups were (or still are being) cleared on the server.
+      const outcomes = await Promise.allSettled(
+        [...byDagRun.values()].map(({ dagId, dagRunId, tis }) =>
+          TaskInstanceService.postClearTaskInstances({
+            dagId,
+            requestBody: {
+              dag_run_id: dagRunId,
+              dry_run: false,
+              include_downstream: options.includeDownstream,
+              include_future: options.includeFuture,
+              include_past: options.includePast,
+              include_upstream: options.includeUpstream,
+              note: options.note,
+              only_failed: options.includeOnlyFailed,
+              ...(options.preventRunningTask ? { prevent_running_task: true } : {}),
+              // map_index >= 0 is sent as a [task_id, map_index] pair, anything else as the bare task_id.
+              task_ids: tis.map((ti) =>
+                ti.map_index >= 0 ? ([ti.task_id, ti.map_index] as [string, number]) : ti.task_id,
+              ),
+            },
+          }),
+        ),
+      );
+
+      // Refresh the lists even after a partial failure so already-cleared rows are not shown stale.
+      await invalidateQueries();
+
+      const failure = outcomes.find(
+        (outcome): outcome is PromiseRejectedResult => outcome.status === "rejected",
+      );
+
+      if (failure) {
+        // Keep the selection and the dialog open; report the first rejection.
+        setError(failure.reason);
+
+        return;
+      }
+
+      toaster.create({
+        description: translate("toaster.bulkClear.success.description", {
+          count: taskInstances.length,
+          keys: taskInstances.map((ti) => ti.task_id).join(", "),
+          resourceName: translate("taskInstance_other"),
+        }),
+        title: translate("toaster.bulkClear.success.title", {
+          resourceName: translate("taskInstance_other"),
+        }),
+        type: "success",
+      });
+
+      clearSelections();
+      onSuccessConfirm();
+    } catch (_error) {
+      setError(_error);
+    } finally {
+      // Always leave the pending state, whichever path was taken above.
+      setIsPending(false);
+    }
+  };
+
+  return { bulkClear, error, isPending, setError };
+};
diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkMarkAsDryRun.ts
b/airflow-core/src/airflow/ui/src/queries/useBulkMarkAsDryRun.ts
new file mode 100644
index 00000000000..b8d5eb06758
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkMarkAsDryRun.ts
@@ -0,0 +1,113 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useQueries } from "@tanstack/react-query";
+import { useMemo } from "react";
+
+import { TaskInstanceService } from "openapi/requests/services.gen";
+import type {
+ TaskInstanceCollectionResponse,
+ TaskInstanceResponse,
+ TaskInstanceState,
+} from "openapi/requests/types.gen";
+
+// Propagation options that useBulkMarkAsDryRun sends in each dry-run request body
+// and also places in the query key.
+type Options = {
+ includeDownstream: boolean;
+ includeFuture: boolean;
+ includePast: boolean;
+ includeUpstream: boolean;
+};
+
+// Shared empty response, returned by useBulkMarkAsDryRun when no task instance was merged.
+const EMPTY: TaskInstanceCollectionResponse = { task_instances: [],
total_entries: 0 };
+
+// First element of every query key built by useBulkMarkAsDryRun.
+export const useBulkMarkAsDryRunKey = "bulkMarkAsDryRun";
+
+/**
+ * Preview a bulk "mark as" by calling the patch-task-instance dry-run endpoint.
+ *
+ * Selected task instances that are already in `targetState` are skipped; one
+ * query is issued for each remaining instance and the task instances returned by
+ * all queries are merged and de-duplicated.
+ *
+ * @param enabled - forwarded to every query as its `enabled` flag
+ * @param options - propagation options sent in each request body
+ * @param selectedTaskInstances - task instances the user selected
+ * @param targetState - state sent as `new_state`
+ * @returns `data` (the merged collection, or the shared EMPTY constant when nothing
+ *   was returned) and `isFetching` (true while any of the queries is fetching)
+ */
+export const useBulkMarkAsDryRun = (
+ enabled: boolean,
+ {
+ options,
+ selectedTaskInstances,
+ targetState,
+ }: {
+ options: Options;
+ selectedTaskInstances: Array<TaskInstanceResponse>;
+ targetState: TaskInstanceState;
+ },
+) => {
+ // Drop instances whose state already equals the target state.
+ const affectedInstances = useMemo(
+ () => selectedTaskInstances.filter((ti) => ti.state !== targetState),
+ [selectedTaskInstances, targetState],
+ );
+
+ // One dry-run query per affected task instance.
+ const results = useQueries({
+ queries: affectedInstances.map((ti) => ({
+ enabled,
+ queryFn: () =>
+ TaskInstanceService.patchTaskInstanceDryRun({
+ dagId: ti.dag_id,
+ dagRunId: ti.dag_run_id,
+ mapIndex: ti.map_index,
+ requestBody: {
+ include_downstream: options.includeDownstream,
+ include_future: options.includeFuture,
+ include_past: options.includePast,
+ include_upstream: options.includeUpstream,
+ new_state: targetState,
+ },
+ taskId: ti.task_id,
+ }),
+ // The key carries the task instance identity, every option and the target state.
+ queryKey: [
+ useBulkMarkAsDryRunKey,
+ ti.dag_id,
+ ti.dag_run_id,
+ ti.task_id,
+ ti.map_index,
+ {
+ include_downstream: options.includeDownstream,
+ include_future: options.includeFuture,
+ include_past: options.includePast,
+ include_upstream: options.includeUpstream,
+ new_state: targetState,
+ },
+ ],
+ // TanStack Query option: refetch on every mount, even when cached data exists.
+ refetchOnMount: "always" as const,
+ })),
+ });
+
+ const isFetching = results.some((result) => result.isFetching);
+
+ // Merge all results, keeping the first occurrence of each
+ // dag_id:dag_run_id:task_id:map_index combination.
+ const data = useMemo<TaskInstanceCollectionResponse>(() => {
+ const seen = new Set<string>();
+ const merged: Array<TaskInstanceResponse> = [];
+
+ for (const result of results) {
+ for (const ti of result.data?.task_instances ?? []) {
+ const key =
`${ti.dag_id}:${ti.dag_run_id}:${ti.task_id}:${ti.map_index}`;
+
+ if (!seen.has(key)) {
+ seen.add(key);
+ merged.push(ti);
+ }
+ }
+ }
+
+ // total_entries is the merged count, not the sum of the per-query totals.
+ return merged.length === 0 ? EMPTY : { task_instances: merged,
total_entries: merged.length };
+ }, [results]);
+
+ return { data, isFetching };
+};
diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts
b/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts
new file mode 100644
index 00000000000..8ad4d616197
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts
@@ -0,0 +1,105 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useQueryClient } from "@tanstack/react-query";
+import { useState } from "react";
+import { useTranslation } from "react-i18next";
+
+import {
+ useDagRunServiceGetDagRunsKey,
+ useTaskInstanceServiceBulkTaskInstances,
+ useTaskInstanceServiceGetTaskInstancesKey,
+} from "openapi/queries";
+import type {
+ BulkActionResponse,
+ BulkBody_BulkTaskInstanceBody_,
+ BulkResponse,
+} from "openapi/requests/types.gen";
+import { toaster } from "src/components/ui";
+
+// Callbacks supplied by the caller of useBulkTaskInstances; both are invoked
+// after the bulk action reported at least one success and no errors.
+type Props = {
+ readonly clearSelections: VoidFunction;
+ readonly onSuccessConfirm: VoidFunction;
+};
+
+/**
+ * Route the result of one bulk action to the error setter or the success callback.
+ *
+ * When the result lists any error, only the first one is reported (wrapped as
+ * `{ body: { detail } }`) and the success callback is not called. Otherwise the
+ * success callback receives the number of succeeded keys and the keys themselves.
+ * Nothing happens when both lists are missing or empty.
+ */
+const handleActionResult = (
+  actionResult: BulkActionResponse,
+  setError: (error: unknown) => void,
+  onSuccess: (count: number, keys: Array<string>) => void,
+) => {
+  const failures = actionResult.errors;
+  const succeeded = actionResult.success;
+
+  // Errors take precedence over successes.
+  if (Array.isArray(failures) && failures.length > 0) {
+    const [firstFailure] = failures;
+
+    setError({ body: { detail: (firstFailure as { error: string }).error } });
+
+    return;
+  }
+
+  if (Array.isArray(succeeded) && succeeded.length > 0) {
+    onSuccess(succeeded.length, succeeded);
+  }
+};
+
+/**
+ * Hook wrapping the bulk task-instances mutation; it handles both the `delete`
+ * and the `update` section of the bulk response.
+ *
+ * @param clearSelections - called after a successful action
+ * @param onSuccessConfirm - called after a successful action
+ * @returns `bulkAction` to send a bulk request body, the last `error`, the
+ *   mutation's `isPending` flag and `setError` so the caller can reset the error
+ */
+export const useBulkTaskInstances = ({ clearSelections, onSuccessConfirm }:
Props) => {
+ const queryClient = useQueryClient();
+ const [error, setError] = useState<unknown>(undefined);
+ const { t: translate } = useTranslation(["common", "dags"]);
+
+ // Runs when the request itself succeeded; the response may still list per-entity errors.
+ const onSuccess = async (responseData: BulkResponse) => {
+ await Promise.all([
+ queryClient.invalidateQueries({ queryKey:
[useTaskInstanceServiceGetTaskInstancesKey] }),
+ queryClient.invalidateQueries({ queryKey:
[useDagRunServiceGetDagRunsKey] }),
+ ]);
+
+ // A `delete` section, when present, wins over `update` for both the result and the toast texts.
+ const isDelete = Boolean(responseData.delete);
+ const actionResult = responseData.delete ?? responseData.update;
+ const toasterKey = isDelete ? "toaster.bulkDelete" : "toaster.bulkUpdate";
+
+ if (actionResult) {
+ // The callback below only runs when handleActionResult found successes and no errors.
+ handleActionResult(actionResult, setError, (count, keys) => {
+ toaster.create({
+ description: translate(`${toasterKey}.success.description`, {
+ count,
+ keys: keys.join(", "),
+ resourceName: translate("taskInstance_other"),
+ }),
+ title: translate(`${toasterKey}.success.title`, {
+ resourceName: translate("taskInstance_other"),
+ }),
+ type: "success",
+ });
+ clearSelections();
+ onSuccessConfirm();
+ });
+ }
+ };
+
+ const onError = (_error: unknown) => {
+ setError(_error);
+ };
+
+ const { isPending, mutate } = useTaskInstanceServiceBulkTaskInstances({
+ onError,
+ onSuccess,
+ });
+
+ // Clear any previous error, then send the request.
+ const bulkAction = (requestBody: BulkBody_BulkTaskInstanceBody_) => {
+ setError(undefined);
+ // NOTE(review): "~" is passed for both path parameters — presumably the API's
+ // "any Dag / any Dag run" wildcard; confirm against the endpoint documentation.
+ mutate({ dagId: "~", dagRunId: "~", requestBody });
+ };
+
+ return { bulkAction, error, isPending, setError };
+};
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index ba8f06a7424..1a5d689d1ef 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -3644,6 +3644,88 @@ class
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
assert response.status_code == 200
assert logs == audit_log_count
+ @pytest.mark.db_test
+ def test_clear_sets_note_on_task_instances(self, test_client, session):
+ """Test that a note is set on cleared task instances when note is
provided."""
+ dag_id = "example_python_operator"
+ note_value = "Cleared by automation"
+ # Real (non dry-run) clear restricted to failed task instances, carrying a note.
+ payload = {
+ "dry_run": False,
+ "reset_dag_runs": False,
+ "only_failed": True,
+ "only_running": False,
+ "note": note_value,
+ }
+ # Seed one FAILED task instance; the assertions below expect exactly one cleared entry.
+ self.create_task_instances(
+ session,
+ dag_id=dag_id,
+ task_instances=[{"logical_date": DEFAULT_DATETIME_1, "state":
State.FAILED}],
+ update_extras=False,
+ )
+ response = test_client.post(
+ f"/dags/{dag_id}/clearTaskInstances",
+ json=payload,
+ )
+ assert response.status_code == 200
+ response_data = response.json()
+ assert response_data["total_entries"] == 1
+ ti_id = response_data["task_instances"][0]["id"]
+ # The stored note must carry the submitted content and the user id "test".
+ _check_task_instance_note(session, ti_id, {"content": note_value,
"user_id": "test"})
+
+ @pytest.mark.db_test
+ def test_clear_without_note_does_not_set_note(self, test_client, session):
+ """Test that existing note is preserved on cleared task instances when
note is not provided."""
+ dag_id = "example_python_operator"
+ # Real (non dry-run) clear restricted to failed task instances; no "note" key is sent.
+ payload = {
+ "dry_run": False,
+ "reset_dag_runs": False,
+ "only_failed": True,
+ "only_running": False,
+ }
+ # Seed one FAILED task instance; the assertions below expect exactly one cleared entry.
+ self.create_task_instances(
+ session,
+ dag_id=dag_id,
+ task_instances=[{"logical_date": DEFAULT_DATETIME_1, "state":
State.FAILED}],
+ update_extras=False,
+ )
+ response = test_client.post(
+ f"/dags/{dag_id}/clearTaskInstances",
+ json=payload,
+ )
+ assert response.status_code == 200
+ response_data = response.json()
+ assert response_data["total_entries"] == 1
+ ti_id = response_data["task_instances"][0]["id"]
+ # NOTE(review): "placeholder-note" with user_id None is presumably the note seeded
+ # by create_task_instances — confirm in that helper.
+ _check_task_instance_note(session, ti_id, {"content":
"placeholder-note", "user_id": None})
+
+ @pytest.mark.db_test
+ def test_clear_dry_run_does_not_set_note(self, test_client, session):
+ """Test that a note is NOT updated when dry_run=True even if note is
provided."""
+ dag_id = "example_python_operator"
+ note_value = "Should not be set"
+ # Dry-run clear restricted to failed task instances, carrying a note that must be ignored.
+ payload = {
+ "dry_run": True,
+ "reset_dag_runs": False,
+ "only_failed": True,
+ "only_running": False,
+ "note": note_value,
+ }
+ # Seed one FAILED task instance; the dry run is still expected to report one entry.
+ self.create_task_instances(
+ session,
+ dag_id=dag_id,
+ task_instances=[{"logical_date": DEFAULT_DATETIME_1, "state":
State.FAILED}],
+ update_extras=False,
+ )
+ response = test_client.post(
+ f"/dags/{dag_id}/clearTaskInstances",
+ json=payload,
+ )
+ assert response.status_code == 200
+ response_data = response.json()
+ assert response_data["total_entries"] == 1
+ ti_id = response_data["task_instances"][0]["id"]
+ # NOTE(review): "placeholder-note" with user_id None is presumably the note seeded
+ # by create_task_instances — confirm in that helper.
+ _check_task_instance_note(session, ti_id, {"content":
"placeholder-note", "user_id": None})
+
class TestGetTaskInstanceTries(TestTaskInstanceEndpoint):
def test_should_respond_200(self, test_client, session):
@@ -6001,6 +6083,64 @@ class TestBulkTaskInstances(TestTaskInstanceEndpoint):
for task_id, value in expected_results.items():
assert sorted(response_data[task_id]) == sorted(value)
+ # Each case targets one map index of the same mapped task with a new state.
+ @pytest.mark.parametrize(
+ ("map_index", "new_state"),
+ [
+ pytest.param(0, "failed", id="mapped-ti-map-index-0-failed"),
+ pytest.param(1, "failed", id="mapped-ti-map-index-1-failed"),
+ pytest.param(2, "success", id="mapped-ti-map-index-2-success"),
+ ],
+ )
+ def test_bulk_update_mapped_task_instance_state_is_persisted(
+ self, test_client, session, map_index, new_state
+ ):
+ """Verify that bulk-updating a specific mapped TI actually persists
the new state in the DB."""
+ # Seed RUNNING task instances for map indexes 0, 1 and 2.
+ self.create_task_instances(
+ session,
+ task_instances=[{"state": State.RUNNING, "map_indexes": (0, 1,
2)}],
+ )
+
+ # Single "update" action addressing exactly one (task_id, map_index) entity.
+ response = test_client.patch(
+ self.ENDPOINT_URL,
+ json={
+ "actions": [
+ {
+ "action": "update",
+ "entities": [
+ {
+ "task_id": self.TASK_ID,
+ "map_index": map_index,
+ "new_state": new_state,
+ }
+ ],
+ }
+ ]
+ },
+ )
+ assert response.status_code == 200
+ # The success list must contain only the key of the targeted mapped instance.
+ assert response.json()["update"]["success"] == [
+ f"{self.DAG_ID}.{self.RUN_ID}.{self.TASK_ID}[{map_index}]"
+ ]
+
+ # Expire cached ORM state so the queries below reload from the database.
+ session.expire_all()
+ # Verify only the targeted mapped TI changed state; others remain
unchanged.
+ for mi in [0, 1, 2]:
+ ti = session.scalar(
+ select(TaskInstance).where(
+ TaskInstance.dag_id == self.DAG_ID,
+ TaskInstance.run_id == self.RUN_ID,
+ TaskInstance.task_id == self.TASK_ID,
+ TaskInstance.map_index == mi,
+ )
+ )
+ assert ti is not None
+ if mi == map_index:
+ assert ti.state == new_state, f"Expected map_index={mi} to be
{new_state!r}, got {ti.state!r}"
+ else:
+ assert ti.state == State.RUNNING, (
+ f"Expected map_index={mi} to remain running, got
{ti.state!r}"
+ )
+
def test_should_respond_401(self, unauthenticated_test_client):
response = unauthenticated_test_client.patch(self.ENDPOINT_URL,
json={})
assert response.status_code == 401
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index bb375f88834..1fe38713703 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -173,6 +173,7 @@ class ClearTaskInstancesBody(BaseModel):
),
] = False
prevent_running_task: Annotated[bool | None, Field(title="Prevent Running
Task")] = False
+ note: Annotated[Note | None, Field(title="Note")] = None
class Value(RootModel[list]):