This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d5b9d630c4d Add REST API endpoint to materialize asset (#46718)
d5b9d630c4d is described below
commit d5b9d630c4d10267e7ead291da8850deece3ef9f
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Mon Feb 17 14:33:26 2025 +0800
Add REST API endpoint to materialize asset (#46718)
This is done by triggering the DAG that has the given asset as a task
outlet. 409 is returned if more than one DAG uses the asset as an outlet,
and 404 if no DAG does.
---
.../api_fastapi/core_api/openapi/v1-generated.yaml | 51 +++++++++++++++++
.../api_fastapi/core_api/routes/public/assets.py | 64 +++++++++++++++++++++-
airflow/ui/openapi-gen/queries/common.ts | 3 +
airflow/ui/openapi-gen/queries/queries.ts | 36 ++++++++++++
airflow/ui/openapi-gen/requests/services.gen.ts | 27 +++++++++
airflow/ui/openapi-gen/requests/types.gen.ts | 37 +++++++++++++
.../core_api/routes/public/test_assets.py | 63 +++++++++++++++++++--
7 files changed, 275 insertions(+), 6 deletions(-)
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 6394e438472..4aceab68a74 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -849,6 +849,57 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+  /public/assets/{asset_id}/materialize:
+    post:
+      tags:
+      - Asset
+      summary: Materialize Asset
+      description: Materialize an asset by triggering a DAG run that produces
+        it.
+      operationId: materialize_asset
+      parameters:
+      - name: asset_id
+        in: path
+        required: true
+        schema:
+          type: integer
+          title: Asset Id
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/DAGRunResponse'
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '409':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Conflict
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
/public/assets/{asset_id}/queuedEvents:
get:
tags:
diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py
b/airflow/api_fastapi/core_api/routes/public/assets.py
index 40e2fc5ce6c..33511028299 100644
--- a/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -20,7 +20,7 @@ from __future__ import annotations
from datetime import datetime
from typing import Annotated
-from fastapi import Depends, HTTPException, status
+from fastapi import Depends, HTTPException, Request, status
from sqlalchemy import delete, select
from sqlalchemy.orm import joinedload, subqueryload
@@ -51,10 +51,22 @@ from airflow.api_fastapi.core_api.datamodels.assets import (
QueuedEventCollectionResponse,
QueuedEventResponse,
)
+from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
+from airflow.api_fastapi.logging.decorators import action_logging
from airflow.assets.manager import asset_manager
-from airflow.models.asset import AssetAliasModel, AssetDagRunQueue,
AssetEvent, AssetModel
+from airflow.models.asset import (
+ AssetAliasModel,
+ AssetDagRunQueue,
+ AssetEvent,
+ AssetModel,
+ TaskOutletAssetReference,
+)
+from airflow.models.dag import DAG
+from airflow.models.dag_version import DagVersion
from airflow.utils import timezone
+from airflow.utils.state import DagRunState
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
assets_router = AirflowRouter(tags=["Asset"])
@@ -243,6 +255,54 @@ def create_asset_event(
return AssetEventResponse.model_validate(assets_event)
+@assets_router.post(
+ "/assets/{asset_id}/materialize",
+ responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND,
status.HTTP_409_CONFLICT]),
+ dependencies=[Depends(action_logging())],
+)
+def materialize_asset(
+ asset_id: int,
+ request: Request,
+ session: SessionDep,
+) -> DAGRunResponse:
+ """Materialize an asset by triggering a DAG run that produces it."""
+ dag_id_it = iter(
+ session.scalars(
+ select(TaskOutletAssetReference.dag_id)
+ .where(TaskOutletAssetReference.asset_id == asset_id)
+ .group_by(TaskOutletAssetReference.dag_id)
+ .limit(2)
+ )
+ )
+
+ if (dag_id := next(dag_id_it, None)) is None:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"No DAG materializes
asset with ID: {asset_id}")
+ if next(dag_id_it, None) is not None:
+ raise HTTPException(
+ status.HTTP_409_CONFLICT,
+ f"More than one DAG materializes asset with ID: {asset_id}",
+ )
+
+ dag: DAG | None
+ if not (dag := request.app.state.dag_bag.get_dag(dag_id)):
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with ID
`{dag_id}` was not found")
+
+ return dag.create_dagrun(
+ run_id=dag.timetable.generate_run_id(
+ run_type=DagRunType.MANUAL,
+ run_after=(run_after :=
timezone.coerce_datetime(timezone.utcnow())),
+ data_interval=None,
+ ),
+ run_after=run_after,
+ run_type=DagRunType.MANUAL,
+ triggered_by=DagRunTriggeredByType.REST_API,
+ external_trigger=True,
+ dag_version=DagVersion.get_latest_version(dag_id, session=session),
+ state=DagRunState.QUEUED,
+ session=session,
+ )
+
+
@assets_router.get(
"/assets/{asset_id}/queuedEvents",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
diff --git a/airflow/ui/openapi-gen/queries/common.ts
b/airflow/ui/openapi-gen/queries/common.ts
index 8bc70e1fb57..9beff4de51e 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -1704,6 +1704,9 @@ export const UseLoginServiceLoginKeyFn = (
export type AssetServiceCreateAssetEventMutationResult = Awaited<
ReturnType<typeof AssetService.createAssetEvent>
>;
+// Resolved (awaited) return type of AssetService.materializeAsset, used as the mutation's default TData.
+export type AssetServiceMaterializeAssetMutationResult = Awaited<
+ ReturnType<typeof AssetService.materializeAsset>
+>;
export type BackfillServiceCreateBackfillMutationResult = Awaited<
ReturnType<typeof BackfillService.createBackfill>
>;
diff --git a/airflow/ui/openapi-gen/queries/queries.ts
b/airflow/ui/openapi-gen/queries/queries.ts
index 999a5f744b0..acacb68dc07 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -2870,6 +2870,42 @@ export const useAssetServiceCreateAssetEvent = <
AssetService.createAssetEvent({ requestBody }) as unknown as
Promise<TData>,
...options,
});
+/**
+ * Materialize Asset
+ * Materialize an asset by triggering a DAG run that produces it.
+ * @param data The data for the request.
+ * @param data.assetId
+ * @returns DAGRunResponse Successful Response
+ * @throws ApiError
+ */
+export const useAssetServiceMaterializeAsset = <
+  TData = Common.AssetServiceMaterializeAssetMutationResult,
+  TError = unknown,
+  TContext = unknown,
+>(
+  options?: Omit<
+    UseMutationOptions<
+      TData,
+      TError,
+      {
+        assetId: number;
+      },
+      TContext
+    >,
+    "mutationFn"
+  >,
+) =>
+  useMutation<
+    TData,
+    TError,
+    {
+      assetId: number;
+    },
+    TContext
+  >({
+    mutationFn: ({ assetId }) => AssetService.materializeAsset({ assetId }) as unknown as Promise<TData>,
+    ...options,
+  });
/**
* Create Backfill
* @param data The data for the request.
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow/ui/openapi-gen/requests/services.gen.ts
index ac82c3896e3..eb1cc49672e 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -15,6 +15,8 @@ import type {
GetAssetEventsResponse,
CreateAssetEventData,
CreateAssetEventResponse,
+ MaterializeAssetData,
+ MaterializeAssetResponse,
GetAssetQueuedEventsData,
GetAssetQueuedEventsResponse,
DeleteAssetQueuedEventsData,
@@ -381,6 +383,31 @@ export class AssetService {
});
}
+  /**
+   * Materialize Asset
+   * Materialize an asset by triggering a DAG run that produces it.
+   * @param data The data for the request.
+   * @param data.assetId
+   * @returns DAGRunResponse Successful Response
+   * @throws ApiError
+   */
+  public static materializeAsset(data: MaterializeAssetData): CancelablePromise<MaterializeAssetResponse> {
+    return __request(OpenAPI, {
+      method: "POST",
+      url: "/public/assets/{asset_id}/materialize",
+      path: {
+        asset_id: data.assetId,
+      },
+      errors: {
+        401: "Unauthorized",
+        403: "Forbidden",
+        404: "Not Found",
+        409: "Conflict",
+        422: "Validation Error",
+      },
+    });
+  }
+
/**
* Get Asset Queued Events
* Get queued asset events for an asset.
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 192d110860e..c33656c890c 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1621,6 +1621,12 @@ export type CreateAssetEventData = {
export type CreateAssetEventResponse = AssetEventResponse;
+/** Path parameters for POST /public/assets/{asset_id}/materialize. */
+export type MaterializeAssetData = {
+ assetId: number;
+};
+
+/** 200 response body of the materialize-asset endpoint: the triggered DAG run. */
+export type MaterializeAssetResponse = DAGRunResponse;
+
export type GetAssetQueuedEventsData = {
assetId: number;
before?: string | null;
@@ -2613,6 +2619,37 @@ export type $OpenApiTs = {
};
};
};
+ "/public/assets/{asset_id}/materialize": {
+ post: {
+ req: MaterializeAssetData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: DAGRunResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Conflict
+ */
+ 409: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
"/public/assets/{asset_id}/queuedEvents": {
get: {
req: GetAssetQueuedEventsData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py
b/tests/api_fastapi/core_api/routes/public/test_assets.py
index 540747cff24..9b7c3352a00 100644
--- a/tests/api_fastapi/core_api/routes/public/test_assets.py
+++ b/tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -33,6 +33,7 @@ from airflow.models.asset import (
TaskOutletAssetReference,
)
from airflow.models.dagrun import DagRun
+from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.utils import timezone
from airflow.utils.session import provide_session
from airflow.utils.state import DagRunState
@@ -47,7 +48,7 @@ DEFAULT_DATE = datetime(2020, 6, 11, 18, 0, 0,
tzinfo=timezone.utc)
pytestmark = pytest.mark.db_test
-def _create_assets(session, num: int = 2) -> None:
+def _create_assets(session, num: int = 2) -> list[AssetModel]:
assets = [
AssetModel(
id=i,
@@ -62,6 +63,7 @@ def _create_assets(session, num: int = 2) -> None:
]
session.add_all(assets)
session.commit()
+ return assets
def _create_assets_with_sensitive_extra(session, num: int = 2) -> None:
@@ -192,8 +194,8 @@ class TestAssets:
clear_db_runs()
@provide_session
- def create_assets(self, session, num: int = 2):
- _create_assets(session=session, num=num)
+ def create_assets(self, session, num: int = 2) -> list[AssetModel]:
+ """Create ``num`` assets via ``_create_assets`` and return the created models."""
+ return _create_assets(session=session, num=num)
@provide_session
def create_assets_with_sensitive_extra(self, session, num: int = 2):
@@ -921,7 +923,7 @@ class
TestDeleteDagDatasetQueuedEvents(TestQueuedEventEndpoint):
class TestPostAssetEvents(TestAssets):
@pytest.mark.usefixtures("time_freezer")
def test_should_respond_200(self, test_client, session):
- self.create_assets()
+ self.create_assets(session)
event_payload = {"asset_id": 1, "extra": {"foo": "bar"}}
response = test_client.post("/public/assets/events",
json=event_payload)
assert response.status_code == 200
@@ -970,6 +972,59 @@ class TestPostAssetEvents(TestAssets):
}
[email protected]_serialized_dag
+class TestPostAssetMaterialize(TestAssets):
+ DAG_ASSET1_ID = "test_dag_1"
+ DAG_ASSET2_ID_A = "test_dag_2a"
+ DAG_ASSET2_ID_B = "test_dag_2b"
+ DAG_ASSET_NO = "test_dag_no"
+
+ @pytest.fixture(autouse=True)
+ def create_dags(self, setup, dag_maker, session):
+ # Depend on 'setup' so it runs first. Otherwise it deletes what we
create here.
+ assets = {am.id: am.to_public() for am in
self.create_assets(session=session, num=3)}
+ with dag_maker(self.DAG_ASSET1_ID, schedule=None, session=session):
+ EmptyOperator(task_id="task", outlets=assets[1])
+ with dag_maker(self.DAG_ASSET2_ID_A, schedule=None, session=session):
+ EmptyOperator(task_id="task", outlets=assets[2])
+ with dag_maker(self.DAG_ASSET2_ID_B, schedule=None, session=session):
+ EmptyOperator(task_id="task", outlets=assets[2])
+ with dag_maker(self.DAG_ASSET_NO, schedule=None, session=session):
+ EmptyOperator(task_id="task")
+
+ def test_should_respond_200(self, test_client):
+ response = test_client.post("/public/assets/1/materialize")
+ assert response.status_code == 200
+ assert response.json() == {
+ "dag_run_id": mock.ANY,
+ "dag_id": self.DAG_ASSET1_ID,
+ "logical_date": None,
+ "queued_at": mock.ANY,
+ "run_after": mock.ANY,
+ "start_date": None,
+ "end_date": None,
+ "data_interval_start": None,
+ "data_interval_end": None,
+ "last_scheduling_decision": None,
+ "run_type": "manual",
+ "state": "queued",
+ "external_trigger": True,
+ "triggered_by": "rest_api",
+ "conf": {},
+ "note": None,
+ }
+
+ def test_should_respond_409_on_multiple_dags(self, test_client):
+ response = test_client.post("/public/assets/2/materialize")
+ assert response.status_code == 409
+ assert response.json()["detail"] == "More than one DAG materializes
asset with ID: 2"
+
+ def test_should_respond_404_on_multiple_dags(self, test_client):
+ response = test_client.post("/public/assets/3/materialize")
+ assert response.status_code == 404
+ assert response.json()["detail"] == "No DAG materializes asset with
ID: 3"
+
+
class TestGetAssetQueuedEvents(TestQueuedEventEndpoint):
@pytest.mark.usefixtures("time_freezer")
def test_should_respond_200(self, test_client, session, create_dummy_dag):