This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0f14f66994a AIP-84: Migrating GET Dataset events for DAG runs api to fastAPI (#43874)
0f14f66994a is described below
commit 0f14f66994ae6960796029bf34ed871afada4dab
Author: Amogh Desai <[email protected]>
AuthorDate: Fri Nov 15 21:47:58 2024 +0530
AIP-84: Migrating GET Dataset events for DAG runs api to fastAPI (#43874)
* AIP-84: Migrating GET Assets to fastAPI
* matching response to legacy
* Adding unit tests - part 1
* Update airflow/api_fastapi/common/parameters.py
Co-authored-by: Jed Cunningham <[email protected]>
* fixing the dag_ids filter
* fixing the dag_ids filter
* Adding unit tests - part 2
* fixing unit tests & updating parameter type
* review comments pierre
* fixing last commit
* fixing unit tests
* AIP-84: Migrating GET Dataset events for DAG runs to fastAPI
* adding test cases
* adding test cases
* review comments pierre
* fixing unit tests
* review comments pierre
* review comments and fixing a test
* review comments on ut
---------
Co-authored-by: Jed Cunningham <[email protected]>
---
.../api_connexion/endpoints/dag_run_endpoint.py | 1 +
airflow/api_fastapi/core_api/datamodels/assets.py | 2 +-
.../api_fastapi/core_api/openapi/v1-generated.yaml | 58 ++++++++++++++++-
.../api_fastapi/core_api/routes/public/dag_run.py | 33 ++++++++++
airflow/ui/openapi-gen/queries/common.ts | 22 +++++++
airflow/ui/openapi-gen/queries/prefetch.ts | 26 ++++++++
airflow/ui/openapi-gen/queries/queries.ts | 33 ++++++++++
airflow/ui/openapi-gen/queries/suspense.ts | 33 ++++++++++
airflow/ui/openapi-gen/requests/schemas.gen.ts | 11 +++-
airflow/ui/openapi-gen/requests/services.gen.ts | 30 +++++++++
airflow/ui/openapi-gen/requests/types.gen.ts | 36 ++++++++++-
.../core_api/routes/public/test_dag_run.py | 75 ++++++++++++++++++++++
12 files changed, 354 insertions(+), 6 deletions(-)
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py
b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 7cb6d76ff2a..b8e7f36d1fd 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -115,6 +115,7 @@ def get_dag_run(
raise BadRequest("DAGRunSchema error", detail=str(e))
+@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.RUN)
@security.requires_access_asset("GET")
@provide_session
diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py
b/airflow/api_fastapi/core_api/datamodels/assets.py
index 1295b33fbf7..3e317a4c7e3 100644
--- a/airflow/api_fastapi/core_api/datamodels/assets.py
+++ b/airflow/api_fastapi/core_api/datamodels/assets.py
@@ -73,7 +73,7 @@ class DagRunAssetReference(BaseModel):
dag_id: str
execution_date: datetime = Field(alias="logical_date")
start_date: datetime
- end_date: datetime
+ end_date: datetime | None
state: str
data_interval_start: datetime
data_interval_end: datetime
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 23d4eeb6e40..94bb60b8efb 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -1164,6 +1164,58 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ /public/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents:
+ get:
+ tags:
+ - DagRun
+ summary: Get Upstream Asset Events
+ description: If dag run is asset-triggered, return the asset events that
triggered
+ it.
+ operationId: get_upstream_asset_events
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: dag_run_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Run Id
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/AssetEventCollectionResponse'
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/clear:
post:
tags:
@@ -4919,8 +4971,10 @@ components:
format: date-time
title: Start Date
end_date:
- type: string
- format: date-time
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
title: End Date
state:
type: string
diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py
b/airflow/api_fastapi/core_api/routes/public/dag_run.py
index d95cf76f69a..decc7ff2b28 100644
--- a/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -30,6 +30,7 @@ from airflow.api.common.mark_tasks import (
)
from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.assets import
AssetEventCollectionResponse, AssetEventResponse
from airflow.api_fastapi.core_api.datamodels.dag_run import (
DAGRunClearBody,
DAGRunPatchBody,
@@ -149,6 +150,38 @@ def patch_dag_run(
return DAGRunResponse.model_validate(dag_run, from_attributes=True)
+@dag_run_router.get(
+ "/{dag_run_id}/upstreamAssetEvents",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
+def get_upstream_asset_events(
+ dag_id: str, dag_run_id: str, session: Annotated[Session,
Depends(get_session)]
+) -> AssetEventCollectionResponse:
+ """If dag run is asset-triggered, return the asset events that triggered
it."""
+ dag_run: DagRun | None = session.scalar(
+ select(DagRun).where(
+ DagRun.dag_id == dag_id,
+ DagRun.run_id == dag_run_id,
+ )
+ )
+ if dag_run is None:
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND,
+ f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}`
was not found",
+ )
+ events = dag_run.consumed_asset_events
+ return AssetEventCollectionResponse(
+ asset_events=[
+ AssetEventResponse.model_validate(asset_event,
from_attributes=True) for asset_event in events
+ ],
+ total_entries=len(events),
+ )
+
+
@dag_run_router.post(
"/{dag_run_id}/clear",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND])
)
diff --git a/airflow/ui/openapi-gen/queries/common.ts
b/airflow/ui/openapi-gen/queries/common.ts
index 60464a7580e..6175179d11c 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -305,6 +305,28 @@ export const UseDagRunServiceGetDagRunKeyFn = (
},
queryKey?: Array<unknown>,
) => [useDagRunServiceGetDagRunKey, ...(queryKey ?? [{ dagId, dagRunId }])];
+export type DagRunServiceGetUpstreamAssetEventsDefaultResponse = Awaited<
+ ReturnType<typeof DagRunService.getUpstreamAssetEvents>
+>;
+export type DagRunServiceGetUpstreamAssetEventsQueryResult<
+ TData = DagRunServiceGetUpstreamAssetEventsDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useDagRunServiceGetUpstreamAssetEventsKey =
+ "DagRunServiceGetUpstreamAssetEvents";
+export const UseDagRunServiceGetUpstreamAssetEventsKeyFn = (
+ {
+ dagId,
+ dagRunId,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ },
+ queryKey?: Array<unknown>,
+) => [
+ useDagRunServiceGetUpstreamAssetEventsKey,
+ ...(queryKey ?? [{ dagId, dagRunId }]),
+];
export type DagSourceServiceGetDagSourceDefaultResponse = Awaited<
ReturnType<typeof DagSourceService.getDagSource>
>;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts
b/airflow/ui/openapi-gen/queries/prefetch.ts
index ac1cd93db37..4c541670258 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -386,6 +386,32 @@ export const prefetchUseDagRunServiceGetDagRun = (
queryKey: Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }),
});
+/**
+ * Get Upstream Asset Events
+ * If dag run is asset-triggered, return the asset events that triggered it.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @returns AssetEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseDagRunServiceGetUpstreamAssetEvents = (
+ queryClient: QueryClient,
+ {
+ dagId,
+ dagRunId,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ },
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn({
+ dagId,
+ dagRunId,
+ }),
+ queryFn: () => DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }),
+ });
/**
* Get Dag Source
* Get source code using file token.
diff --git a/airflow/ui/openapi-gen/queries/queries.ts
b/airflow/ui/openapi-gen/queries/queries.ts
index 68a31b0a7a1..3c6a426ee07 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -485,6 +485,39 @@ export const useDagRunServiceGetDagRun = <
queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData,
...options,
});
+/**
+ * Get Upstream Asset Events
+ * If dag run is asset-triggered, return the asset events that triggered it.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @returns AssetEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useDagRunServiceGetUpstreamAssetEvents = <
+ TData = Common.DagRunServiceGetUpstreamAssetEventsDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ dagRunId,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn(
+ { dagId, dagRunId },
+ queryKey,
+ ),
+ queryFn: () =>
+ DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }) as TData,
+ ...options,
+ });
/**
* Get Dag Source
* Get source code using file token.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts
b/airflow/ui/openapi-gen/queries/suspense.ts
index 0c162cc42cd..43331b187fe 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -469,6 +469,39 @@ export const useDagRunServiceGetDagRunSuspense = <
queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData,
...options,
});
+/**
+ * Get Upstream Asset Events
+ * If dag run is asset-triggered, return the asset events that triggered it.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @returns AssetEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useDagRunServiceGetUpstreamAssetEventsSuspense = <
+ TData = Common.DagRunServiceGetUpstreamAssetEventsDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ dagRunId,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn(
+ { dagId, dagRunId },
+ queryKey,
+ ),
+ queryFn: () =>
+ DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }) as TData,
+ ...options,
+ });
/**
* Get Dag Source
* Get source code using file token.
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 61007167cdd..17b4cf78721 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1936,8 +1936,15 @@ export const $DagRunAssetReference = {
title: "Start Date",
},
end_date: {
- type: "string",
- format: "date-time",
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
title: "End Date",
},
state: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow/ui/openapi-gen/requests/services.gen.ts
index 451eac3365d..1ebbac6aa6c 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -45,6 +45,8 @@ import type {
DeleteDagRunResponse,
PatchDagRunData,
PatchDagRunResponse,
+ GetUpstreamAssetEventsData,
+ GetUpstreamAssetEventsResponse,
ClearDagRunData,
ClearDagRunResponse,
GetDagSourceData,
@@ -740,6 +742,34 @@ export class DagRunService {
});
}
+ /**
+ * Get Upstream Asset Events
+ * If dag run is asset-triggered, return the asset events that triggered it.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @returns AssetEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+ public static getUpstreamAssetEvents(
+ data: GetUpstreamAssetEventsData,
+ ): CancelablePromise<GetUpstreamAssetEventsResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents",
+ path: {
+ dag_id: data.dagId,
+ dag_run_id: data.dagRunId,
+ },
+ errors: {
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
+
/**
* Clear Dag Run
* @param data The data for the request.
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 0b221ab4ae7..05b6e84adfc 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -426,7 +426,7 @@ export type DagRunAssetReference = {
dag_id: string;
logical_date: string;
start_date: string;
- end_date: string;
+ end_date: string | null;
state: string;
data_interval_start: string;
data_interval_end: string;
@@ -1129,6 +1129,13 @@ export type PatchDagRunData = {
export type PatchDagRunResponse = DAGRunResponse;
+export type GetUpstreamAssetEventsData = {
+ dagId: string;
+ dagRunId: string;
+};
+
+export type GetUpstreamAssetEventsResponse = AssetEventCollectionResponse;
+
export type ClearDagRunData = {
dagId: string;
dagRunId: string;
@@ -2010,6 +2017,33 @@ export type $OpenApiTs = {
};
};
};
+ "/public/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents": {
+ get: {
+ req: GetUpstreamAssetEventsData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: AssetEventCollectionResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
"/public/dags/{dag_id}/dagRuns/{dag_run_id}/clear": {
post: {
req: ClearDagRunData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py
b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
index f6ee3d2dee9..7d28a9237e3 100644
--- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -22,7 +22,9 @@ from datetime import datetime, timezone
import pytest
from sqlalchemy import select
+from airflow import Asset
from airflow.models import DagRun
+from airflow.models.asset import AssetEvent, AssetModel
from airflow.operators.empty import EmptyOperator
from airflow.utils.session import provide_session
from airflow.utils.state import DagRunState, State
@@ -51,6 +53,7 @@ DAG1_RUN2_TRIGGERED_BY = DagRunTriggeredByType.ASSET
DAG2_RUN1_TRIGGERED_BY = DagRunTriggeredByType.CLI
DAG2_RUN2_TRIGGERED_BY = DagRunTriggeredByType.REST_API
START_DATE = datetime(2024, 6, 15, 0, 0, tzinfo=timezone.utc)
+END_DATE = datetime(2024, 6, 15, 0, 0, tzinfo=timezone.utc)
EXECUTION_DATE = datetime(2024, 6, 16, 0, 0, tzinfo=timezone.utc)
DAG1_RUN1_NOTE = "test_note"
@@ -264,6 +267,78 @@ class TestDeleteDagRun:
assert body["detail"] == "The DagRun with dag_id: `test_dag1` and
run_id: `invalid` was not found"
+class TestGetDagRunAssetTriggerEvents:
+ def test_should_respond_200(self, test_client, dag_maker, session):
+ asset1 = Asset(uri="ds1")
+
+ with dag_maker(dag_id="source_dag", start_date=START_DATE,
session=session):
+ EmptyOperator(task_id="task", outlets=[asset1])
+ dr = dag_maker.create_dagrun()
+ ti = dr.task_instances[0]
+
+ asset1_id =
session.query(AssetModel.id).filter_by(uri=asset1.uri).scalar()
+ event = AssetEvent(
+ asset_id=asset1_id,
+ source_task_id=ti.task_id,
+ source_dag_id=ti.dag_id,
+ source_run_id=ti.run_id,
+ source_map_index=ti.map_index,
+ )
+ session.add(event)
+
+ with dag_maker(dag_id="TEST_DAG_ID", start_date=START_DATE,
session=session):
+ pass
+ dr = dag_maker.create_dagrun(run_id="TEST_DAG_RUN_ID",
run_type=DagRunType.ASSET_TRIGGERED)
+ dr.consumed_asset_events.append(event)
+
+ session.commit()
+ assert event.timestamp
+
+ response = test_client.get(
+
"/public/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamAssetEvents",
+ )
+ assert response.status_code == 200
+ expected_response = {
+ "asset_events": [
+ {
+ "timestamp": event.timestamp.isoformat().replace("+00:00",
"Z"),
+ "asset_id": asset1_id,
+ "uri": asset1.uri,
+ "extra": {},
+ "id": event.id,
+ "source_dag_id": ti.dag_id,
+ "source_map_index": ti.map_index,
+ "source_run_id": ti.run_id,
+ "source_task_id": ti.task_id,
+ "created_dagruns": [
+ {
+ "dag_id": "TEST_DAG_ID",
+ "run_id": "TEST_DAG_RUN_ID",
+ "data_interval_end":
dr.data_interval_end.isoformat().replace("+00:00", "Z"),
+ "data_interval_start":
dr.data_interval_start.isoformat().replace("+00:00", "Z"),
+ "end_date": None,
+ "logical_date":
dr.logical_date.isoformat().replace("+00:00", "Z"),
+ "start_date":
dr.start_date.isoformat().replace("+00:00", "Z"),
+ "state": "running",
+ }
+ ],
+ }
+ ],
+ "total_entries": 1,
+ }
+ assert response.json() == expected_response
+
+ def test_should_respond_404(self, test_client):
+ response = test_client.get(
+
"public/dags/invalid-id/dagRuns/invalid-run-id/upstreamAssetEvents",
+ )
+ assert response.status_code == 404
+ assert (
+ "The DagRun with dag_id: `invalid-id` and run_id: `invalid-run-id`
was not found"
+ == response.json()["detail"]
+ )
+
+
class TestClearDagRun:
def test_clear_dag_run(self, test_client):
response = test_client.post(