This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 554c1856945 Checkbox before clear task confirmation to prevent rerun
of tasks in 'Running' states. (#56351)
554c1856945 is described below
commit 554c18569455417a8d389edb571030a2ccd104d7
Author: Klarence Nicolas Catalan
<[email protected]>
AuthorDate: Fri Nov 21 04:25:27 2025 +0800
Checkbox before clear task confirmation to prevent rerun of tasks in
'Running' states. (#56351)
* Added a confirm dialog modal that appears only when the task is in
running, up_for_retry, or restarting state.
* Removed unnecessary comments
* Added clearTask texts in dags.json in the EN locale. Used translate in
confirmation dialogue.
* (NON-WORKING/ Experimental) added a flag in taskinstance.py. backend and
ui still not communicating. MIGHT revert to previous commit.
* (EXPERIMENTAL) Removed the confirm pop-up and replaced with checkbox that
will not run the instance until the instance is not running.
* Added mirgin-right auto for the checkbox
* Change isRunning to task_confirmed_running. Moved the HTTPException to
router. Followed suggestion about attribute access.
* Made the checkbox checked on default. is_running_message set to true by
default.
* Added the code 400 to line 688.
* Cleaned up the code and attempted to adress the error in the tests, e712.
* Made sure the date variable is not null and changed the string in the
value error
* Changed the hardcoded english into an i18n translate. Added
preventRunningTasks in dags.json in the en locale.
* Renamed the is_running_message to prevent_running_tasks. Removed
prevent_running_tasks' addition to ti, instead, it is passed as a parameter in
clearTaskInstance.
* Added a custom airflow exception for clearing running task instance.
Error code 400 is still being raised in the router.
* Added retry 0 in use clearTaskInstances.
* Added a conditional for error toast that states the task is running if
trying to clear a running task.
* added check for null in error.detail
* removed unnecessary imports in task_instances.py
* made the prevent_running_task optional to pass checks
* Changed prevent_running_task to Optional[bool]=None to be optional in
test check
* Attempt to resolve static checks which are unsafe assignment and calls in
useClearTaskInstances, dateTimeUtils, and dateTimeUtils.test.
* Added flag for QUEUED and SCHEDULED in taskinstances.py
* Added a modal for QUEUED and SCHEDULED states.
* Added the text of the modal in the en locale.
* Changed the contents of the modal to include the user that started the
task and the time when it was started.
* Changed taskinstance.py according to the comments, and changed the modal
behavior to only trigger the toast when task is running, not in queued or
scheduled.
* Attempt to resolve static checks
* Attempt to resolve assertion errors of prevent_running_task = false in
tests.
* Attempt to fix the restarting state returned by clear_task_instances
method.
* 2nd Attempt to resolve static checks
* Used common:error.defaultMessage for the default error toast. Added
useState to avoid the modal rendering for a split second when its not supposed
to.
* Resolve the dateTimeUtils.getRelativeTime not returning an empty string
when date is undefined. 3rd Attempt to solve static checks.
* Attempt to resolve 7 eslint static checks error. Shortened conditional
codes.
* Resolving eslint errors in UseClearTaskInstance and
ClearTaskInstanceConfirmationDialog. Implemented suggested changes.
* Attempt to resolve detail in useClearTaskInstances not triggering
typescript-eslint/no-unsafe-assignment
* Attempt to resolve static checks.
* Removed kwargs inspect, added back prevent_running_task, and edited test
to count prevent_running_task.
* Changed error code 400 to 409. Added the pre-commit modification.
* Fixed missing detail in the error toast.
---
.../core_api/datamodels/task_instances.py | 1 +
.../core_api/openapi/v2-rest-api-generated.yaml | 10 ++
.../core_api/routes/public/task_instances.py | 20 +--
airflow-core/src/airflow/exceptions.py | 4 +
airflow-core/src/airflow/models/taskinstance.py | 14 +-
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 5 +
.../ui/openapi-gen/requests/services.gen.ts | 1 +
.../airflow/ui/openapi-gen/requests/types.gen.ts | 5 +
.../airflow/ui/public/i18n/locales/en/dags.json | 7 +-
.../ClearTaskInstanceConfirmationDialog.tsx | 151 +++++++++++++++++++++
.../Clear/TaskInstance/ClearTaskInstanceDialog.tsx | 92 +++++++++----
.../ui/src/queries/useClearTaskInstances.ts | 41 +++++-
.../src/airflow/ui/src/utils/datetimeUtils.test.ts | 33 ++++-
.../src/airflow/ui/src/utils/datetimeUtils.ts | 10 ++
.../core_api/routes/public/test_task_instances.py | 4 +-
.../src/airflowctl/api/datamodels/generated.py | 1 +
16 files changed, 350 insertions(+), 49 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
index b67ac7ea9c7..0a2570a0734 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
@@ -166,6 +166,7 @@ class ClearTaskInstancesBody(StrictBaseModel):
description="(Experimental) Run on the latest bundle version of the
dag after "
"clearing the task instances.",
)
+ prevent_running_task: bool = False
@model_validator(mode="before")
@classmethod
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 3faec5af18e..2e064fa35a6 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -7083,6 +7083,12 @@ paths:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
+ '409':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Conflict
'422':
description: Validation Error
content:
@@ -9610,6 +9616,10 @@ components:
description: (Experimental) Run on the latest bundle version of the
dag
after clearing the task instances.
default: false
+ prevent_running_task:
+ type: boolean
+ title: Prevent Running Task
+ default: false
additionalProperties: false
type: object
title: ClearTaskInstancesBody
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
index 21d88da4874..e0e730399ad 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -86,7 +86,7 @@ from
airflow.api_fastapi.core_api.services.public.task_instances import (
_patch_ti_validate_request,
)
from airflow.api_fastapi.logging.decorators import action_logging
-from airflow.exceptions import TaskNotFound
+from airflow.exceptions import AirflowClearRunningTaskException, TaskNotFound
from airflow.models import Base, DagRun
from airflow.models.taskinstance import TaskInstance as TI,
clear_task_instances
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
@@ -711,7 +711,7 @@ def get_mapped_task_instance_try_details(
@task_instances_router.post(
"/clearTaskInstances",
- responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+ responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND,
status.HTTP_409_CONFLICT]),
dependencies=[
Depends(action_logging()),
Depends(requires_access_dag(method="PUT",
access_entity=DagAccessEntity.TASK_INSTANCE)),
@@ -805,12 +805,16 @@ def post_clear_task_instances(
)
if not dry_run:
- clear_task_instances(
- task_instances,
- session,
- DagRunState.QUEUED if reset_dag_runs else False,
- run_on_latest_version=body.run_on_latest_version,
- )
+ try:
+ clear_task_instances(
+ task_instances,
+ session,
+ DagRunState.QUEUED if reset_dag_runs else False,
+ run_on_latest_version=body.run_on_latest_version,
+ prevent_running_task=body.prevent_running_task,
+ )
+ except AirflowClearRunningTaskException as e:
+ raise HTTPException(status.HTTP_409_CONFLICT, str(e)) from e
return TaskInstanceCollectionResponse(
task_instances=[TaskInstanceResponse.model_validate(ti) for ti in
task_instances],
diff --git a/airflow-core/src/airflow/exceptions.py
b/airflow-core/src/airflow/exceptions.py
index 4a5ee45f26a..17d85e86050 100644
--- a/airflow-core/src/airflow/exceptions.py
+++ b/airflow-core/src/airflow/exceptions.py
@@ -534,3 +534,7 @@ def __getattr__(name: str):
return AirflowDagCycleException
raise AttributeError(f"module '{__name__}' has no attribute '{name}'")
+
+
+class AirflowClearRunningTaskException(AirflowException):
+ """Raise when the user attempts to clear currently running tasks."""
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index d5976896166..a3a96824619 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -194,6 +194,7 @@ def clear_task_instances(
session: Session,
dag_run_state: DagRunState | Literal[False] = DagRunState.QUEUED,
run_on_latest_version: bool = False,
+ prevent_running_task: bool | None = None,
) -> None:
"""
Clear a set of task instances, but make sure the running ones get killed.
@@ -213,16 +214,25 @@ def clear_task_instances(
:meta private:
"""
task_instance_ids: list[str] = []
+ from airflow.exceptions import AirflowClearRunningTaskException
from airflow.models.dagbag import DBDagBag
scheduler_dagbag = DBDagBag(load_op_links=False)
for ti in tis:
task_instance_ids.append(ti.id)
ti.prepare_db_for_next_try(session)
+
if ti.state == TaskInstanceState.RUNNING:
- # If a task is cleared when running, set its state to RESTARTING
so that
- # the task is terminated and becomes eligible for retry.
+ if prevent_running_task:
+ raise AirflowClearRunningTaskException(
+ "AirflowClearRunningTaskException: Disable
'prevent_running_task' to proceed, or wait until the task is not running,
queued, or scheduled state."
+ )
+ # Prevents the task from re-running and clearing when
prevent_running_task from the frontend and the tas is running is True.
+
ti.state = TaskInstanceState.RESTARTING
+ # If a task is cleared when running and the prevent_running_task is
false,
+ # set its state to RESTARTING so that
+ # the task is terminated and becomes eligible for retry.
else:
dr = ti.dag_run
if run_on_latest_version:
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index c78c8964599..b2502da7638 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1344,6 +1344,11 @@ export const $ClearTaskInstancesBody = {
title: 'Run On Latest Version',
description: '(Experimental) Run on the latest bundle version of
the dag after clearing the task instances.',
default: false
+ },
+ prevent_running_task: {
+ type: 'boolean',
+ title: 'Prevent Running Task',
+ default: false
}
},
additionalProperties: false,
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index c2b866d87ce..808f54ab9ad 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -2557,6 +2557,7 @@ export class TaskInstanceService {
401: 'Unauthorized',
403: 'Forbidden',
404: 'Not Found',
+ 409: 'Conflict',
422: 'Validation Error'
}
});
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index e424fa33108..57c5d50b3db 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -423,6 +423,7 @@ export type ClearTaskInstancesBody = {
* (Experimental) Run on the latest bundle version of the dag after
clearing the task instances.
*/
run_on_latest_version?: boolean;
+ prevent_running_task?: boolean;
};
/**
@@ -5441,6 +5442,10 @@ export type $OpenApiTs = {
* Not Found
*/
404: HTTPExceptionResponse;
+ /**
+ * Conflict
+ */
+ 409: HTTPExceptionResponse;
/**
* Validation Error
*/
diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json
b/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json
index 6705e101962..aef5d3f65ee 100644
--- a/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json
+++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json
@@ -63,7 +63,12 @@
"past": "Past",
"queueNew": "Queue up new tasks",
"runOnLatestVersion": "Run with latest bundle version",
- "upstream": "Upstream"
+ "upstream": "Upstream",
+ "preventRunningTasks": "Prevent rerun if task is running"
+ },
+ "confirmationDialog": {
+ "title": "Cannot Clear Task Instance",
+ "description": "Task is currently in a {{state}} state started by user
{{user}} at {{time}}. \nThe user is unable to clear this task until it is done
running or a user unchecks the \"Prevent rerun of running tasks\" option in the
clear task dialog."
}
},
"search": {
diff --git
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceConfirmationDialog.tsx
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceConfirmationDialog.tsx
new file mode 100644
index 00000000000..a76a2ae778f
--- /dev/null
+++
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceConfirmationDialog.tsx
@@ -0,0 +1,151 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useEffect, useState, useCallback } from "react";
+import { VStack, Icon, Text, Spinner } from "@chakra-ui/react";
+import { GoAlertFill } from "react-icons/go";
+import { useTranslation } from "react-i18next";
+import { Button, Dialog } from "src/components/ui";
+import { useClearTaskInstancesDryRun } from
"src/queries/useClearTaskInstancesDryRun";
+import { getRelativeTime } from "src/utils/datetimeUtils";
+
+type Props = {
+ readonly dagDetails?: {
+ dagId: string;
+ dagRunId: string;
+ downstream?: boolean;
+ future?: boolean;
+ mapIndex?: number;
+ onlyFailed?: boolean;
+ past?: boolean;
+ taskId: string;
+ upstream?: boolean;
+ };
+ readonly onClose: () => void;
+ readonly onConfirm?: () => void;
+ readonly open: boolean;
+ readonly preventRunningTask: boolean;
+};
+
+const ClearTaskInstanceConfirmationDialog = ({
+ dagDetails,
+ onClose,
+ onConfirm,
+ open,
+ preventRunningTask,
+}: Props) => {
+ const { t: translate } = useTranslation();
+ const { data, isFetching } = useClearTaskInstancesDryRun({
+ dagId: dagDetails?.dagId ?? "",
+ options: {
+ enabled: open && Boolean(dagDetails),
+ gcTime: 0,
+ refetchOnMount: "always",
+ refetchOnWindowFocus: false,
+ staleTime: 0,
+ },
+ requestBody: {
+ dag_run_id: dagDetails?.dagRunId ?? "",
+ include_downstream: dagDetails?.downstream,
+ include_future: dagDetails?.future,
+ include_past: dagDetails?.past,
+ include_upstream: dagDetails?.upstream,
+ only_failed: dagDetails?.onlyFailed,
+ task_ids: [[dagDetails?.taskId ?? "", dagDetails?.mapIndex ?? 0]],
+ },
+ });
+
+ const [isReady, setIsReady] = useState(false);
+
+ const handleConfirm = useCallback(() => {
+ if (onConfirm) onConfirm();
+ onClose();
+ }, [onConfirm, onClose]);
+
+ const taskInstances = data?.task_instances ?? [];
+ const [firstInstance] = taskInstances;
+ const taskCurrentState = firstInstance?.state;
+
+ useEffect(() => {
+ if (!isFetching && open && data) {
+ const isInTriggeringState =
+ taskCurrentState === "queued" || taskCurrentState === "scheduled";
+
+ if (!preventRunningTask || !isInTriggeringState) {
+ handleConfirm();
+ } else {
+ setIsReady(true);
+ }
+ }
+ }, [isFetching, data, open, handleConfirm, taskCurrentState,
preventRunningTask]);
+
+ return (
+ <Dialog.Root lazyMount onOpenChange={onClose} open={open}>
+ <Dialog.Content backdrop>
+ {isFetching ? (
+ <VStack align="center" gap={3} justify="center" py={8}>
+ <Spinner size="lg" />
+ <Text color="fg.solid" fontSize="md">
+ {translate("common:task.documentation")}
+ </Text>
+ </VStack>
+ ) : isReady ? (
+ <>
+ <Dialog.Header>
+ <VStack align="start" gap={4}>
+ <Dialog.Title>
+ <Icon color="tomato" size="lg" pr="2">
+ <GoAlertFill />
+ </Icon>
+
{translate("dags:runAndTaskActions.confirmationDialog.title")}
+ </Dialog.Title>
+ <Dialog.Description>
+ {taskInstances.length > 0 && (
+ <>
+ {translate(
+
"dags:runAndTaskActions.confirmationDialog.description",
+ {
+ state: taskCurrentState,
+ time:
+ firstInstance?.start_date !== null &&
firstInstance?.start_date !== undefined
+ ? getRelativeTime(firstInstance.start_date)
+ : undefined,
+ user:
+ (firstInstance?.unixname?.trim().length ?? 0) > 0
+ ? firstInstance?.unixname
+ : "unknown user",
+ }
+ )}
+ </>
+ )}
+ </Dialog.Description>
+ </VStack>
+ </Dialog.Header>
+ <Dialog.Footer>
+ <Button colorPalette="blue" onClick={onClose}>
+ {translate("common:modal.confirm")}
+ </Button>
+ </Dialog.Footer>
+ </>
+ ) : null}
+ </Dialog.Content>
+ </Dialog.Root>
+ );
+};
+
+export default ClearTaskInstanceConfirmationDialog;
diff --git
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
index 31ac40278df..fa4e85a3bf0 100644
---
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
+++
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-import { Flex, Heading, VStack } from "@chakra-ui/react";
+import { Flex, Heading, VStack, useDisclosure } from "@chakra-ui/react";
import { useState } from "react";
import { useTranslation } from "react-i18next";
import { CgRedo } from "react-icons/cg";
@@ -30,6 +30,7 @@ import SegmentedControl from
"src/components/ui/SegmentedControl";
import { useClearTaskInstances } from "src/queries/useClearTaskInstances";
import { useClearTaskInstancesDryRun } from
"src/queries/useClearTaskInstancesDryRun";
import { usePatchTaskInstance } from "src/queries/usePatchTaskInstance";
+import ClearTaskInstanceConfirmationDialog from
"./ClearTaskInstanceConfirmationDialog";
type Props = {
readonly onClose: () => void;
@@ -37,10 +38,12 @@ type Props = {
readonly taskInstance: TaskInstanceResponse;
};
-const ClearTaskInstanceDialog = ({ onClose, open, taskInstance }: Props) => {
+const ClearTaskInstanceDialog = ({ onClose: onCloseDialog, open: openDialog,
taskInstance }: Props) => {
const taskId = taskInstance.task_id;
const mapIndex = taskInstance.map_index;
const { t: translate } = useTranslation();
+ const { onClose, onOpen, open } = useDisclosure();
+
const dagId = taskInstance.dag_id;
const dagRunId = taskInstance.dag_run_id;
@@ -48,7 +51,7 @@ const ClearTaskInstanceDialog = ({ onClose, open,
taskInstance }: Props) => {
const { isPending, mutate } = useClearTaskInstances({
dagId,
dagRunId,
- onSuccessConfirm: onClose,
+ onSuccessConfirm: onCloseDialog,
});
const [selectedOptions, setSelectedOptions] = useState<Array<string>>([]);
@@ -59,6 +62,7 @@ const ClearTaskInstanceDialog = ({ onClose, open,
taskInstance }: Props) => {
const upstream = selectedOptions.includes("upstream");
const downstream = selectedOptions.includes("downstream");
const [runOnLatestVersion, setRunOnLatestVersion] = useState(false);
+ const [preventRunningTask, setPreventRunningTask] = useState(true);
const [note, setNote] = useState<string | null>(taskInstance.note);
const { isPending: isPendingPatchDagRun, mutate: mutatePatchTaskInstance } =
usePatchTaskInstance({
@@ -76,7 +80,7 @@ const ClearTaskInstanceDialog = ({ onClose, open,
taskInstance }: Props) => {
const { data } = useClearTaskInstancesDryRun({
dagId,
options: {
- enabled: open,
+ enabled: openDialog,
refetchOnMount: "always",
},
requestBody: {
@@ -106,7 +110,8 @@ const ClearTaskInstanceDialog = ({ onClose, open,
taskInstance }: Props) => {
taskInstanceDagVersionBundleVersion !== "";
return (
- <Dialog.Root lazyMount onOpenChange={onClose} open={open} size="xl">
+ <>
+ <Dialog.Root lazyMount onOpenChange={onCloseDialog} open={openDialog ?
!open : false} size="xl">
<Dialog.Content backdrop>
<Dialog.Header>
<VStack align="start" gap={4}>
@@ -170,35 +175,18 @@ const ClearTaskInstanceDialog = ({ onClose, open,
taskInstance }: Props) => {
{translate("dags:runAndTaskActions.options.runOnLatestVersion")}
</Checkbox>
) : undefined}
+ <Checkbox
+ checked={preventRunningTask}
+ onCheckedChange={(event) =>
setPreventRunningTask(Boolean(event.checked))}
+ style={{ marginRight: "auto"}}
+ >
+ {translate("dags:runAndTaskActions.options.preventRunningTasks")}
+ </Checkbox>
<Button
colorPalette="brand"
disabled={affectedTasks.total_entries === 0}
loading={isPending || isPendingPatchDagRun}
- onClick={() => {
- mutate({
- dagId,
- requestBody: {
- dag_run_id: dagRunId,
- dry_run: false,
- include_downstream: downstream,
- include_future: future,
- include_past: past,
- include_upstream: upstream,
- only_failed: onlyFailed,
- run_on_latest_version: runOnLatestVersion,
- task_ids: [[taskId, mapIndex]],
- },
- });
- if (note !== taskInstance.note) {
- mutatePatchTaskInstance({
- dagId,
- dagRunId,
- mapIndex,
- requestBody: { note },
- taskId,
- });
- }
- }}
+ onClick={onOpen}
>
<CgRedo /> {translate("modal.confirm")}
</Button>
@@ -206,6 +194,50 @@ const ClearTaskInstanceDialog = ({ onClose, open,
taskInstance }: Props) => {
</Dialog.Body>
</Dialog.Content>
</Dialog.Root>
+ {open ? <ClearTaskInstanceConfirmationDialog
+ dagDetails={{
+ dagId,
+ dagRunId,
+ downstream,
+ future,
+ mapIndex,
+ onlyFailed,
+ past,
+ taskId,
+ upstream,
+ }}
+ onClose={onClose}
+ onConfirm={() => {
+ mutate({
+ dagId,
+ requestBody: {
+ dag_run_id: dagRunId,
+ dry_run: false,
+ include_downstream: downstream,
+ include_future: future,
+ include_past: past,
+ include_upstream: upstream,
+ only_failed: onlyFailed,
+ run_on_latest_version: runOnLatestVersion,
+ task_ids: [[taskId, mapIndex]],
+ ...(preventRunningTask ? { prevent_running_task: true } : {}),
+ },
+ });
+ if (note !== taskInstance.note) {
+ mutatePatchTaskInstance({
+ dagId,
+ dagRunId,
+ mapIndex,
+ requestBody: { note },
+ taskId,
+ });
+ }
+ onCloseDialog();
+ }}
+ open={open}
+ preventRunningTask={preventRunningTask}
+ /> : null}
+ </>
);
};
diff --git a/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
b/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
index b20dae1075f..a04db2fb31d 100644
--- a/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
@@ -19,6 +19,7 @@
import { useQueryClient } from "@tanstack/react-query";
import { useTranslation } from "react-i18next";
+
import {
UseDagRunServiceGetDagRunKeyFn,
useDagRunServiceGetDagRunsKey,
@@ -33,6 +34,7 @@ import { toaster } from "src/components/ui";
import { useClearTaskInstancesDryRunKey } from "./useClearTaskInstancesDryRun";
import { usePatchTaskInstanceDryRunKey } from "./usePatchTaskInstanceDryRun";
+import type { ApiError } from "openapi/requests";
export const useClearTaskInstances = ({
dagId,
@@ -46,14 +48,40 @@ export const useClearTaskInstances = ({
const queryClient = useQueryClient();
const { t: translate } = useTranslation("dags");
- const onError = (error: Error) => {
+ const onError = (error: unknown) => {
+ let detail: string;
+ let description: string;
+
+ // Narrow the type safely
+ if (typeof error === "object" && error !== null) {
+ const apiError = error as ApiError;
+
+ description = typeof apiError.message === "string" ? apiError.message :
"";
+ const apiErrorWithDetail = apiError as unknown as { detail?: unknown };
+ detail =
+ typeof apiErrorWithDetail.body.detail === "string"
+ ? apiErrorWithDetail.body.detail
+ : "";
+
+ if ( detail.includes("AirflowClearRunningTaskException") === true ) {
+ description = detail
+ }
+
+ } else {
+ // Fallback for completely unknown errors
+ description = translate("common:error.defaultMessage")
+ }
+
toaster.create({
- description: error.message,
- title: translate("dags:runAndTaskActions.clear.error", { type:
translate("taskInstance_one") }),
- type: "error",
- });
+ description: description,
+ title: translate("dags:runAndTaskActions.clear.error", {
+ type: translate("common:taskInstance_one"),
+ }),
+ type: "error",
+ });
};
+
const onSuccess = async (
_: TaskInstanceCollectionResponse,
variables: { dagId: string; requestBody: ClearTaskInstancesBody },
@@ -103,5 +131,8 @@ export const useClearTaskInstances = ({
return useTaskInstanceServicePostClearTaskInstances({
onError,
onSuccess,
+ // This function uses the mutation function of React
+ // For showing the error toast immediately, set retry to 0
+ retry: 0
});
};
diff --git a/airflow-core/src/airflow/ui/src/utils/datetimeUtils.test.ts
b/airflow-core/src/airflow/ui/src/utils/datetimeUtils.test.ts
index 2ec8c47fd09..c8faa00b0c9 100644
--- a/airflow-core/src/airflow/ui/src/utils/datetimeUtils.test.ts
+++ b/airflow-core/src/airflow/ui/src/utils/datetimeUtils.test.ts
@@ -16,9 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-import { describe, it, expect } from "vitest";
+import { describe, it, expect, vi, beforeAll, afterAll } from "vitest";
-import { getDuration, renderDuration } from "./datetimeUtils";
+import { getDuration, renderDuration, getRelativeTime } from "./datetimeUtils";
describe("getDuration", () => {
it("handles durations less than 60 seconds", () => {
@@ -56,3 +56,32 @@ describe("getDuration", () => {
expect(renderDuration(0.000_01)).toBe(undefined);
});
});
+
+describe("getRelativeTime", () => {
+ const fixedNow = new Date("2024-03-14T10:00:10.000Z");
+
+ beforeAll(() => {
+ vi.useFakeTimers();
+ vi.setSystemTime(fixedNow);
+ });
+
+ afterAll(() => {
+ vi.useRealTimers();
+ });
+
+ it("returns relative time for a valid date", () => {
+ const date = "2024-03-14T10:00:00.000Z";
+
+ expect(getRelativeTime(date)).toBe("a few seconds ago");
+ });
+
+ it("returns an empty string for undefined dates", () => {
+ expect(getRelativeTime(undefined)).toBe("");
+ });
+
+ it("handles future dates", () => {
+ const futureDate = "2024-03-14T10:00:20.000Z";
+
+ expect(getRelativeTime(futureDate)).toBe("in a few seconds");
+ });
+});
diff --git a/airflow-core/src/airflow/ui/src/utils/datetimeUtils.ts
b/airflow-core/src/airflow/ui/src/utils/datetimeUtils.ts
index abd598e3deb..dc4eea6731f 100644
--- a/airflow-core/src/airflow/ui/src/utils/datetimeUtils.ts
+++ b/airflow-core/src/airflow/ui/src/utils/datetimeUtils.ts
@@ -19,8 +19,10 @@
import dayjs from "dayjs";
import dayjsDuration from "dayjs/plugin/duration";
import tz from "dayjs/plugin/timezone";
+import relativeTime from "dayjs/plugin/relativeTime";
dayjs.extend(dayjsDuration);
+dayjs.extend(relativeTime);
dayjs.extend(tz);
export const DEFAULT_DATETIME_FORMAT = "YYYY-MM-DD HH:mm:ss";
@@ -59,3 +61,11 @@ export const formatDate = (
return dayjs(date).tz(timezone).format(format);
};
+
+export const getRelativeTime = (date: string | null | undefined): string => {
+ if (date === null || date === "" || date === undefined) {
+ return "";
+ }
+
+ return dayjs(date).fromNow();
+};
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index 36605a7f4ab..5c6d8db50a4 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -2903,7 +2903,9 @@ class
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
# dag (3rd argument) is a different session object. Manually asserting
that the dag_id
# is the same.
- mock_clearti.assert_called_once_with([], mock.ANY, DagRunState.QUEUED,
run_on_latest_version=False)
+ mock_clearti.assert_called_once_with(
+ [], mock.ANY, DagRunState.QUEUED, prevent_running_task=False,
run_on_latest_version=False
+ )
def test_clear_taskinstance_is_called_with_invalid_task_ids(self,
test_client, session):
"""Test that dagrun is running when invalid task_ids are passed to
clearTaskInstances API."""
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index fae4e48be63..5621a9950cf 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -172,6 +172,7 @@ class ClearTaskInstancesBody(BaseModel):
title="Run On Latest Version",
),
] = False
+ prevent_running_task: Annotated[bool | None, Field(title="Prevent Running
Task")] = False
class Value(RootModel[list]):