This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new de0e279817e Integrate API with Trigger Dag Run (#44850)
de0e279817e is described below
commit de0e279817e1bb60b96e070f02f1bd7d89adbf8f
Author: Shubham Raj <[email protected]>
AuthorDate: Tue Dec 17 02:01:54 2024 +0530
Integrate API with Trigger Dag Run (#44850)
* conf
* conf check
* basic trigger api
* retrieve params
* refactor the api
* fix
* fix details page
* refactor the logic
* refactor
* reviews
* error on form
* data date validation
* remove empty runid and note
---
airflow/ui/src/components/DataTable/DataTable.tsx | 3 +-
.../src/components/TriggerDag/TriggerDAGForm.tsx | 98 +++++++++++++-------
.../src/components/TriggerDag/TriggerDAGModal.tsx | 97 +++++++-------------
.../ui/src/components/TriggerDag/TriggerDag.tsx | 44 ---------
airflow/ui/src/queries/useDagParams.ts | 52 +++++++++++
airflow/ui/src/queries/useTrigger.ts | 102 +++++++++++++++++++++
6 files changed, 254 insertions(+), 142 deletions(-)
diff --git a/airflow/ui/src/components/DataTable/DataTable.tsx
b/airflow/ui/src/components/DataTable/DataTable.tsx
index d59d86bb594..4bcb7c7d066 100644
--- a/airflow/ui/src/components/DataTable/DataTable.tsx
+++ b/airflow/ui/src/components/DataTable/DataTable.tsx
@@ -30,7 +30,7 @@ import {
} from "@tanstack/react-table";
import React, { type ReactNode, useCallback, useRef } from "react";
-import { ProgressBar, Pagination } from "../ui";
+import { ProgressBar, Pagination, Toaster } from "../ui";
import { CardList } from "./CardList";
import { TableList } from "./TableList";
import { createSkeletonMock } from "./skeleton";
@@ -128,6 +128,7 @@ export const DataTable = <TData,>({
Boolean(isFetching) && !Boolean(isLoading) ? "visible" : "hidden"
}
/>
+ <Toaster />
{errorMessage}
{!Boolean(isLoading) && !rows.length && (
<Text pt={1}>{noRowsMessage ?? `No ${modelName}s found.`}</Text>
diff --git a/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx
b/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx
index 9df1f208ed8..a11cad61aef 100644
--- a/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx
+++ b/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx
@@ -20,28 +20,50 @@ import { Input, Button, Box, Text, Spacer, HStack } from
"@chakra-ui/react";
import { json } from "@codemirror/lang-json";
import { githubLight, githubDark } from "@uiw/codemirror-themes-all";
import CodeMirror from "@uiw/react-codemirror";
-import { useEffect, useState } from "react";
+import { useEffect, useMemo, useState } from "react";
import { useForm, Controller } from "react-hook-form";
import { FiPlay } from "react-icons/fi";
import { useColorMode } from "src/context/colorMode";
+import { useDagParams } from "src/queries/useDagParams";
+import { useTrigger } from "src/queries/useTrigger";
+import { ErrorAlert } from "../ErrorAlert";
import { Accordion } from "../ui";
-import type { DagParams } from "./TriggerDag";
type TriggerDAGFormProps = {
- dagParams: DagParams;
+ dagId: string;
onClose: () => void;
- onTrigger: (updatedDagParams: DagParams) => void;
- setDagParams: React.Dispatch<React.SetStateAction<DagParams>>;
+ open: boolean;
+};
+
+export type DagRunTriggerParams = {
+ conf: string;
+ dagRunId: string;
+ dataIntervalEnd: string;
+ dataIntervalStart: string;
+ note: string;
};
const TriggerDAGForm: React.FC<TriggerDAGFormProps> = ({
- dagParams,
- onTrigger,
- setDagParams,
+ dagId,
+ onClose,
+ open,
}) => {
- const [jsonError, setJsonError] = useState<string | undefined>();
+ const [errors, setErrors] = useState<{ conf?: string; date?: string }>({});
+ const conf = useDagParams(dagId, open);
+ const { error: errorTrigger, isPending, triggerDagRun } =
useTrigger(onClose);
+
+ const dagRunRequestBody: DagRunTriggerParams = useMemo(
+ () => ({
+ conf,
+ dagRunId: "",
+ dataIntervalEnd: "",
+ dataIntervalStart: "",
+ note: "",
+ }),
+ [conf],
+ );
const {
control,
@@ -50,40 +72,47 @@ const TriggerDAGForm: React.FC<TriggerDAGFormProps> = ({
reset,
setValue,
watch,
- } = useForm<DagParams>({
- defaultValues: dagParams,
- });
+ } = useForm<DagRunTriggerParams>({ defaultValues: dagRunRequestBody });
const dataIntervalStart = watch("dataIntervalStart");
const dataIntervalEnd = watch("dataIntervalEnd");
useEffect(() => {
- reset(dagParams);
- }, [dagParams, reset]);
-
- const onSubmit = (data: DagParams) => {
- onTrigger(data);
- setDagParams(data);
- setJsonError(undefined);
- };
+ reset(dagRunRequestBody);
+ }, [dagRunRequestBody, reset]);
const validateAndPrettifyJson = (value: string) => {
try {
const parsedJson = JSON.parse(value) as JSON;
- setJsonError(undefined);
+ setErrors((prev) => ({ ...prev, conf: undefined }));
return JSON.stringify(parsedJson, undefined, 2);
} catch (error) {
const errorMessage =
error instanceof Error ? error.message : "Unknown error occurred.";
- setJsonError(`Invalid JSON format: ${errorMessage}`);
+ setErrors((prev) => ({
+ ...prev,
+ conf: `Invalid JSON format: ${errorMessage}`,
+ }));
return value;
}
};
+ const onSubmit = (data: DagRunTriggerParams) => {
+ if (Boolean(data.dataIntervalStart) !== Boolean(data.dataIntervalEnd)) {
+ setErrors((prev) => ({
+ ...prev,
+ date: "Either both Data Interval Start and End must be provided, or
both must be empty.",
+ }));
+
+ return;
+ }
+ triggerDagRun(dagId, data);
+ };
+
const validateDates = (
fieldName: "dataIntervalEnd" | "dataIntervalStart",
) => {
@@ -92,6 +121,8 @@ const TriggerDAGForm: React.FC<TriggerDAGFormProps> = ({
: undefined;
const endDate = dataIntervalEnd ? new Date(dataIntervalEnd) : undefined;
+ setErrors((prev) => ({ ...prev, date: undefined }));
+
if (startDate && endDate) {
if (fieldName === "dataIntervalStart" && startDate > endDate) {
setValue("dataIntervalStart", dataIntervalEnd);
@@ -105,7 +136,8 @@ const TriggerDAGForm: React.FC<TriggerDAGFormProps> = ({
return (
<>
- <Accordion.Root collapsible size="lg" variant="enclosed">
+ <ErrorAlert error={errorTrigger} />
+ <Accordion.Root collapsible mt={4} size="lg" variant="enclosed">
<Accordion.Item key="advancedOptions" value="advancedOptions">
<Accordion.ItemTrigger cursor="button">
Advanced Options
@@ -153,7 +185,7 @@ const TriggerDAGForm: React.FC<TriggerDAGFormProps> = ({
</Text>
<Controller
control={control}
- name="runId"
+ name="dagRunId"
render={({ field }) => (
<Input
{...field}
@@ -168,7 +200,7 @@ const TriggerDAGForm: React.FC<TriggerDAGFormProps> = ({
</Text>
<Controller
control={control}
- name="configJson"
+ name="conf"
render={({ field }) => (
<Box mb={4}>
<CodeMirror
@@ -196,11 +228,11 @@ const TriggerDAGForm: React.FC<TriggerDAGFormProps> = ({
}}
theme={colorMode === "dark" ? githubDark : githubLight}
/>
- {Boolean(jsonError) ? (
+ {Boolean(errors.conf) && (
<Text color="red.500" fontSize="sm" mt={2}>
- {jsonError}
+ {errors.conf}
</Text>
- ) : undefined}
+ )}
</Box>
)}
/>
@@ -210,7 +242,7 @@ const TriggerDAGForm: React.FC<TriggerDAGFormProps> = ({
</Text>
<Controller
control={control}
- name="notes"
+ name="note"
render={({ field }) => (
<Input {...field} placeholder="Optional" size="sm" />
)}
@@ -219,7 +251,11 @@ const TriggerDAGForm: React.FC<TriggerDAGFormProps> = ({
</Accordion.ItemContent>
</Accordion.Item>
</Accordion.Root>
-
+ {Boolean(errors.date) && (
+ <Text color="red.500" fontSize="sm" mt={2}>
+ {errors.date}
+ </Text>
+ )}
<Box as="footer" display="flex" justifyContent="flex-end" mt={4}>
<HStack w="full">
{isDirty ? (
@@ -230,7 +266,7 @@ const TriggerDAGForm: React.FC<TriggerDAGFormProps> = ({
<Spacer />
<Button
colorPalette="blue"
- disabled={Boolean(jsonError)}
+ disabled={Boolean(errors.conf) || Boolean(errors.date) ||
isPending}
onClick={() => void handleSubmit(onSubmit)()}
>
<FiPlay /> Trigger
diff --git a/airflow/ui/src/components/TriggerDag/TriggerDAGModal.tsx
b/airflow/ui/src/components/TriggerDag/TriggerDAGModal.tsx
index e4b946fd41f..b72c7482af7 100644
--- a/airflow/ui/src/components/TriggerDag/TriggerDAGModal.tsx
+++ b/airflow/ui/src/components/TriggerDag/TriggerDAGModal.tsx
@@ -17,14 +17,12 @@
* under the License.
*/
import { Heading, VStack } from "@chakra-ui/react";
-import React, { useCallback, useEffect, useMemo, useState } from "react";
+import React from "react";
import { Alert, Dialog } from "src/components/ui";
import { TogglePause } from "../TogglePause";
import TriggerDAGForm from "./TriggerDAGForm";
-import type { DagParams } from "./TriggerDag";
-import { TriggerDag as triggerDag } from "./TriggerDag";
type TriggerDAGModalProps = {
dagDisplayName: string;
@@ -40,70 +38,37 @@ const TriggerDAGModal: React.FC<TriggerDAGModalProps> = ({
isPaused,
onClose,
open,
-}) => {
- const initialDagParams = useMemo(
- () => ({
- configJson: "{}",
- dagId,
- dataIntervalEnd: "",
- dataIntervalStart: "",
- notes: "",
- runId: "",
- }),
- [dagId],
- );
+}) => (
+ <Dialog.Root
+ lazyMount
+ onOpenChange={onClose}
+ open={open}
+ size="xl"
+ unmountOnExit
+ >
+ <Dialog.Content backdrop>
+ <Dialog.Header>
+ <VStack align="start" gap={4}>
+ <Heading size="xl">
+ Trigger DAG - {dagDisplayName}{" "}
+ <TogglePause dagId={dagId} isPaused={isPaused} skipConfirm />
+ </Heading>
+ {isPaused ? (
+ <Alert status="warning" title="Paused DAG">
+ Triggering will create a DAG run, but it will not start until the
+ DAG is unpaused.
+ </Alert>
+ ) : undefined}
+ </VStack>
+ </Dialog.Header>
- const [dagParams, setDagParams] = useState<DagParams>(initialDagParams);
+ <Dialog.CloseTrigger />
- const handleTrigger = useCallback(
- (updatedDagParams: DagParams) => {
- triggerDag(updatedDagParams);
- onClose();
- },
- [onClose],
- );
-
- useEffect(() => {
- if (!open) {
- setDagParams(initialDagParams);
- }
- }, [open, initialDagParams]);
-
- return (
- <Dialog.Root onOpenChange={onClose} open={open} size="xl">
- <Dialog.Content backdrop>
- <Dialog.Header>
- <VStack align="start" gap={4}>
- <Heading size="xl">
- Trigger DAG - {dagDisplayName}{" "}
- <TogglePause
- dagId={dagParams.dagId}
- isPaused={isPaused}
- skipConfirm
- />
- </Heading>
- {isPaused ? (
- <Alert status="warning" title="Paused DAG">
- Triggering will create a DAG run, but it will not start until
- the DAG is unpaused.
- </Alert>
- ) : undefined}
- </VStack>
- </Dialog.Header>
-
- <Dialog.CloseTrigger />
-
- <Dialog.Body>
- <TriggerDAGForm
- dagParams={dagParams}
- onClose={onClose}
- onTrigger={handleTrigger}
- setDagParams={setDagParams}
- />
- </Dialog.Body>
- </Dialog.Content>
- </Dialog.Root>
- );
-};
+ <Dialog.Body>
+ <TriggerDAGForm dagId={dagId} onClose={onClose} open={open} />
+ </Dialog.Body>
+ </Dialog.Content>
+ </Dialog.Root>
+);
export default TriggerDAGModal;
diff --git a/airflow/ui/src/components/TriggerDag/TriggerDag.tsx
b/airflow/ui/src/components/TriggerDag/TriggerDag.tsx
deleted file mode 100644
index 48f5755ba0d..00000000000
--- a/airflow/ui/src/components/TriggerDag/TriggerDag.tsx
+++ /dev/null
@@ -1,44 +0,0 @@
-/*!
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-export type DagParams = {
- configJson: string;
- dagId: string;
- dataIntervalEnd: string;
- dataIntervalStart: string;
- notes: string;
- runId: string;
-};
-
-export const TriggerDag = (dagParams: DagParams) => {
- // eslint-disable-next-line no-alert
- alert(`
- Triggering DAG with the following parameters:
-
- Config JSON: ${JSON.stringify(dagParams.configJson)}
- Data Interval Start Date: ${dagParams.dataIntervalStart}
- Data Interval End Date: ${dagParams.dataIntervalEnd}
- Run ID: ${dagParams.runId}
-
- TODO: This trigger button is under progress.
- The values you have entered are shown above.
- `);
-
- // TODO triggering logic (would be placed here once the FAST API is
available)
-};
diff --git a/airflow/ui/src/queries/useDagParams.ts
b/airflow/ui/src/queries/useDagParams.ts
new file mode 100644
index 00000000000..bbb1cc282b8
--- /dev/null
+++ b/airflow/ui/src/queries/useDagParams.ts
@@ -0,0 +1,52 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useDagServiceGetDagDetails } from "openapi/queries";
+import { toaster } from "src/components/ui";
+
+export const useDagParams = (dagId: string, open: boolean) => {
+ const { data, error } = useDagServiceGetDagDetails({ dagId }, undefined, {
+ enabled: open,
+ });
+
+ if (Boolean(error)) {
+ const errorDescription =
+ typeof error === "object" && error !== null
+ ? JSON.stringify(error, undefined, 2) // Safely stringify the object
with pretty-printing
+ : String(error ?? ""); // Convert other types (e.g., numbers, strings)
to string
+
+ toaster.create({
+ description: `Dag params request failed. Error: ${errorDescription}`,
+ title: "Getting Dag Params Failed",
+ type: "error",
+ });
+ }
+
+ const transformedParams = data?.params
+ ? Object.fromEntries(
+ Object.entries(data.params).map(([key, param]) => [
+ key,
+ (param as { value: unknown }).value,
+ ]),
+ )
+ : {};
+
+ const initialConf = JSON.stringify(transformedParams, undefined, 2);
+
+ return initialConf;
+};
diff --git a/airflow/ui/src/queries/useTrigger.ts
b/airflow/ui/src/queries/useTrigger.ts
new file mode 100644
index 00000000000..8ee004e7d3f
--- /dev/null
+++ b/airflow/ui/src/queries/useTrigger.ts
@@ -0,0 +1,102 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useQueryClient } from "@tanstack/react-query";
+import { useState } from "react";
+
+import {
+ useDagRunServiceGetDagRunsKey,
+ useDagRunServiceTriggerDagRun,
+ useDagServiceGetDagsKey,
+ useDagsServiceRecentDagRunsKey,
+} from "openapi/queries";
+import type { DagRunTriggerParams } from
"src/components/TriggerDag/TriggerDAGForm";
+import { toaster } from "src/components/ui";
+
+export const useTrigger = (onClose: () => void) => {
+ const queryClient = useQueryClient();
+ const [error, setError] = useState<unknown>(undefined);
+
+ const onSuccess = async () => {
+ const queryKeys = [
+ useDagServiceGetDagsKey,
+ useDagsServiceRecentDagRunsKey,
+ useDagRunServiceGetDagRunsKey,
+ ];
+
+ await Promise.all(
+ queryKeys.map((key) =>
+ queryClient.invalidateQueries({ queryKey: [key] }),
+ ),
+ );
+
+ toaster.create({
+ description: "DAG run has been successfully triggered.",
+ title: "DAG Run Request Submitted",
+ type: "success",
+ });
+
+ onClose();
+ };
+
+ const onError = (_error: unknown) => {
+ setError(_error);
+ };
+
+ const { isPending, mutate } = useDagRunServiceTriggerDagRun({
+ onError,
+ onSuccess,
+ });
+
+ const triggerDagRun = (
+ dagId: string,
+ dagRunRequestBody: DagRunTriggerParams,
+ ) => {
+ const parsedConfig = JSON.parse(dagRunRequestBody.conf) as Record<
+ string,
+ unknown
+ >;
+
+ const formattedDataIntervalStart = dagRunRequestBody.dataIntervalStart
+ ? new Date(dagRunRequestBody.dataIntervalStart).toISOString()
+ : undefined;
+ const formattedDataIntervalEnd = dagRunRequestBody.dataIntervalEnd
+ ? new Date(dagRunRequestBody.dataIntervalEnd).toISOString()
+ : undefined;
+
+ const checkDagRunId =
+ dagRunRequestBody.dagRunId === ""
+ ? undefined
+ : dagRunRequestBody.dagRunId;
+ const checkNote =
+ dagRunRequestBody.note === "" ? undefined : dagRunRequestBody.note;
+
+ mutate({
+ dagId,
+ requestBody: {
+ conf: parsedConfig,
+ dag_run_id: checkDagRunId,
+ data_interval_end: formattedDataIntervalEnd,
+ data_interval_start: formattedDataIntervalStart,
+ note: checkNote,
+ },
+ });
+ };
+
+ return { error, isPending, triggerDagRun };
+};