This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d5b9d630c4d Add REST API endpoint to materialize asset (#46718)
d5b9d630c4d is described below

commit d5b9d630c4d10267e7ead291da8850deece3ef9f
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Mon Feb 17 14:33:26 2025 +0800

    Add REST API endpoint to materialize asset (#46718)
    
    This is done by triggering the DAG that has the given asset as a task
    outlet. 409 is returned if more than one DAG uses the asset as outlet.
---
 .../api_fastapi/core_api/openapi/v1-generated.yaml | 51 +++++++++++++++++
 .../api_fastapi/core_api/routes/public/assets.py   | 64 +++++++++++++++++++++-
 airflow/ui/openapi-gen/queries/common.ts           |  3 +
 airflow/ui/openapi-gen/queries/queries.ts          | 36 ++++++++++++
 airflow/ui/openapi-gen/requests/services.gen.ts    | 27 +++++++++
 airflow/ui/openapi-gen/requests/types.gen.ts       | 37 +++++++++++++
 .../core_api/routes/public/test_assets.py          | 63 +++++++++++++++++++--
 7 files changed, 275 insertions(+), 6 deletions(-)

diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 6394e438472..4aceab68a74 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -849,6 +849,57 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
+  /public/assets/{asset_id}/materialize:
+    post:
+      tags:
+      - Asset
+      summary: Materialize Asset
+      description: Materialize an asset by triggering a DAG run that produces it.
+      operationId: materialize_asset
+      parameters:
+      - name: asset_id
+        in: path
+        required: true
+        schema:
+          type: integer
+          title: Asset Id
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/DAGRunResponse'
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '409':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Conflict
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
   /public/assets/{asset_id}/queuedEvents:
     get:
       tags:
diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py
index 40e2fc5ce6c..33511028299 100644
--- a/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -20,7 +20,7 @@ from __future__ import annotations
 from datetime import datetime
 from typing import Annotated
 
-from fastapi import Depends, HTTPException, status
+from fastapi import Depends, HTTPException, Request, status
 from sqlalchemy import delete, select
 from sqlalchemy.orm import joinedload, subqueryload
 
@@ -51,10 +51,22 @@ from airflow.api_fastapi.core_api.datamodels.assets import (
     QueuedEventCollectionResponse,
     QueuedEventResponse,
 )
+from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse
 from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.logging.decorators import action_logging
 from airflow.assets.manager import asset_manager
-from airflow.models.asset import AssetAliasModel, AssetDagRunQueue, AssetEvent, AssetModel
+from airflow.models.asset import (
+    AssetAliasModel,
+    AssetDagRunQueue,
+    AssetEvent,
+    AssetModel,
+    TaskOutletAssetReference,
+)
+from airflow.models.dag import DAG
+from airflow.models.dag_version import DagVersion
 from airflow.utils import timezone
+from airflow.utils.state import DagRunState
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
 assets_router = AirflowRouter(tags=["Asset"])
 
@@ -243,6 +255,54 @@ def create_asset_event(
     return AssetEventResponse.model_validate(assets_event)
 
 
+@assets_router.post(
+    "/assets/{asset_id}/materialize",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT]),
+    dependencies=[Depends(action_logging())],
+)
+def materialize_asset(
+    asset_id: int,
+    request: Request,
+    session: SessionDep,
+) -> DAGRunResponse:
+    """Materialize an asset by triggering a DAG run that produces it."""
+    dag_id_it = iter(
+        session.scalars(
+            select(TaskOutletAssetReference.dag_id)
+            .where(TaskOutletAssetReference.asset_id == asset_id)
+            .group_by(TaskOutletAssetReference.dag_id)
+            .limit(2)
+        )
+    )
+
+    if (dag_id := next(dag_id_it, None)) is None:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"No DAG materializes asset with ID: {asset_id}")
+    if next(dag_id_it, None) is not None:
+        raise HTTPException(
+            status.HTTP_409_CONFLICT,
+            f"More than one DAG materializes asset with ID: {asset_id}",
+        )
+
+    dag: DAG | None
+    if not (dag := request.app.state.dag_bag.get_dag(dag_id)):
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with ID `{dag_id}` was not found")
+
+    return dag.create_dagrun(
+        run_id=dag.timetable.generate_run_id(
+            run_type=DagRunType.MANUAL,
+            run_after=(run_after := timezone.coerce_datetime(timezone.utcnow())),
+            data_interval=None,
+        ),
+        run_after=run_after,
+        run_type=DagRunType.MANUAL,
+        triggered_by=DagRunTriggeredByType.REST_API,
+        external_trigger=True,
+        dag_version=DagVersion.get_latest_version(dag_id, session=session),
+        state=DagRunState.QUEUED,
+        session=session,
+    )
+
+
 @assets_router.get(
     "/assets/{asset_id}/queuedEvents",
     responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts
index 8bc70e1fb57..9beff4de51e 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -1704,6 +1704,9 @@ export const UseLoginServiceLoginKeyFn = (
 export type AssetServiceCreateAssetEventMutationResult = Awaited<
   ReturnType<typeof AssetService.createAssetEvent>
 >;
+export type AssetServiceMaterializeAssetMutationResult = Awaited<
+  ReturnType<typeof AssetService.materializeAsset>
+>;
 export type BackfillServiceCreateBackfillMutationResult = Awaited<
   ReturnType<typeof BackfillService.createBackfill>
 >;
diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts
index 999a5f744b0..acacb68dc07 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -2870,6 +2870,42 @@ export const useAssetServiceCreateAssetEvent = <
       AssetService.createAssetEvent({ requestBody }) as unknown as Promise<TData>,
     ...options,
   });
+/**
+ * Materialize Asset
+ * Materialize an asset by triggering a DAG run that produces it.
+ * @param data The data for the request.
+ * @param data.assetId
+ * @returns DAGRunResponse Successful Response
+ * @throws ApiError
+ */
+export const useAssetServiceMaterializeAsset = <
+  TData = Common.AssetServiceMaterializeAssetMutationResult,
+  TError = unknown,
+  TContext = unknown,
+>(
+  options?: Omit<
+    UseMutationOptions<
+      TData,
+      TError,
+      {
+        assetId: number;
+      },
+      TContext
+    >,
+    "mutationFn"
+  >,
+) =>
+  useMutation<
+    TData,
+    TError,
+    {
+      assetId: number;
+    },
+    TContext
+  >({
+    mutationFn: ({ assetId }) => AssetService.materializeAsset({ assetId }) as unknown as Promise<TData>,
+    ...options,
+  });
 /**
  * Create Backfill
  * @param data The data for the request.
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts
index ac82c3896e3..eb1cc49672e 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -15,6 +15,8 @@ import type {
   GetAssetEventsResponse,
   CreateAssetEventData,
   CreateAssetEventResponse,
+  MaterializeAssetData,
+  MaterializeAssetResponse,
   GetAssetQueuedEventsData,
   GetAssetQueuedEventsResponse,
   DeleteAssetQueuedEventsData,
@@ -381,6 +383,31 @@ export class AssetService {
     });
   }
 
+  /**
+   * Materialize Asset
+   * Materialize an asset by triggering a DAG run that produces it.
+   * @param data The data for the request.
+   * @param data.assetId
+   * @returns DAGRunResponse Successful Response
+   * @throws ApiError
+   */
+  public static materializeAsset(data: MaterializeAssetData): CancelablePromise<MaterializeAssetResponse> {
+    return __request(OpenAPI, {
+      method: "POST",
+      url: "/public/assets/{asset_id}/materialize",
+      path: {
+        asset_id: data.assetId,
+      },
+      errors: {
+        401: "Unauthorized",
+        403: "Forbidden",
+        404: "Not Found",
+        409: "Conflict",
+        422: "Validation Error",
+      },
+    });
+  }
+
   /**
    * Get Asset Queued Events
    * Get queued asset events for an asset.
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index 192d110860e..c33656c890c 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1621,6 +1621,12 @@ export type CreateAssetEventData = {
 
 export type CreateAssetEventResponse = AssetEventResponse;
 
+export type MaterializeAssetData = {
+  assetId: number;
+};
+
+export type MaterializeAssetResponse = DAGRunResponse;
+
 export type GetAssetQueuedEventsData = {
   assetId: number;
   before?: string | null;
@@ -2613,6 +2619,37 @@ export type $OpenApiTs = {
       };
     };
   };
+  "/public/assets/{asset_id}/materialize": {
+    post: {
+      req: MaterializeAssetData;
+      res: {
+        /**
+         * Successful Response
+         */
+        200: DAGRunResponse;
+        /**
+         * Unauthorized
+         */
+        401: HTTPExceptionResponse;
+        /**
+         * Forbidden
+         */
+        403: HTTPExceptionResponse;
+        /**
+         * Not Found
+         */
+        404: HTTPExceptionResponse;
+        /**
+         * Conflict
+         */
+        409: HTTPExceptionResponse;
+        /**
+         * Validation Error
+         */
+        422: HTTPValidationError;
+      };
+    };
+  };
   "/public/assets/{asset_id}/queuedEvents": {
     get: {
       req: GetAssetQueuedEventsData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py
index 540747cff24..9b7c3352a00 100644
--- a/tests/api_fastapi/core_api/routes/public/test_assets.py
+++ b/tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -33,6 +33,7 @@ from airflow.models.asset import (
     TaskOutletAssetReference,
 )
 from airflow.models.dagrun import DagRun
+from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.utils import timezone
 from airflow.utils.session import provide_session
 from airflow.utils.state import DagRunState
@@ -47,7 +48,7 @@ DEFAULT_DATE = datetime(2020, 6, 11, 18, 0, 0, tzinfo=timezone.utc)
 pytestmark = pytest.mark.db_test
 
 
-def _create_assets(session, num: int = 2) -> None:
+def _create_assets(session, num: int = 2) -> list[AssetModel]:
     assets = [
         AssetModel(
             id=i,
@@ -62,6 +63,7 @@ def _create_assets(session, num: int = 2) -> None:
     ]
     session.add_all(assets)
     session.commit()
+    return assets
 
 
 def _create_assets_with_sensitive_extra(session, num: int = 2) -> None:
@@ -192,8 +194,8 @@ class TestAssets:
         clear_db_runs()
 
     @provide_session
-    def create_assets(self, session, num: int = 2):
-        _create_assets(session=session, num=num)
+    def create_assets(self, session, num: int = 2) -> list[AssetModel]:
+        return _create_assets(session=session, num=num)
 
     @provide_session
     def create_assets_with_sensitive_extra(self, session, num: int = 2):
@@ -921,7 +923,7 @@ class TestDeleteDagDatasetQueuedEvents(TestQueuedEventEndpoint):
 class TestPostAssetEvents(TestAssets):
     @pytest.mark.usefixtures("time_freezer")
     def test_should_respond_200(self, test_client, session):
-        self.create_assets()
+        self.create_assets(session)
         event_payload = {"asset_id": 1, "extra": {"foo": "bar"}}
         response = test_client.post("/public/assets/events", json=event_payload)
         assert response.status_code == 200
@@ -970,6 +972,59 @@ class TestPostAssetEvents(TestAssets):
         }
 
 
+@pytest.mark.need_serialized_dag
+class TestPostAssetMaterialize(TestAssets):
+    DAG_ASSET1_ID = "test_dag_1"
+    DAG_ASSET2_ID_A = "test_dag_2a"
+    DAG_ASSET2_ID_B = "test_dag_2b"
+    DAG_ASSET_NO = "test_dag_no"
+
+    @pytest.fixture(autouse=True)
+    def create_dags(self, setup, dag_maker, session):
+        # Depend on 'setup' so it runs first. Otherwise it deletes what we create here.
+        assets = {am.id: am.to_public() for am in self.create_assets(session=session, num=3)}
+        with dag_maker(self.DAG_ASSET1_ID, schedule=None, session=session):
+            EmptyOperator(task_id="task", outlets=assets[1])
+        with dag_maker(self.DAG_ASSET2_ID_A, schedule=None, session=session):
+            EmptyOperator(task_id="task", outlets=assets[2])
+        with dag_maker(self.DAG_ASSET2_ID_B, schedule=None, session=session):
+            EmptyOperator(task_id="task", outlets=assets[2])
+        with dag_maker(self.DAG_ASSET_NO, schedule=None, session=session):
+            EmptyOperator(task_id="task")
+
+    def test_should_respond_200(self, test_client):
+        response = test_client.post("/public/assets/1/materialize")
+        assert response.status_code == 200
+        assert response.json() == {
+            "dag_run_id": mock.ANY,
+            "dag_id": self.DAG_ASSET1_ID,
+            "logical_date": None,
+            "queued_at": mock.ANY,
+            "run_after": mock.ANY,
+            "start_date": None,
+            "end_date": None,
+            "data_interval_start": None,
+            "data_interval_end": None,
+            "last_scheduling_decision": None,
+            "run_type": "manual",
+            "state": "queued",
+            "external_trigger": True,
+            "triggered_by": "rest_api",
+            "conf": {},
+            "note": None,
+        }
+
+    def test_should_respond_409_on_multiple_dags(self, test_client):
+        response = test_client.post("/public/assets/2/materialize")
+        assert response.status_code == 409
+        assert response.json()["detail"] == "More than one DAG materializes asset with ID: 2"
+
+    def test_should_respond_404_on_multiple_dags(self, test_client):
+        response = test_client.post("/public/assets/3/materialize")
+        assert response.status_code == 404
+        assert response.json()["detail"] == "No DAG materializes asset with ID: 3"
+
+
 class TestGetAssetQueuedEvents(TestQueuedEventEndpoint):
     @pytest.mark.usefixtures("time_freezer")
     def test_should_respond_200(self, test_client, session, create_dummy_dag):

Reply via email to