This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b31231445c4 fix: use Dag form when materializing asset (#64211)
b31231445c4 is described below

commit b31231445c479ba93fb451bbdd85fbc90a1795d4
Author: Wei Lee <[email protected]>
AuthorDate: Mon Mar 30 22:17:03 2026 +0800

    fix: use Dag form when materializing asset (#64211)
    
    * fix: use Dag form when materializing asset
    
    * fixup! fix: use Dag form when materializing asset
    
    * fixup! fixup! fix: use Dag form when materializing asset
    
    * fixup! fixup! fixup! fix: use Dag form when materializing asset
    
    * fixup! fixup! fixup! fixup! fix: use Dag form when materializing asset
    
    * test: add test case 
test_should_respond_200_with_trigger_fields_without_dag_run_id
    
    * fix: restore useTrigger in TriggerDAGModal after TriggerDAGForm refactor
    
    TriggerDAGForm no longer owns the trigger mutation after the refactor
    (onSubmitTrigger was made a prop). TriggerDAGModal must now supply
    useTrigger and pass error/isPending/onSubmitTrigger down to the form,
    otherwise clicking Trigger does nothing and dagRunId stays null.
---
 .../api_fastapi/core_api/datamodels/assets.py      |  27 ++++-
 .../core_api/openapi/v2-rest-api-generated.yaml    |  59 ++++++++++
 .../api_fastapi/core_api/routes/public/assets.py   |  16 +--
 .../src/airflow/ui/openapi-gen/queries/queries.ts  |   7 +-
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts | 102 ++++++++++++++++++
 .../ui/openapi-gen/requests/services.gen.ts        |   3 +
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |  17 +++
 .../src/components/TriggerDag/TriggerDAGForm.tsx   |  37 +++----
 .../src/components/TriggerDag/TriggerDAGModal.tsx  |   6 +-
 .../ui/src/pages/Asset/CreateAssetEventModal.tsx   | 119 ++++++++++++---------
 .../core_api/routes/public/test_assets.py          |  53 +++++++++
 .../src/airflowctl/api/datamodels/generated.py     |  18 ++++
 12 files changed, 382 insertions(+), 82 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
index c6cb2fa2827..041ec12c1f2 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
@@ -19,11 +19,18 @@ from __future__ import annotations
 
 from collections.abc import Iterable
 from datetime import datetime
+from typing import TYPE_CHECKING
 
-from pydantic import AliasPath, ConfigDict, Field, JsonValue, NonNegativeInt, 
field_validator
+from pydantic import AliasPath, AwareDatetime, ConfigDict, Field, JsonValue, 
NonNegativeInt, field_validator
 
 from airflow._shared.secrets_masker import redact
+from airflow._shared.timezones import timezone
 from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
+from airflow.api_fastapi.core_api.datamodels.dag_run import 
TriggerDAGRunPostBody
+from airflow.utils.types import DagRunType
+
+if TYPE_CHECKING:
+    from airflow.serialization.definitions.dag import SerializedDAG
 
 
 class DagScheduleAssetReference(StrictBaseModel):
@@ -185,3 +192,21 @@ class CreateAssetEventsBody(StrictBaseModel):
         return v
 
     model_config = ConfigDict(extra="forbid")
+
+
+class MaterializeAssetBody(TriggerDAGRunPostBody):
+    """Materialize asset request."""
+
+    logical_date: AwareDatetime | None = None
+
+    def validate_context(self, dag: SerializedDAG) -> dict:
+        params = super().validate_context(dag)
+        if self.dag_run_id is None:
+            params["run_id"] = dag.timetable.generate_run_id(
+                run_type=DagRunType.ASSET_MATERIALIZATION,
+                run_after=timezone.coerce_datetime(params["run_after"]),
+                data_interval=params["data_interval"],
+            )
+        return params
+
+    model_config = ConfigDict(extra="forbid")
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 128d0f2bd05..b167e463a35 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -484,6 +484,14 @@ paths:
         schema:
           type: integer
           title: Asset Id
+      requestBody:
+        content:
+          application/json:
+            schema:
+              anyOf:
+              - $ref: '#/components/schemas/MaterializeAssetBody'
+              - type: 'null'
+              title: Body
       responses:
         '200':
           description: Successful Response
@@ -11949,6 +11957,57 @@ components:
       type: object
       title: LastAssetEventResponse
       description: Last asset event response serializer.
+    MaterializeAssetBody:
+      properties:
+        dag_run_id:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Dag Run Id
+        data_interval_start:
+          anyOf:
+          - type: string
+            format: date-time
+          - type: 'null'
+          title: Data Interval Start
+        data_interval_end:
+          anyOf:
+          - type: string
+            format: date-time
+          - type: 'null'
+          title: Data Interval End
+        logical_date:
+          anyOf:
+          - type: string
+            format: date-time
+          - type: 'null'
+          title: Logical Date
+        run_after:
+          anyOf:
+          - type: string
+            format: date-time
+          - type: 'null'
+          title: Run After
+        conf:
+          anyOf:
+          - additionalProperties: true
+            type: object
+          - type: 'null'
+          title: Conf
+        note:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Note
+        partition_key:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Partition Key
+      additionalProperties: false
+      type: object
+      title: MaterializeAssetBody
+      description: Materialize asset request.
     PatchTaskInstanceBody:
       properties:
         new_state:
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
index 68220c20e8f..a911984dc65 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -54,6 +54,7 @@ from airflow.api_fastapi.core_api.datamodels.assets import (
     AssetEventResponse,
     AssetResponse,
     CreateAssetEventsBody,
+    MaterializeAssetBody,
     QueuedEventCollectionResponse,
     QueuedEventResponse,
 )
@@ -387,6 +388,7 @@ def materialize_asset(
     dag_bag: DagBagDep,
     user: GetUserDep,
     session: SessionDep,
+    body: MaterializeAssetBody | None = None,
 ) -> DAGRunResponse:
     """Materialize an asset by triggering a DAG run that produces it."""
     dag_id_it = iter(
@@ -425,17 +427,19 @@ def materialize_asset(
             f"Dag with dag_id: '{dag_id}' does not allow asset materialization 
runs",
         )
 
+    params = (body or MaterializeAssetBody()).validate_context(dag)
     return dag.create_dagrun(
-        run_id=dag.timetable.generate_run_id(
-            run_type=DagRunType.ASSET_MATERIALIZATION,
-            run_after=(run_after := 
timezone.coerce_datetime(timezone.utcnow())),
-            data_interval=None,
-        ),
-        run_after=run_after,
+        run_id=params["run_id"],
+        logical_date=params["logical_date"],
+        data_interval=params["data_interval"],
+        run_after=params["run_after"],
+        conf=params["conf"],
         run_type=DagRunType.ASSET_MATERIALIZATION,
         triggered_by=DagRunTriggeredByType.REST_API,
         triggering_user_name=user.get_name(),
         state=DagRunState.QUEUED,
+        partition_key=params["partition_key"],
+        note=params["note"],
         session=session,
     )
 
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index 8e9ef5aa29d..dac7a198e59 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -2,7 +2,7 @@
 
 import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from 
"@tanstack/react-query";
 import { AssetService, AuthLinksService, BackfillService, CalendarService, 
ConfigService, ConnectionService, DagParsingService, DagRunService, DagService, 
DagSourceService, DagStatsService, DagVersionService, DagWarningService, 
DashboardService, DeadlinesService, DependenciesService, EventLogService, 
ExperimentalService, ExtraLinksService, GanttService, GridService, 
ImportErrorService, JobService, LoginService, MonitorService, 
PartitionedDagRunService, PluginService, PoolService, Provide [...]
-import { BackfillPostBody, BulkBody_BulkTaskInstanceBody_, 
BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, 
ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, 
DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, 
DagWarningType, GenerateTokenBody, PatchTaskInstanceBody, PoolBody, 
PoolPatchBody, TaskInstancesBatchBody, TriggerDAGRunPostBody, 
UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody } from 
"../requests/t [...]
+import { BackfillPostBody, BulkBody_BulkTaskInstanceBody_, 
BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, 
ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, 
DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, 
DagWarningType, GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody, 
PoolBody, PoolPatchBody, TaskInstancesBatchBody, TriggerDAGRunPostBody, 
UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody [...]
 import * as Common from "./common";
 /**
 * Get Assets
@@ -1765,14 +1765,17 @@ export const useAssetServiceCreateAssetEvent = <TData = 
Common.AssetServiceCreat
 * Materialize an asset by triggering a DAG run that produces it.
 * @param data The data for the request.
 * @param data.assetId
+* @param data.requestBody
 * @returns DAGRunResponse Successful Response
 * @throws ApiError
 */
 export const useAssetServiceMaterializeAsset = <TData = 
Common.AssetServiceMaterializeAssetMutationResult, TError = unknown, TContext = 
unknown>(options?: Omit<UseMutationOptions<TData, TError, {
   assetId: number;
+  requestBody?: MaterializeAssetBody;
 }, TContext>, "mutationFn">) => useMutation<TData, TError, {
   assetId: number;
-}, TContext>({ mutationFn: ({ assetId }) => AssetService.materializeAsset({ 
assetId }) as unknown as Promise<TData>, ...options });
+  requestBody?: MaterializeAssetBody;
+}, TContext>({ mutationFn: ({ assetId, requestBody }) => 
AssetService.materializeAsset({ assetId, requestBody }) as unknown as 
Promise<TData>, ...options });
 /**
 * Create Backfill
 * @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index e17c16e70e4..c5077028b44 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -4473,6 +4473,108 @@ export const $LastAssetEventResponse = {
     description: 'Last asset event response serializer.'
 } as const;
 
+export const $MaterializeAssetBody = {
+    properties: {
+        dag_run_id: {
+            anyOf: [
+                {
+                    type: 'string'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Dag Run Id'
+        },
+        data_interval_start: {
+            anyOf: [
+                {
+                    type: 'string',
+                    format: 'date-time'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Data Interval Start'
+        },
+        data_interval_end: {
+            anyOf: [
+                {
+                    type: 'string',
+                    format: 'date-time'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Data Interval End'
+        },
+        logical_date: {
+            anyOf: [
+                {
+                    type: 'string',
+                    format: 'date-time'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Logical Date'
+        },
+        run_after: {
+            anyOf: [
+                {
+                    type: 'string',
+                    format: 'date-time'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Run After'
+        },
+        conf: {
+            anyOf: [
+                {
+                    additionalProperties: true,
+                    type: 'object'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Conf'
+        },
+        note: {
+            anyOf: [
+                {
+                    type: 'string'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Note'
+        },
+        partition_key: {
+            anyOf: [
+                {
+                    type: 'string'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Partition Key'
+        }
+    },
+    additionalProperties: false,
+    type: 'object',
+    title: 'MaterializeAssetBody',
+    description: 'Materialize asset request.'
+} as const;
+
 export const $PatchTaskInstanceBody = {
     properties: {
         new_state: {
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index 6e701bf68dc..6f0c2af82fb 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -172,6 +172,7 @@ export class AssetService {
      * Materialize an asset by triggering a DAG run that produces it.
      * @param data The data for the request.
      * @param data.assetId
+     * @param data.requestBody
      * @returns DAGRunResponse Successful Response
      * @throws ApiError
      */
@@ -182,6 +183,8 @@ export class AssetService {
             path: {
                 asset_id: data.assetId
             },
+            body: data.requestBody,
+            mediaType: 'application/json',
             errors: {
                 400: 'Bad Request',
                 401: 'Unauthorized',
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 1bf8cd21fb1..521643cf7f0 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1134,6 +1134,22 @@ export type LastAssetEventResponse = {
     timestamp?: string | null;
 };
 
+/**
+ * Materialize asset request.
+ */
+export type MaterializeAssetBody = {
+    dag_run_id?: string | null;
+    data_interval_start?: string | null;
+    data_interval_end?: string | null;
+    logical_date?: string | null;
+    run_after?: string | null;
+    conf?: {
+    [key: string]: unknown;
+} | null;
+    note?: string | null;
+    partition_key?: string | null;
+};
+
 /**
  * Request body for Clear Task Instances endpoint.
  */
@@ -2305,6 +2321,7 @@ export type CreateAssetEventResponse = AssetEventResponse;
 
 export type MaterializeAssetData = {
     assetId: number;
+    requestBody?: MaterializeAssetBody | null;
 };
 
 export type MaterializeAssetResponse = DAGRunResponse;
diff --git 
a/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx 
b/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx
index d94991378e7..91b2cd1daa0 100644
--- a/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx
+++ b/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx
@@ -26,7 +26,6 @@ import { FiPlay } from "react-icons/fi";
 import { useDagParams } from "src/queries/useDagParams";
 import { useParamStore } from "src/queries/useParamStore";
 import { useTogglePause } from "src/queries/useTogglePause";
-import { useTrigger } from "src/queries/useTrigger";
 import { DEFAULT_DATETIME_FORMAT } from "src/utils/datetimeUtils";
 
 import ConfigForm from "../ConfigForm";
@@ -35,16 +34,17 @@ import { ErrorAlert, type ExpandedApiError } from 
"../ErrorAlert";
 import { Checkbox } from "../ui/Checkbox";
 import { RadioCardItem, RadioCardRoot } from "../ui/RadioCard";
 import TriggerDAGAdvancedOptions from "./TriggerDAGAdvancedOptions";
-import type { DagRunTriggerParams } from "./types";
-import { dataIntervalModeOptions } from "./types";
+import { dataIntervalModeOptions, type DagRunTriggerParams } from "./types";
 
 type TriggerDAGFormProps = {
   readonly dagDisplayName: string;
   readonly dagId: string;
+  readonly error?: unknown;
   readonly hasSchedule: boolean;
   readonly isPartitioned: boolean;
   readonly isPaused: boolean;
-  readonly onClose: () => void;
+  readonly isPending?: boolean;
+  readonly onSubmitTrigger?: (params: DagRunTriggerParams) => void;
   readonly open: boolean;
   readonly prefillConfig?:
     | {
@@ -58,10 +58,12 @@ type TriggerDAGFormProps = {
 const TriggerDAGForm = ({
   dagDisplayName,
   dagId,
+  error,
   hasSchedule,
   isPartitioned,
   isPaused,
-  onClose,
+  isPending = false,
+  onSubmitTrigger,
   open,
   prefillConfig,
 }: TriggerDAGFormProps) => {
@@ -69,10 +71,8 @@ const TriggerDAGForm = ({
   const [errors, setErrors] = useState<{ conf?: string; date?: unknown }>({});
   const [formError, setFormError] = useState(false);
   const initialParamsDict = useDagParams(dagId, open);
-  const { error: errorTrigger, isPending, triggerDagRun } = useTrigger({ 
dagId, onSuccessConfirm: onClose });
   const { conf, initialParamDict, setConf, setInitialParamDict } = 
useParamStore();
   const [unpause, setUnpause] = useState(true);
-
   const { mutate: togglePause } = useTogglePause({ dagId });
 
   const { control, handleSubmit, reset, watch } = 
useForm<DagRunTriggerParams>({
@@ -106,7 +106,6 @@ const TriggerDAGForm = ({
         note: "",
         partitionKey: undefined,
       });
-
       // Also update the param store to keep it in sync.
       // Wait until we have the initial params so section ordering stays 
consistent.
       if (confString && Object.keys(initialParamsDict.paramsDict).length > 0) {
@@ -130,16 +129,11 @@ const TriggerDAGForm = ({
   // Automatically reset form when conf is fetched (only if no prefillConfig)
   useEffect(() => {
     if (conf && !prefillConfig && open) {
-      reset((prevValues) => ({
-        ...prevValues,
-        conf,
-      }));
+      reset((prevValues) => ({ ...prevValues, conf }));
     }
   }, [conf, prefillConfig, open, reset]);
 
-  const resetDateError = () => {
-    setErrors((prev) => ({ ...prev, date: undefined }));
-  };
+  const resetDateError = () => setErrors((prev) => ({ ...prev, date: undefined 
}));
 
   const dataIntervalMode = watch("dataIntervalMode");
   const dataIntervalStart = watch("dataIntervalStart");
@@ -150,19 +144,14 @@ const TriggerDAGForm = ({
     (noDataInterval || 
dayjs(dataIntervalStart).isAfter(dayjs(dataIntervalEnd)));
   const onSubmit = (data: DagRunTriggerParams) => {
     if (unpause && isPaused) {
-      togglePause({
-        dagId,
-        requestBody: {
-          is_paused: false,
-        },
-      });
+      togglePause({ dagId, requestBody: { is_paused: false } });
     }
-    triggerDagRun(data);
+    onSubmitTrigger?.(data);
   };
 
   return (
     <>
-      <ErrorAlert error={errors.date ?? errorTrigger} />
+      <ErrorAlert error={errors.date ?? error} />
       <VStack alignItems="stretch" gap={2} pt={4}>
         {isPartitioned ? undefined : (
           <>
@@ -272,7 +261,7 @@ const TriggerDAGForm = ({
               formError ||
               isPending ||
               dataIntervalInvalid ||
-              (Boolean(errorTrigger) && (errorTrigger as 
ExpandedApiError).status === 403)
+              (Boolean(error) && (error as ExpandedApiError).status === 403)
             }
             onClick={() => void handleSubmit(onSubmit)()}
           >
diff --git 
a/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGModal.tsx 
b/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGModal.tsx
index 840d15626f9..07080ac0d33 100644
--- a/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGModal.tsx
+++ b/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGModal.tsx
@@ -23,6 +23,7 @@ import { useTranslation } from "react-i18next";
 import { useDagServiceGetDag } from "openapi/queries";
 import { Dialog, Tooltip } from "src/components/ui";
 import { RadioCardItem, RadioCardRoot } from "src/components/ui/RadioCard";
+import { useTrigger } from "src/queries/useTrigger";
 
 import RunBackfillForm from "../DagActions/RunBackfillForm";
 import TriggerDAGForm from "./TriggerDAGForm";
@@ -73,6 +74,7 @@ const TriggerDAGModal: React.FC<TriggerDAGModalProps> = ({
 
   const hasSchedule = dag?.timetable_summary !== null;
   const isPartitioned = dag ? dag.timetable_partitioned : false;
+  const { error, isPending, triggerDagRun } = useTrigger({ dagId, 
onSuccessConfirm: onClose });
   const maxDisplayLength = 59; // hard-coded length to prevent dag name 
overflowing the modal
   const nameOverflowing = dagDisplayName.length > maxDisplayLength;
 
@@ -134,10 +136,12 @@ const TriggerDAGModal: React.FC<TriggerDAGModalProps> = ({
                 <TriggerDAGForm
                   dagDisplayName={dagDisplayName}
                   dagId={dagId}
+                  error={error}
                   hasSchedule={hasSchedule}
                   isPartitioned={isPartitioned}
                   isPaused={isPaused}
-                  onClose={onClose}
+                  isPending={isPending}
+                  onSubmitTrigger={triggerDagRun}
                   open={open}
                   prefillConfig={prefillConfig}
                 />
diff --git 
a/airflow-core/src/airflow/ui/src/pages/Asset/CreateAssetEventModal.tsx 
b/airflow-core/src/airflow/ui/src/pages/Asset/CreateAssetEventModal.tsx
index 472981325bf..6b9d00afad1 100644
--- a/airflow-core/src/airflow/ui/src/pages/Asset/CreateAssetEventModal.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/Asset/CreateAssetEventModal.tsx
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-import { Button, Field, Heading, HStack, VStack, Text } from 
"@chakra-ui/react";
+import { Button, Field, Heading, HStack, Text, VStack } from 
"@chakra-ui/react";
 import { useQueryClient } from "@tanstack/react-query";
 import { useState } from "react";
 import { useTranslation } from "react-i18next";
@@ -36,14 +36,15 @@ import type {
   AssetEventResponse,
   AssetResponse,
   DAGRunResponse,
+  MaterializeAssetBody,
   EdgeResponse,
 } from "openapi/requests/types.gen";
 import { ErrorAlert } from "src/components/ErrorAlert";
 import { JsonEditor } from "src/components/JsonEditor";
+import TriggerDAGForm from "src/components/TriggerDag/TriggerDAGForm";
+import type { DagRunTriggerParams } from "src/components/TriggerDag/types";
 import { Dialog, toaster } from "src/components/ui";
-import { Checkbox } from "src/components/ui/Checkbox";
 import { RadioCardItem, RadioCardRoot } from "src/components/ui/RadioCard";
-import { useTogglePause } from "src/queries/useTogglePause";
 
 type Props = {
   readonly asset: AssetResponse;
@@ -55,7 +56,6 @@ export const CreateAssetEventModal = ({ asset, onClose, open 
}: Props) => {
   const { t: translate } = useTranslation(["assets", "components"]);
   const [eventType, setEventType] = useState("manual");
   const [extraError, setExtraError] = useState<string | undefined>();
-  const [unpause, setUnpause] = useState(true);
   const [extra, setExtra] = useState("{}");
   const [partitionKey, setPartitionKey] = useState<string | 
undefined>(undefined);
   const queryClient = useQueryClient();
@@ -93,6 +93,7 @@ export const CreateAssetEventModal = ({ asset, onClose, open 
}: Props) => {
   const onSuccess = async (response: AssetEventResponse | DAGRunResponse) => {
     setExtra("{}");
     setExtraError(undefined);
+    setPartitionKey(undefined);
     onClose();
 
     let queryKeys = [UseAssetServiceGetAssetEventsKeyFn({ assetId: asset.id }, 
[{ assetId: asset.id }])];
@@ -127,11 +128,9 @@ export const CreateAssetEventModal = ({ asset, onClose, 
open }: Props) => {
     enabled: Boolean(upstreamDagId),
   });
 
-  const { mutate: togglePause } = useTogglePause({ dagId: dag?.dag_id ?? 
upstreamDagId ?? "" });
-
   const {
     error: manualError,
-    isPending,
+    isPending: isManualPending,
     mutate: createAssetEvent,
   } = useAssetServiceCreateAssetEvent({ onSuccess });
   const {
@@ -142,28 +141,41 @@ export const CreateAssetEventModal = ({ asset, onClose, 
open }: Props) => {
     onSuccess,
   });
 
-  const handleSubmit = () => {
-    if (eventType === "materialize") {
-      if (unpause && dag?.is_paused) {
-        togglePause({
-          dagId: dag.dag_id,
-          requestBody: {
-            is_paused: false,
-          },
-        });
-      }
-      materializeAsset({ assetId: asset.id });
-    } else {
-      createAssetEvent({
-        requestBody: {
-          asset_id: asset.id,
-          extra: JSON.parse(extra) as Record<string, unknown>,
-          partition_key: partitionKey ?? null,
-        },
-      });
-    }
+  const handleMaterializeSubmit = (dagRunRequestBody: DagRunTriggerParams) => {
+    const parsedConfig = JSON.parse(dagRunRequestBody.conf) as Record<string, 
unknown>;
+    const logicalDate = dagRunRequestBody.logicalDate ? new 
Date(dagRunRequestBody.logicalDate) : undefined;
+    const dataIntervalStart = dagRunRequestBody.dataIntervalStart
+      ? new Date(dagRunRequestBody.dataIntervalStart)
+      : undefined;
+    const dataIntervalEnd = dagRunRequestBody.dataIntervalEnd
+      ? new Date(dagRunRequestBody.dataIntervalEnd)
+      : undefined;
+
+    const requestBody: MaterializeAssetBody = {
+      conf: parsedConfig,
+      dag_run_id: dagRunRequestBody.dagRunId === "" ? undefined : 
dagRunRequestBody.dagRunId,
+      data_interval_end: dataIntervalEnd?.toISOString() ?? null,
+      data_interval_start: dataIntervalStart?.toISOString() ?? null,
+      logical_date: logicalDate?.toISOString() ?? null,
+      note: dagRunRequestBody.note === "" ? undefined : dagRunRequestBody.note,
+      partition_key: dagRunRequestBody.partitionKey ?? null,
+    };
+
+    materializeAsset({
+      assetId: asset.id,
+      requestBody,
+    });
   };
 
+  const handleManualSubmit = () =>
+    createAssetEvent({
+      requestBody: {
+        asset_id: asset.id,
+        extra: JSON.parse(extra) as Record<string, unknown>,
+        partition_key: partitionKey ?? null,
+      },
+    });
+
   return (
     <Dialog.Root lazyMount onOpenChange={onClose} open={open} size="xl" 
unmountOnExit>
       <Dialog.Content backdrop>
@@ -203,37 +215,48 @@ export const CreateAssetEventModal = ({ asset, onClose, 
open }: Props) => {
               />
             </HStack>
           </RadioCardRoot>
+          {eventType === "manual" ? (
+            <Field.Root mt={6}>
+              <Field.Label 
fontSize="md">{translate("createEvent.manual.extra")}</Field.Label>
+              <JsonEditor onChange={validateAndPrettifyJson} value={extra} />
+              <Text color="fg.error">{extraError}</Text>
+            </Field.Root>
+          ) : undefined}
           {eventType === "manual" ? (
             <>
-              <Field.Root mt={6}>
-                <Field.Label 
fontSize="md">{translate("createEvent.manual.extra")}</Field.Label>
-                <JsonEditor onChange={validateAndPrettifyJson} value={extra} />
-                <Text color="fg.error">{extraError}</Text>
-              </Field.Root>
               <Field.Root mt={6}>
                 <Field.Label 
fontSize="md">{translate("common:dagRun.partitionKey")}</Field.Label>
                 <JsonEditor onChange={setPartitionKey} value={partitionKey} />
-                <Text color="fg.error">{extraError}</Text>
               </Field.Root>
+              <ErrorAlert error={manualError} />
             </>
           ) : undefined}
-          {eventType === "materialize" && dag?.is_paused ? (
-            <Checkbox checked={unpause} colorPalette="brand" onChange={() => 
setUnpause(!unpause)}>
-              {translate("createEvent.materialize.unpauseDag", { dagName: 
dag.dag_display_name })}
-            </Checkbox>
+          {eventType === "materialize" && dag !== undefined && upstreamDagId 
!== undefined ? (
+            <TriggerDAGForm
+              dagDisplayName={dag.dag_display_name}
+              dagId={upstreamDagId}
+              error={materializeError}
+              hasSchedule={dag.timetable_summary !== null}
+              isPartitioned={dag.timetable_partitioned}
+              isPaused={dag.is_paused}
+              isPending={isMaterializePending}
+              onSubmitTrigger={handleMaterializeSubmit}
+              open={open}
+            />
           ) : undefined}
-          <ErrorAlert error={eventType === "manual" ? manualError : 
materializeError} />
         </Dialog.Body>
-        <Dialog.Footer>
-          <Button
-            colorPalette="brand"
-            disabled={Boolean(extraError)}
-            loading={isPending || isMaterializePending}
-            onClick={handleSubmit}
-          >
-            <FiPlay /> {translate("createEvent.button")}
-          </Button>
-        </Dialog.Footer>
+        {eventType === "manual" ? (
+          <Dialog.Footer>
+            <Button
+              colorPalette="brand"
+              disabled={Boolean(extraError)}
+              loading={isManualPending}
+              onClick={handleManualSubmit}
+            >
+              <FiPlay /> {translate("createEvent.button")}
+            </Button>
+          </Dialog.Footer>
+        ) : undefined}
       </Dialog.Content>
     </Dialog.Root>
   );
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
index 7912e995d8e..749d94ac877 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
@@ -1408,6 +1408,59 @@ class TestPostAssetMaterialize(TestAssets):
             "note": None,
         }
 
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_should_respond_200_with_partition_key(self, test_client):
+        partition_key = "2026-03-23"
+        response = test_client.post("/assets/1/materialize", 
json={"partition_key": partition_key})
+        assert response.status_code == 200
+        assert response.json()["partition_key"] == partition_key
+
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_should_respond_200_with_trigger_fields(self, test_client):
+        payload = {
+            "conf": {"foo": "bar"},
+            "dag_run_id": "asset_materialization_run_1",
+            "data_interval_end": "2026-03-24T00:00:00Z",
+            "data_interval_start": "2026-03-23T00:00:00Z",
+            "logical_date": "2026-03-23T00:00:00Z",
+            "note": "created from asset page",
+            "partition_key": "2026-03-23",
+        }
+        response = test_client.post("/assets/1/materialize", json=payload)
+
+        assert response.status_code == 200
+        assert response.json()["conf"] == {"foo": "bar"}
+        assert response.json()["dag_run_id"] == "asset_materialization_run_1"
+        assert response.json()["data_interval_start"] == "2026-03-23T00:00:00Z"
+        assert response.json()["data_interval_end"] == "2026-03-24T00:00:00Z"
+        assert response.json()["logical_date"] == "2026-03-23T00:00:00Z"
+        assert response.json()["note"] == "created from asset page"
+        assert response.json()["partition_key"] == "2026-03-23"
+        assert response.json()["run_type"] == "asset_materialization"
+
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_should_respond_200_with_trigger_fields_without_dag_run_id(self, 
test_client):
+        payload = {
+            "conf": {"foo": "bar"},
+            # "dag_run_id": "asset_materialization_run_1",
+            "data_interval_end": "2026-03-24T00:00:00Z",
+            "data_interval_start": "2026-03-23T00:00:00Z",
+            "logical_date": "2026-03-23T00:00:00Z",
+            "note": "created from asset page",
+            "partition_key": "2026-03-23",
+        }
+        response = test_client.post("/assets/1/materialize", json=payload)
+
+        assert response.status_code == 200
+        assert response.json()["conf"] == {"foo": "bar"}
+        assert 
response.json()["dag_run_id"].startswith("asset_materialization__")
+        assert response.json()["data_interval_start"] == "2026-03-23T00:00:00Z"
+        assert response.json()["data_interval_end"] == "2026-03-24T00:00:00Z"
+        assert response.json()["logical_date"] == "2026-03-23T00:00:00Z"
+        assert response.json()["note"] == "created from asset page"
+        assert response.json()["partition_key"] == "2026-03-23"
+        assert response.json()["run_type"] == "asset_materialization"
+
     def test_should_respond_401(self, unauthenticated_test_client):
         response = unauthenticated_test_client.post("/assets/2/materialize")
         assert response.status_code == 401
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py 
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 1fe38713703..17aa78431a0 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -621,6 +621,24 @@ class LastAssetEventResponse(BaseModel):
     timestamp: Annotated[datetime | None, Field(title="Timestamp")] = None
 
 
+class MaterializeAssetBody(BaseModel):
+    """
+    Materialize asset request.
+    """
+
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    dag_run_id: Annotated[str | None, Field(title="Dag Run Id")] = None
+    data_interval_start: Annotated[datetime | None, Field(title="Data Interval 
Start")] = None
+    data_interval_end: Annotated[datetime | None, Field(title="Data Interval 
End")] = None
+    logical_date: Annotated[datetime | None, Field(title="Logical Date")] = 
None
+    run_after: Annotated[datetime | None, Field(title="Run After")] = None
+    conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None
+    note: Annotated[str | None, Field(title="Note")] = None
+    partition_key: Annotated[str | None, Field(title="Partition Key")] = None
+
+
 class PluginImportErrorResponse(BaseModel):
     """
     Plugin Import Error serializer for responses.


Reply via email to