This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b31231445c4 fix: use Dag form when materializing asset (#64211)
b31231445c4 is described below
commit b31231445c479ba93fb451bbdd85fbc90a1795d4
Author: Wei Lee <[email protected]>
AuthorDate: Mon Mar 30 22:17:03 2026 +0800
fix: use Dag form when materializing asset (#64211)
* fix: use Dag form when materializing asset
* fixup! fix: use Dag form when materializing asset
* fixup! fixup! fix: use Dag form when materializing asset
* fixup! fixup! fixup! fix: use Dag form when materializing asset
* fixup! fixup! fixup! fixup! fix: use Dag form when materializing asset
* test: add test case
test_should_respond_200_with_trigger_fields_without_dag_run_id
* fix: restore useTrigger in TriggerDAGModal after TriggerDAGForm refactor
TriggerDAGForm no longer owns the trigger mutation after the refactor
(onSubmitTrigger was made a prop). TriggerDAGModal must now call
useTrigger and pass error/isPending/onSubmitTrigger down to the form;
otherwise, clicking Trigger does nothing and dagRunId stays null.
---
.../api_fastapi/core_api/datamodels/assets.py | 27 ++++-
.../core_api/openapi/v2-rest-api-generated.yaml | 59 ++++++++++
.../api_fastapi/core_api/routes/public/assets.py | 16 +--
.../src/airflow/ui/openapi-gen/queries/queries.ts | 7 +-
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 102 ++++++++++++++++++
.../ui/openapi-gen/requests/services.gen.ts | 3 +
.../airflow/ui/openapi-gen/requests/types.gen.ts | 17 +++
.../src/components/TriggerDag/TriggerDAGForm.tsx | 37 +++----
.../src/components/TriggerDag/TriggerDAGModal.tsx | 6 +-
.../ui/src/pages/Asset/CreateAssetEventModal.tsx | 119 ++++++++++++---------
.../core_api/routes/public/test_assets.py | 53 +++++++++
.../src/airflowctl/api/datamodels/generated.py | 18 ++++
12 files changed, 382 insertions(+), 82 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
index c6cb2fa2827..041ec12c1f2 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
@@ -19,11 +19,18 @@ from __future__ import annotations
from collections.abc import Iterable
from datetime import datetime
+from typing import TYPE_CHECKING
-from pydantic import AliasPath, ConfigDict, Field, JsonValue, NonNegativeInt,
field_validator
+from pydantic import AliasPath, AwareDatetime, ConfigDict, Field, JsonValue,
NonNegativeInt, field_validator
from airflow._shared.secrets_masker import redact
+from airflow._shared.timezones import timezone
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
+from airflow.api_fastapi.core_api.datamodels.dag_run import
TriggerDAGRunPostBody
+from airflow.utils.types import DagRunType
+
+if TYPE_CHECKING:
+ from airflow.serialization.definitions.dag import SerializedDAG
class DagScheduleAssetReference(StrictBaseModel):
@@ -185,3 +192,21 @@ class CreateAssetEventsBody(StrictBaseModel):
return v
model_config = ConfigDict(extra="forbid")
+
+
+class MaterializeAssetBody(TriggerDAGRunPostBody):
+ """Materialize asset request."""
+
+ logical_date: AwareDatetime | None = None
+
+ def validate_context(self, dag: SerializedDAG) -> dict:
+ params = super().validate_context(dag)
+ if self.dag_run_id is None:
+ params["run_id"] = dag.timetable.generate_run_id(
+ run_type=DagRunType.ASSET_MATERIALIZATION,
+ run_after=timezone.coerce_datetime(params["run_after"]),
+ data_interval=params["data_interval"],
+ )
+ return params
+
+ model_config = ConfigDict(extra="forbid")
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 128d0f2bd05..b167e463a35 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -484,6 +484,14 @@ paths:
schema:
type: integer
title: Asset Id
+ requestBody:
+ content:
+ application/json:
+ schema:
+ anyOf:
+ - $ref: '#/components/schemas/MaterializeAssetBody'
+ - type: 'null'
+ title: Body
responses:
'200':
description: Successful Response
@@ -11949,6 +11957,57 @@ components:
type: object
title: LastAssetEventResponse
description: Last asset event response serializer.
+ MaterializeAssetBody:
+ properties:
+ dag_run_id:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Dag Run Id
+ data_interval_start:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Data Interval Start
+ data_interval_end:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Data Interval End
+ logical_date:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Logical Date
+ run_after:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Run After
+ conf:
+ anyOf:
+ - additionalProperties: true
+ type: object
+ - type: 'null'
+ title: Conf
+ note:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Note
+ partition_key:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Partition Key
+ additionalProperties: false
+ type: object
+ title: MaterializeAssetBody
+ description: Materialize asset request.
PatchTaskInstanceBody:
properties:
new_state:
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
index 68220c20e8f..a911984dc65 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -54,6 +54,7 @@ from airflow.api_fastapi.core_api.datamodels.assets import (
AssetEventResponse,
AssetResponse,
CreateAssetEventsBody,
+ MaterializeAssetBody,
QueuedEventCollectionResponse,
QueuedEventResponse,
)
@@ -387,6 +388,7 @@ def materialize_asset(
dag_bag: DagBagDep,
user: GetUserDep,
session: SessionDep,
+ body: MaterializeAssetBody | None = None,
) -> DAGRunResponse:
"""Materialize an asset by triggering a DAG run that produces it."""
dag_id_it = iter(
@@ -425,17 +427,19 @@ def materialize_asset(
f"Dag with dag_id: '{dag_id}' does not allow asset materialization
runs",
)
+ params = (body or MaterializeAssetBody()).validate_context(dag)
return dag.create_dagrun(
- run_id=dag.timetable.generate_run_id(
- run_type=DagRunType.ASSET_MATERIALIZATION,
- run_after=(run_after :=
timezone.coerce_datetime(timezone.utcnow())),
- data_interval=None,
- ),
- run_after=run_after,
+ run_id=params["run_id"],
+ logical_date=params["logical_date"],
+ data_interval=params["data_interval"],
+ run_after=params["run_after"],
+ conf=params["conf"],
run_type=DagRunType.ASSET_MATERIALIZATION,
triggered_by=DagRunTriggeredByType.REST_API,
triggering_user_name=user.get_name(),
state=DagRunState.QUEUED,
+ partition_key=params["partition_key"],
+ note=params["note"],
session=session,
)
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index 8e9ef5aa29d..dac7a198e59 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -2,7 +2,7 @@
import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from
"@tanstack/react-query";
import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagParsingService, DagRunService, DagService,
DagSourceService, DagStatsService, DagVersionService, DagWarningService,
DashboardService, DeadlinesService, DependenciesService, EventLogService,
ExperimentalService, ExtraLinksService, GanttService, GridService,
ImportErrorService, JobService, LoginService, MonitorService,
PartitionedDagRunService, PluginService, PoolService, Provide [...]
-import { BackfillPostBody, BulkBody_BulkTaskInstanceBody_,
BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_,
ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody,
DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState,
DagWarningType, GenerateTokenBody, PatchTaskInstanceBody, PoolBody,
PoolPatchBody, TaskInstancesBatchBody, TriggerDAGRunPostBody,
UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody } from
"../requests/t [...]
+import { BackfillPostBody, BulkBody_BulkTaskInstanceBody_,
BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_,
ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody,
DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState,
DagWarningType, GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody,
PoolBody, PoolPatchBody, TaskInstancesBatchBody, TriggerDAGRunPostBody,
UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody [...]
import * as Common from "./common";
/**
* Get Assets
@@ -1765,14 +1765,17 @@ export const useAssetServiceCreateAssetEvent = <TData =
Common.AssetServiceCreat
* Materialize an asset by triggering a DAG run that produces it.
* @param data The data for the request.
* @param data.assetId
+* @param data.requestBody
* @returns DAGRunResponse Successful Response
* @throws ApiError
*/
export const useAssetServiceMaterializeAsset = <TData =
Common.AssetServiceMaterializeAssetMutationResult, TError = unknown, TContext =
unknown>(options?: Omit<UseMutationOptions<TData, TError, {
assetId: number;
+ requestBody?: MaterializeAssetBody;
}, TContext>, "mutationFn">) => useMutation<TData, TError, {
assetId: number;
-}, TContext>({ mutationFn: ({ assetId }) => AssetService.materializeAsset({
assetId }) as unknown as Promise<TData>, ...options });
+ requestBody?: MaterializeAssetBody;
+}, TContext>({ mutationFn: ({ assetId, requestBody }) =>
AssetService.materializeAsset({ assetId, requestBody }) as unknown as
Promise<TData>, ...options });
/**
* Create Backfill
* @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index e17c16e70e4..c5077028b44 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -4473,6 +4473,108 @@ export const $LastAssetEventResponse = {
description: 'Last asset event response serializer.'
} as const;
+export const $MaterializeAssetBody = {
+ properties: {
+ dag_run_id: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Dag Run Id'
+ },
+ data_interval_start: {
+ anyOf: [
+ {
+ type: 'string',
+ format: 'date-time'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Data Interval Start'
+ },
+ data_interval_end: {
+ anyOf: [
+ {
+ type: 'string',
+ format: 'date-time'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Data Interval End'
+ },
+ logical_date: {
+ anyOf: [
+ {
+ type: 'string',
+ format: 'date-time'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Logical Date'
+ },
+ run_after: {
+ anyOf: [
+ {
+ type: 'string',
+ format: 'date-time'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Run After'
+ },
+ conf: {
+ anyOf: [
+ {
+ additionalProperties: true,
+ type: 'object'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Conf'
+ },
+ note: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Note'
+ },
+ partition_key: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Partition Key'
+ }
+ },
+ additionalProperties: false,
+ type: 'object',
+ title: 'MaterializeAssetBody',
+ description: 'Materialize asset request.'
+} as const;
+
export const $PatchTaskInstanceBody = {
properties: {
new_state: {
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index 6e701bf68dc..6f0c2af82fb 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -172,6 +172,7 @@ export class AssetService {
* Materialize an asset by triggering a DAG run that produces it.
* @param data The data for the request.
* @param data.assetId
+ * @param data.requestBody
* @returns DAGRunResponse Successful Response
* @throws ApiError
*/
@@ -182,6 +183,8 @@ export class AssetService {
path: {
asset_id: data.assetId
},
+ body: data.requestBody,
+ mediaType: 'application/json',
errors: {
400: 'Bad Request',
401: 'Unauthorized',
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 1bf8cd21fb1..521643cf7f0 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1134,6 +1134,22 @@ export type LastAssetEventResponse = {
timestamp?: string | null;
};
+/**
+ * Materialize asset request.
+ */
+export type MaterializeAssetBody = {
+ dag_run_id?: string | null;
+ data_interval_start?: string | null;
+ data_interval_end?: string | null;
+ logical_date?: string | null;
+ run_after?: string | null;
+ conf?: {
+ [key: string]: unknown;
+} | null;
+ note?: string | null;
+ partition_key?: string | null;
+};
+
/**
* Request body for Clear Task Instances endpoint.
*/
@@ -2305,6 +2321,7 @@ export type CreateAssetEventResponse = AssetEventResponse;
export type MaterializeAssetData = {
assetId: number;
+ requestBody?: MaterializeAssetBody | null;
};
export type MaterializeAssetResponse = DAGRunResponse;
diff --git
a/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx
b/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx
index d94991378e7..91b2cd1daa0 100644
--- a/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx
+++ b/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx
@@ -26,7 +26,6 @@ import { FiPlay } from "react-icons/fi";
import { useDagParams } from "src/queries/useDagParams";
import { useParamStore } from "src/queries/useParamStore";
import { useTogglePause } from "src/queries/useTogglePause";
-import { useTrigger } from "src/queries/useTrigger";
import { DEFAULT_DATETIME_FORMAT } from "src/utils/datetimeUtils";
import ConfigForm from "../ConfigForm";
@@ -35,16 +34,17 @@ import { ErrorAlert, type ExpandedApiError } from
"../ErrorAlert";
import { Checkbox } from "../ui/Checkbox";
import { RadioCardItem, RadioCardRoot } from "../ui/RadioCard";
import TriggerDAGAdvancedOptions from "./TriggerDAGAdvancedOptions";
-import type { DagRunTriggerParams } from "./types";
-import { dataIntervalModeOptions } from "./types";
+import { dataIntervalModeOptions, type DagRunTriggerParams } from "./types";
type TriggerDAGFormProps = {
readonly dagDisplayName: string;
readonly dagId: string;
+ readonly error?: unknown;
readonly hasSchedule: boolean;
readonly isPartitioned: boolean;
readonly isPaused: boolean;
- readonly onClose: () => void;
+ readonly isPending?: boolean;
+ readonly onSubmitTrigger?: (params: DagRunTriggerParams) => void;
readonly open: boolean;
readonly prefillConfig?:
| {
@@ -58,10 +58,12 @@ type TriggerDAGFormProps = {
const TriggerDAGForm = ({
dagDisplayName,
dagId,
+ error,
hasSchedule,
isPartitioned,
isPaused,
- onClose,
+ isPending = false,
+ onSubmitTrigger,
open,
prefillConfig,
}: TriggerDAGFormProps) => {
@@ -69,10 +71,8 @@ const TriggerDAGForm = ({
const [errors, setErrors] = useState<{ conf?: string; date?: unknown }>({});
const [formError, setFormError] = useState(false);
const initialParamsDict = useDagParams(dagId, open);
- const { error: errorTrigger, isPending, triggerDagRun } = useTrigger({
dagId, onSuccessConfirm: onClose });
const { conf, initialParamDict, setConf, setInitialParamDict } =
useParamStore();
const [unpause, setUnpause] = useState(true);
-
const { mutate: togglePause } = useTogglePause({ dagId });
const { control, handleSubmit, reset, watch } =
useForm<DagRunTriggerParams>({
@@ -106,7 +106,6 @@ const TriggerDAGForm = ({
note: "",
partitionKey: undefined,
});
-
// Also update the param store to keep it in sync.
// Wait until we have the initial params so section ordering stays
consistent.
if (confString && Object.keys(initialParamsDict.paramsDict).length > 0) {
@@ -130,16 +129,11 @@ const TriggerDAGForm = ({
// Automatically reset form when conf is fetched (only if no prefillConfig)
useEffect(() => {
if (conf && !prefillConfig && open) {
- reset((prevValues) => ({
- ...prevValues,
- conf,
- }));
+ reset((prevValues) => ({ ...prevValues, conf }));
}
}, [conf, prefillConfig, open, reset]);
- const resetDateError = () => {
- setErrors((prev) => ({ ...prev, date: undefined }));
- };
+ const resetDateError = () => setErrors((prev) => ({ ...prev, date: undefined
}));
const dataIntervalMode = watch("dataIntervalMode");
const dataIntervalStart = watch("dataIntervalStart");
@@ -150,19 +144,14 @@ const TriggerDAGForm = ({
(noDataInterval ||
dayjs(dataIntervalStart).isAfter(dayjs(dataIntervalEnd)));
const onSubmit = (data: DagRunTriggerParams) => {
if (unpause && isPaused) {
- togglePause({
- dagId,
- requestBody: {
- is_paused: false,
- },
- });
+ togglePause({ dagId, requestBody: { is_paused: false } });
}
- triggerDagRun(data);
+ onSubmitTrigger?.(data);
};
return (
<>
- <ErrorAlert error={errors.date ?? errorTrigger} />
+ <ErrorAlert error={errors.date ?? error} />
<VStack alignItems="stretch" gap={2} pt={4}>
{isPartitioned ? undefined : (
<>
@@ -272,7 +261,7 @@ const TriggerDAGForm = ({
formError ||
isPending ||
dataIntervalInvalid ||
- (Boolean(errorTrigger) && (errorTrigger as
ExpandedApiError).status === 403)
+ (Boolean(error) && (error as ExpandedApiError).status === 403)
}
onClick={() => void handleSubmit(onSubmit)()}
>
diff --git
a/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGModal.tsx
b/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGModal.tsx
index 840d15626f9..07080ac0d33 100644
--- a/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGModal.tsx
+++ b/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGModal.tsx
@@ -23,6 +23,7 @@ import { useTranslation } from "react-i18next";
import { useDagServiceGetDag } from "openapi/queries";
import { Dialog, Tooltip } from "src/components/ui";
import { RadioCardItem, RadioCardRoot } from "src/components/ui/RadioCard";
+import { useTrigger } from "src/queries/useTrigger";
import RunBackfillForm from "../DagActions/RunBackfillForm";
import TriggerDAGForm from "./TriggerDAGForm";
@@ -73,6 +74,7 @@ const TriggerDAGModal: React.FC<TriggerDAGModalProps> = ({
const hasSchedule = dag?.timetable_summary !== null;
const isPartitioned = dag ? dag.timetable_partitioned : false;
+ const { error, isPending, triggerDagRun } = useTrigger({ dagId,
onSuccessConfirm: onClose });
const maxDisplayLength = 59; // hard-coded length to prevent dag name
overflowing the modal
const nameOverflowing = dagDisplayName.length > maxDisplayLength;
@@ -134,10 +136,12 @@ const TriggerDAGModal: React.FC<TriggerDAGModalProps> = ({
<TriggerDAGForm
dagDisplayName={dagDisplayName}
dagId={dagId}
+ error={error}
hasSchedule={hasSchedule}
isPartitioned={isPartitioned}
isPaused={isPaused}
- onClose={onClose}
+ isPending={isPending}
+ onSubmitTrigger={triggerDagRun}
open={open}
prefillConfig={prefillConfig}
/>
diff --git
a/airflow-core/src/airflow/ui/src/pages/Asset/CreateAssetEventModal.tsx
b/airflow-core/src/airflow/ui/src/pages/Asset/CreateAssetEventModal.tsx
index 472981325bf..6b9d00afad1 100644
--- a/airflow-core/src/airflow/ui/src/pages/Asset/CreateAssetEventModal.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/Asset/CreateAssetEventModal.tsx
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-import { Button, Field, Heading, HStack, VStack, Text } from
"@chakra-ui/react";
+import { Button, Field, Heading, HStack, Text, VStack } from
"@chakra-ui/react";
import { useQueryClient } from "@tanstack/react-query";
import { useState } from "react";
import { useTranslation } from "react-i18next";
@@ -36,14 +36,15 @@ import type {
AssetEventResponse,
AssetResponse,
DAGRunResponse,
+ MaterializeAssetBody,
EdgeResponse,
} from "openapi/requests/types.gen";
import { ErrorAlert } from "src/components/ErrorAlert";
import { JsonEditor } from "src/components/JsonEditor";
+import TriggerDAGForm from "src/components/TriggerDag/TriggerDAGForm";
+import type { DagRunTriggerParams } from "src/components/TriggerDag/types";
import { Dialog, toaster } from "src/components/ui";
-import { Checkbox } from "src/components/ui/Checkbox";
import { RadioCardItem, RadioCardRoot } from "src/components/ui/RadioCard";
-import { useTogglePause } from "src/queries/useTogglePause";
type Props = {
readonly asset: AssetResponse;
@@ -55,7 +56,6 @@ export const CreateAssetEventModal = ({ asset, onClose, open
}: Props) => {
const { t: translate } = useTranslation(["assets", "components"]);
const [eventType, setEventType] = useState("manual");
const [extraError, setExtraError] = useState<string | undefined>();
- const [unpause, setUnpause] = useState(true);
const [extra, setExtra] = useState("{}");
const [partitionKey, setPartitionKey] = useState<string |
undefined>(undefined);
const queryClient = useQueryClient();
@@ -93,6 +93,7 @@ export const CreateAssetEventModal = ({ asset, onClose, open
}: Props) => {
const onSuccess = async (response: AssetEventResponse | DAGRunResponse) => {
setExtra("{}");
setExtraError(undefined);
+ setPartitionKey(undefined);
onClose();
let queryKeys = [UseAssetServiceGetAssetEventsKeyFn({ assetId: asset.id },
[{ assetId: asset.id }])];
@@ -127,11 +128,9 @@ export const CreateAssetEventModal = ({ asset, onClose,
open }: Props) => {
enabled: Boolean(upstreamDagId),
});
- const { mutate: togglePause } = useTogglePause({ dagId: dag?.dag_id ??
upstreamDagId ?? "" });
-
const {
error: manualError,
- isPending,
+ isPending: isManualPending,
mutate: createAssetEvent,
} = useAssetServiceCreateAssetEvent({ onSuccess });
const {
@@ -142,28 +141,41 @@ export const CreateAssetEventModal = ({ asset, onClose,
open }: Props) => {
onSuccess,
});
- const handleSubmit = () => {
- if (eventType === "materialize") {
- if (unpause && dag?.is_paused) {
- togglePause({
- dagId: dag.dag_id,
- requestBody: {
- is_paused: false,
- },
- });
- }
- materializeAsset({ assetId: asset.id });
- } else {
- createAssetEvent({
- requestBody: {
- asset_id: asset.id,
- extra: JSON.parse(extra) as Record<string, unknown>,
- partition_key: partitionKey ?? null,
- },
- });
- }
+ const handleMaterializeSubmit = (dagRunRequestBody: DagRunTriggerParams) => {
+ const parsedConfig = JSON.parse(dagRunRequestBody.conf) as Record<string,
unknown>;
+ const logicalDate = dagRunRequestBody.logicalDate ? new
Date(dagRunRequestBody.logicalDate) : undefined;
+ const dataIntervalStart = dagRunRequestBody.dataIntervalStart
+ ? new Date(dagRunRequestBody.dataIntervalStart)
+ : undefined;
+ const dataIntervalEnd = dagRunRequestBody.dataIntervalEnd
+ ? new Date(dagRunRequestBody.dataIntervalEnd)
+ : undefined;
+
+ const requestBody: MaterializeAssetBody = {
+ conf: parsedConfig,
+ dag_run_id: dagRunRequestBody.dagRunId === "" ? undefined :
dagRunRequestBody.dagRunId,
+ data_interval_end: dataIntervalEnd?.toISOString() ?? null,
+ data_interval_start: dataIntervalStart?.toISOString() ?? null,
+ logical_date: logicalDate?.toISOString() ?? null,
+ note: dagRunRequestBody.note === "" ? undefined : dagRunRequestBody.note,
+ partition_key: dagRunRequestBody.partitionKey ?? null,
+ };
+
+ materializeAsset({
+ assetId: asset.id,
+ requestBody,
+ });
};
+ const handleManualSubmit = () =>
+ createAssetEvent({
+ requestBody: {
+ asset_id: asset.id,
+ extra: JSON.parse(extra) as Record<string, unknown>,
+ partition_key: partitionKey ?? null,
+ },
+ });
+
return (
<Dialog.Root lazyMount onOpenChange={onClose} open={open} size="xl"
unmountOnExit>
<Dialog.Content backdrop>
@@ -203,37 +215,48 @@ export const CreateAssetEventModal = ({ asset, onClose,
open }: Props) => {
/>
</HStack>
</RadioCardRoot>
+ {eventType === "manual" ? (
+ <Field.Root mt={6}>
+ <Field.Label
fontSize="md">{translate("createEvent.manual.extra")}</Field.Label>
+ <JsonEditor onChange={validateAndPrettifyJson} value={extra} />
+ <Text color="fg.error">{extraError}</Text>
+ </Field.Root>
+ ) : undefined}
{eventType === "manual" ? (
<>
- <Field.Root mt={6}>
- <Field.Label
fontSize="md">{translate("createEvent.manual.extra")}</Field.Label>
- <JsonEditor onChange={validateAndPrettifyJson} value={extra} />
- <Text color="fg.error">{extraError}</Text>
- </Field.Root>
<Field.Root mt={6}>
<Field.Label
fontSize="md">{translate("common:dagRun.partitionKey")}</Field.Label>
<JsonEditor onChange={setPartitionKey} value={partitionKey} />
- <Text color="fg.error">{extraError}</Text>
</Field.Root>
+ <ErrorAlert error={manualError} />
</>
) : undefined}
- {eventType === "materialize" && dag?.is_paused ? (
- <Checkbox checked={unpause} colorPalette="brand" onChange={() =>
setUnpause(!unpause)}>
- {translate("createEvent.materialize.unpauseDag", { dagName:
dag.dag_display_name })}
- </Checkbox>
+ {eventType === "materialize" && dag !== undefined && upstreamDagId
!== undefined ? (
+ <TriggerDAGForm
+ dagDisplayName={dag.dag_display_name}
+ dagId={upstreamDagId}
+ error={materializeError}
+ hasSchedule={dag.timetable_summary !== null}
+ isPartitioned={dag.timetable_partitioned}
+ isPaused={dag.is_paused}
+ isPending={isMaterializePending}
+ onSubmitTrigger={handleMaterializeSubmit}
+ open={open}
+ />
) : undefined}
- <ErrorAlert error={eventType === "manual" ? manualError :
materializeError} />
</Dialog.Body>
- <Dialog.Footer>
- <Button
- colorPalette="brand"
- disabled={Boolean(extraError)}
- loading={isPending || isMaterializePending}
- onClick={handleSubmit}
- >
- <FiPlay /> {translate("createEvent.button")}
- </Button>
- </Dialog.Footer>
+ {eventType === "manual" ? (
+ <Dialog.Footer>
+ <Button
+ colorPalette="brand"
+ disabled={Boolean(extraError)}
+ loading={isManualPending}
+ onClick={handleManualSubmit}
+ >
+ <FiPlay /> {translate("createEvent.button")}
+ </Button>
+ </Dialog.Footer>
+ ) : undefined}
</Dialog.Content>
</Dialog.Root>
);
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
index 7912e995d8e..749d94ac877 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
@@ -1408,6 +1408,59 @@ class TestPostAssetMaterialize(TestAssets):
"note": None,
}
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_should_respond_200_with_partition_key(self, test_client):
+ partition_key = "2026-03-23"
+ response = test_client.post("/assets/1/materialize",
json={"partition_key": partition_key})
+ assert response.status_code == 200
+ assert response.json()["partition_key"] == partition_key
+
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_should_respond_200_with_trigger_fields(self, test_client):
+ payload = {
+ "conf": {"foo": "bar"},
+ "dag_run_id": "asset_materialization_run_1",
+ "data_interval_end": "2026-03-24T00:00:00Z",
+ "data_interval_start": "2026-03-23T00:00:00Z",
+ "logical_date": "2026-03-23T00:00:00Z",
+ "note": "created from asset page",
+ "partition_key": "2026-03-23",
+ }
+ response = test_client.post("/assets/1/materialize", json=payload)
+
+ assert response.status_code == 200
+ assert response.json()["conf"] == {"foo": "bar"}
+ assert response.json()["dag_run_id"] == "asset_materialization_run_1"
+ assert response.json()["data_interval_start"] == "2026-03-23T00:00:00Z"
+ assert response.json()["data_interval_end"] == "2026-03-24T00:00:00Z"
+ assert response.json()["logical_date"] == "2026-03-23T00:00:00Z"
+ assert response.json()["note"] == "created from asset page"
+ assert response.json()["partition_key"] == "2026-03-23"
+ assert response.json()["run_type"] == "asset_materialization"
+
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_should_respond_200_with_trigger_fields_without_dag_run_id(self,
test_client):
+ payload = {
+ "conf": {"foo": "bar"},
+ # "dag_run_id": "asset_materialization_run_1",
+ "data_interval_end": "2026-03-24T00:00:00Z",
+ "data_interval_start": "2026-03-23T00:00:00Z",
+ "logical_date": "2026-03-23T00:00:00Z",
+ "note": "created from asset page",
+ "partition_key": "2026-03-23",
+ }
+ response = test_client.post("/assets/1/materialize", json=payload)
+
+ assert response.status_code == 200
+ assert response.json()["conf"] == {"foo": "bar"}
+ assert
response.json()["dag_run_id"].startswith("asset_materialization__")
+ assert response.json()["data_interval_start"] == "2026-03-23T00:00:00Z"
+ assert response.json()["data_interval_end"] == "2026-03-24T00:00:00Z"
+ assert response.json()["logical_date"] == "2026-03-23T00:00:00Z"
+ assert response.json()["note"] == "created from asset page"
+ assert response.json()["partition_key"] == "2026-03-23"
+ assert response.json()["run_type"] == "asset_materialization"
+
def test_should_respond_401(self, unauthenticated_test_client):
response = unauthenticated_test_client.post("/assets/2/materialize")
assert response.status_code == 401
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 1fe38713703..17aa78431a0 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -621,6 +621,24 @@ class LastAssetEventResponse(BaseModel):
timestamp: Annotated[datetime | None, Field(title="Timestamp")] = None
+class MaterializeAssetBody(BaseModel):
+ """
+ Materialize asset request.
+ """
+
+ model_config = ConfigDict(
+ extra="forbid",
+ )
+ dag_run_id: Annotated[str | None, Field(title="Dag Run Id")] = None
+ data_interval_start: Annotated[datetime | None, Field(title="Data Interval
Start")] = None
+ data_interval_end: Annotated[datetime | None, Field(title="Data Interval
End")] = None
+ logical_date: Annotated[datetime | None, Field(title="Logical Date")] =
None
+ run_after: Annotated[datetime | None, Field(title="Run After")] = None
+ conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None
+ note: Annotated[str | None, Field(title="Note")] = None
+ partition_key: Annotated[str | None, Field(title="Partition Key")] = None
+
+
class PluginImportErrorResponse(BaseModel):
"""
Plugin Import Error serializer for responses.