This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e0377ebcac1 AIP-84 Add external dependencies to GET Structure Data Endpoint (#44701)
e0377ebcac1 is described below
commit e0377ebcac1db379b25a9973fa284d4d32201f9e
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Fri Dec 13 07:28:47 2024 +0800
AIP-84 Add external dependencies to GET Structure Data Endpoint (#44701)
* Initial working implementation
* Add tests for ExternalTaskSensor dependencies
* Add test for TriggerDagRunOperator dependencies
* Apply fixes from code review
* Apply further changes from code review
---
.../core_api/datamodels/ui/structure.py | 2 +-
.../api_fastapi/core_api/openapi/v1-generated.yaml | 14 +-
.../api_fastapi/core_api/routes/ui/structure.py | 42 +++-
airflow/ui/openapi-gen/queries/common.ts | 6 +-
airflow/ui/openapi-gen/queries/prefetch.ts | 5 +
airflow/ui/openapi-gen/queries/queries.ts | 6 +-
airflow/ui/openapi-gen/queries/suspense.ts | 6 +-
airflow/ui/openapi-gen/requests/schemas.gen.ts | 11 +-
airflow/ui/openapi-gen/requests/services.gen.ts | 2 +
airflow/ui/openapi-gen/requests/types.gen.ts | 21 +-
.../ui/src/layouts/Details/Graph/useGraphLayout.ts | 2 +-
airflow/www/views.py | 1 +
.../core_api/routes/ui/test_structure.py | 238 ++++++++++++++++++++-
13 files changed, 338 insertions(+), 18 deletions(-)
diff --git a/airflow/api_fastapi/core_api/datamodels/ui/structure.py
b/airflow/api_fastapi/core_api/datamodels/ui/structure.py
index cb135c141bb..40c7b9dc077 100644
--- a/airflow/api_fastapi/core_api/datamodels/ui/structure.py
+++ b/airflow/api_fastapi/core_api/datamodels/ui/structure.py
@@ -39,7 +39,7 @@ class NodeResponse(BaseModel):
label: str
tooltip: str | None = None
setup_teardown_type: Literal["setup", "teardown"] | None = None
- type: Literal["join", "task", "asset_condition"]
+ type: Literal["join", "task", "asset-condition", "asset", "asset-alias",
"dag", "sensor", "trigger"]
operator: str | None = None
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index d5b4f66b06d..4d8a1c5beb9 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -240,6 +240,13 @@ paths:
type: boolean
default: false
title: Include Downstream
+ - name: external_dependencies
+ in: query
+ required: false
+ schema:
+ type: boolean
+ default: false
+ title: External Dependencies
responses:
'200':
description: Successful Response
@@ -8042,7 +8049,12 @@ components:
enum:
- join
- task
- - asset_condition
+ - asset-condition
+ - asset
+ - asset-alias
+ - dag
+ - sensor
+ - trigger
title: Type
operator:
anyOf:
diff --git a/airflow/api_fastapi/core_api/routes/ui/structure.py
b/airflow/api_fastapi/core_api/routes/ui/structure.py
index 3ac429cfa95..c3b914508d4 100644
--- a/airflow/api_fastapi/core_api/routes/ui/structure.py
+++ b/airflow/api_fastapi/core_api/routes/ui/structure.py
@@ -22,6 +22,7 @@ from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.ui.structure import
StructureDataResponse
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
+from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.dag_edges import dag_edges
from airflow.utils.task_group import task_group_to_dict
@@ -39,6 +40,7 @@ def structure_data(
root: str | None = None,
include_upstream: bool = False,
include_downstream: bool = False,
+ external_dependencies: bool = False,
) -> StructureDataResponse:
"""Get Structure Data."""
dag = request.app.state.dag_bag.get_dag(dag_id)
@@ -51,9 +53,7 @@ def structure_data(
task_ids_or_regex=root, include_upstream=include_upstream,
include_downstream=include_downstream
)
- nodes = [
- task_group_to_dict(child) for child in
sorted(dag.task_group.children.values(), key=lambda t: t.label)
- ]
+ nodes = [task_group_to_dict(child) for child in
dag.task_group.topological_sort()]
edges = dag_edges(dag)
data = {
@@ -62,4 +62,40 @@ def structure_data(
"edges": edges,
}
+ if external_dependencies:
+ entry_node_ref = nodes[0] if nodes else None
+ exit_node_ref = nodes[-1] if nodes else None
+
+ start_edges: list[dict] = []
+ end_edges: list[dict] = []
+
+ for dependency_dag_id, dependencies in
SerializedDagModel.get_dag_dependencies().items():
+ for dependency in dependencies:
+ if dependency_dag_id != dag_id and dependency.target != dag_id:
+ continue
+
+ # Add nodes
+ nodes.append(
+ {
+ "id": dependency.node_id,
+ "label": dependency.dependency_id,
+ "type": dependency.dependency_type,
+ }
+ )
+
+ # Add edges
+ # start dependency
+ if (
+ dependency.source == dependency.dependency_type or
dependency.target == dag_id
+ ) and entry_node_ref:
+ start_edges.append({"source_id": dependency.node_id,
"target_id": entry_node_ref["id"]})
+
+ # end dependency
+ elif (
+ dependency.target == dependency.dependency_type or
dependency.source == dag_id
+ ) and exit_node_ref:
+ end_edges.append({"source_id": exit_node_ref["id"],
"target_id": dependency.node_id})
+
+ data["edges"] = start_edges + edges + end_edges
+
return StructureDataResponse(**data)
diff --git a/airflow/ui/openapi-gen/queries/common.ts
b/airflow/ui/openapi-gen/queries/common.ts
index be897ab52a5..d747a10893b 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -387,11 +387,13 @@ export const useStructureServiceStructureDataKey =
export const UseStructureServiceStructureDataKeyFn = (
{
dagId,
+ externalDependencies,
includeDownstream,
includeUpstream,
root,
}: {
dagId: string;
+ externalDependencies?: boolean;
includeDownstream?: boolean;
includeUpstream?: boolean;
root?: string;
@@ -399,7 +401,9 @@ export const UseStructureServiceStructureDataKeyFn = (
queryKey?: Array<unknown>,
) => [
useStructureServiceStructureDataKey,
- ...(queryKey ?? [{ dagId, includeDownstream, includeUpstream, root }]),
+ ...(queryKey ?? [
+ { dagId, externalDependencies, includeDownstream, includeUpstream, root },
+ ]),
];
export type BackfillServiceListBackfillsDefaultResponse = Awaited<
ReturnType<typeof BackfillService.listBackfills>
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts
b/airflow/ui/openapi-gen/queries/prefetch.ts
index 8b80ae7c08d..8f455a2409c 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -489,6 +489,7 @@ export const prefetchUseDashboardServiceHistoricalMetrics =
(
* @param data.root
* @param data.includeUpstream
* @param data.includeDownstream
+ * @param data.externalDependencies
* @returns StructureDataResponse Successful Response
* @throws ApiError
*/
@@ -496,11 +497,13 @@ export const prefetchUseStructureServiceStructureData = (
queryClient: QueryClient,
{
dagId,
+ externalDependencies,
includeDownstream,
includeUpstream,
root,
}: {
dagId: string;
+ externalDependencies?: boolean;
includeDownstream?: boolean;
includeUpstream?: boolean;
root?: string;
@@ -509,6 +512,7 @@ export const prefetchUseStructureServiceStructureData = (
queryClient.prefetchQuery({
queryKey: Common.UseStructureServiceStructureDataKeyFn({
dagId,
+ externalDependencies,
includeDownstream,
includeUpstream,
root,
@@ -516,6 +520,7 @@ export const prefetchUseStructureServiceStructureData = (
queryFn: () =>
StructureService.structureData({
dagId,
+ externalDependencies,
includeDownstream,
includeUpstream,
root,
diff --git a/airflow/ui/openapi-gen/queries/queries.ts
b/airflow/ui/openapi-gen/queries/queries.ts
index 83404e4bb34..60bcb9de9c3 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -613,6 +613,7 @@ export const useDashboardServiceHistoricalMetrics = <
* @param data.root
* @param data.includeUpstream
* @param data.includeDownstream
+ * @param data.externalDependencies
* @returns StructureDataResponse Successful Response
* @throws ApiError
*/
@@ -623,11 +624,13 @@ export const useStructureServiceStructureData = <
>(
{
dagId,
+ externalDependencies,
includeDownstream,
includeUpstream,
root,
}: {
dagId: string;
+ externalDependencies?: boolean;
includeDownstream?: boolean;
includeUpstream?: boolean;
root?: string;
@@ -637,12 +640,13 @@ export const useStructureServiceStructureData = <
) =>
useQuery<TData, TError>({
queryKey: Common.UseStructureServiceStructureDataKeyFn(
- { dagId, includeDownstream, includeUpstream, root },
+ { dagId, externalDependencies, includeDownstream, includeUpstream, root
},
queryKey,
),
queryFn: () =>
StructureService.structureData({
dagId,
+ externalDependencies,
includeDownstream,
includeUpstream,
root,
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts
b/airflow/ui/openapi-gen/queries/suspense.ts
index 4508a23b5e7..f29fb594e29 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -588,6 +588,7 @@ export const useDashboardServiceHistoricalMetricsSuspense =
<
* @param data.root
* @param data.includeUpstream
* @param data.includeDownstream
+ * @param data.externalDependencies
* @returns StructureDataResponse Successful Response
* @throws ApiError
*/
@@ -598,11 +599,13 @@ export const useStructureServiceStructureDataSuspense = <
>(
{
dagId,
+ externalDependencies,
includeDownstream,
includeUpstream,
root,
}: {
dagId: string;
+ externalDependencies?: boolean;
includeDownstream?: boolean;
includeUpstream?: boolean;
root?: string;
@@ -612,12 +615,13 @@ export const useStructureServiceStructureDataSuspense = <
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UseStructureServiceStructureDataKeyFn(
- { dagId, includeDownstream, includeUpstream, root },
+ { dagId, externalDependencies, includeDownstream, includeUpstream, root
},
queryKey,
),
queryFn: () =>
StructureService.structureData({
dagId,
+ externalDependencies,
includeDownstream,
includeUpstream,
root,
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 0b9333c500c..41e309d694c 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -3185,7 +3185,16 @@ export const $NodeResponse = {
},
type: {
type: "string",
- enum: ["join", "task", "asset_condition"],
+ enum: [
+ "join",
+ "task",
+ "asset-condition",
+ "asset",
+ "asset-alias",
+ "dag",
+ "sensor",
+ "trigger",
+ ],
title: "Type",
},
operator: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow/ui/openapi-gen/requests/services.gen.ts
index 98ff5677360..be4dd239cbe 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -742,6 +742,7 @@ export class StructureService {
* @param data.root
* @param data.includeUpstream
* @param data.includeDownstream
+ * @param data.externalDependencies
* @returns StructureDataResponse Successful Response
* @throws ApiError
*/
@@ -756,6 +757,7 @@ export class StructureService {
root: data.root,
include_upstream: data.includeUpstream,
include_downstream: data.includeDownstream,
+ external_dependencies: data.externalDependencies,
},
errors: {
404: "Not Found",
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index f7640ef45a0..1e9112db378 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -773,11 +773,27 @@ export type NodeResponse = {
label: string;
tooltip?: string | null;
setup_teardown_type?: "setup" | "teardown" | null;
- type: "join" | "task" | "asset_condition";
+ type:
+ | "join"
+ | "task"
+ | "asset-condition"
+ | "asset"
+ | "asset-alias"
+ | "dag"
+ | "sensor"
+ | "trigger";
operator?: string | null;
};
-export type type = "join" | "task" | "asset_condition";
+export type type =
+ | "join"
+ | "task"
+ | "asset-condition"
+ | "asset"
+ | "asset-alias"
+ | "dag"
+ | "sensor"
+ | "trigger";
/**
* Request body for Clear Task Instances endpoint.
@@ -1443,6 +1459,7 @@ export type HistoricalMetricsResponse =
HistoricalMetricDataResponse;
export type StructureDataData = {
dagId: string;
+ externalDependencies?: boolean;
includeDownstream?: boolean;
includeUpstream?: boolean;
root?: string | null;
diff --git a/airflow/ui/src/layouts/Details/Graph/useGraphLayout.ts
b/airflow/ui/src/layouts/Details/Graph/useGraphLayout.ts
index 2cd9bae37b4..819d03f77d4 100644
--- a/airflow/ui/src/layouts/Details/Graph/useGraphLayout.ts
+++ b/airflow/ui/src/layouts/Details/Graph/useGraphLayout.ts
@@ -211,7 +211,7 @@ const generateElkGraph = ({
if (node.type === "join") {
width = 10;
height = 10;
- } else if (node.type === "asset_condition") {
+ } else if (node.type === "asset-condition") {
width = 30;
height = 30;
}
diff --git a/airflow/www/views.py b/airflow/www/views.py
index b12092ffe3f..fa60f732e72 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3228,6 +3228,7 @@ class Airflow(AirflowBaseView):
else:
return {"url": None, "error": f"No URL found for {link_name}"}, 404
+ @mark_fastapi_migration_done
@expose("/object/graph_data")
@auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@gzipped
diff --git a/tests/api_fastapi/core_api/routes/ui/test_structure.py
b/tests/api_fastapi/core_api/routes/ui/test_structure.py
index e270a49c6e8..0596b0f43e0 100644
--- a/tests/api_fastapi/core_api/routes/ui/test_structure.py
+++ b/tests/api_fastapi/core_api/routes/ui/test_structure.py
@@ -22,12 +22,16 @@ import pytest
from airflow.models import DagBag
from airflow.operators.empty import EmptyOperator
+from airflow.providers.standard.operators.trigger_dagrun import
TriggerDagRunOperator
+from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
+from airflow.sdk.definitions.asset import Asset, AssetAlias, Dataset
from tests_common.test_utils.db import clear_db_runs
pytestmark = pytest.mark.db_test
DAG_ID = "test_dag_id"
+DAG_ID_EXTERNAL_TRIGGER = "external_trigger"
@pytest.fixture(autouse=True, scope="module")
@@ -46,13 +50,32 @@ def clean():
@pytest.fixture
def make_dag(dag_maker, session, time_machine):
+ with dag_maker(
+ dag_id=DAG_ID_EXTERNAL_TRIGGER,
+ serialized=True,
+ session=session,
+ start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC),
+ ):
+ TriggerDagRunOperator(task_id="trigger_dag_run_operator",
trigger_dag_id=DAG_ID)
+
+ dag_maker.dagbag.sync_to_db()
+
with dag_maker(
dag_id=DAG_ID,
serialized=True,
session=session,
start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC),
+ schedule=(
+ Asset(uri="s3://bucket/next-run-asset/1", name="asset1")
+ & Asset(uri="s3://bucket/next-run-asset/2", name="asset2")
+ & AssetAlias("example-alias")
+ ),
):
- EmptyOperator(task_id="task_1") >> EmptyOperator(task_id="task_2")
+ (
+ EmptyOperator(task_id="task_1",
outlets=[Dataset(uri="s3://dataset-bucket/example.csv")])
+ >> ExternalTaskSensor(task_id="external_task_sensor",
external_dag_id=DAG_ID)
+ >> EmptyOperator(task_id="task_2")
+ )
dag_maker.dagbag.sync_to_db()
@@ -64,14 +87,19 @@ class TestStructureDataEndpoint:
(
{"dag_id": DAG_ID},
{
- "arrange": "LR",
"edges": [
{
"is_setup_teardown": None,
"label": None,
- "source_id": "task_1",
+ "source_id": "external_task_sensor",
"target_id": "task_2",
},
+ {
+ "is_setup_teardown": None,
+ "label": None,
+ "source_id": "task_1",
+ "target_id": "external_task_sensor",
+ },
],
"nodes": [
{
@@ -79,22 +107,33 @@ class TestStructureDataEndpoint:
"id": "task_1",
"is_mapped": None,
"label": "task_1",
- "operator": "EmptyOperator",
+ "tooltip": None,
"setup_teardown_type": None,
+ "type": "task",
+ "operator": "EmptyOperator",
+ },
+ {
+ "children": None,
+ "id": "external_task_sensor",
+ "is_mapped": None,
+ "label": "external_task_sensor",
"tooltip": None,
+ "setup_teardown_type": None,
"type": "task",
+ "operator": "ExternalTaskSensor",
},
{
"children": None,
"id": "task_2",
"is_mapped": None,
"label": "task_2",
- "operator": "EmptyOperator",
- "setup_teardown_type": None,
"tooltip": None,
+ "setup_teardown_type": None,
"type": "task",
+ "operator": "EmptyOperator",
},
],
+ "arrange": "LR",
},
),
(
@@ -128,6 +167,193 @@ class TestStructureDataEndpoint:
],
},
),
+ (
+ {
+ "dag_id": DAG_ID,
+ "external_dependencies": True,
+ },
+ {
+ "edges": [
+ {
+ "is_setup_teardown": None,
+ "label": None,
+ "source_id":
"trigger:external_trigger:test_dag_id:trigger_dag_run_operator",
+ "target_id": "task_1",
+ },
+ {
+ "is_setup_teardown": None,
+ "label": None,
+ "source_id": "asset:asset1",
+ "target_id": "task_1",
+ },
+ {
+ "is_setup_teardown": None,
+ "label": None,
+ "source_id": "asset:asset2",
+ "target_id": "task_1",
+ },
+ {
+ "is_setup_teardown": None,
+ "label": None,
+ "source_id": "asset-alias:example-alias",
+ "target_id": "task_1",
+ },
+ {
+ "is_setup_teardown": None,
+ "label": None,
+ "source_id":
"sensor:test_dag_id:test_dag_id:external_task_sensor",
+ "target_id": "task_1",
+ },
+ {
+ "is_setup_teardown": None,
+ "label": None,
+ "source_id": "external_task_sensor",
+ "target_id": "task_2",
+ },
+ {
+ "is_setup_teardown": None,
+ "label": None,
+ "source_id": "task_1",
+ "target_id": "external_task_sensor",
+ },
+ {
+ "is_setup_teardown": None,
+ "label": None,
+ "source_id": "task_2",
+ "target_id":
"asset:s3://dataset-bucket/example.csv",
+ },
+ ],
+ "nodes": [
+ {
+ "children": None,
+ "id": "task_1",
+ "is_mapped": None,
+ "label": "task_1",
+ "tooltip": None,
+ "setup_teardown_type": None,
+ "type": "task",
+ "operator": "EmptyOperator",
+ },
+ {
+ "children": None,
+ "id": "external_task_sensor",
+ "is_mapped": None,
+ "label": "external_task_sensor",
+ "tooltip": None,
+ "setup_teardown_type": None,
+ "type": "task",
+ "operator": "ExternalTaskSensor",
+ },
+ {
+ "children": None,
+ "id": "task_2",
+ "is_mapped": None,
+ "label": "task_2",
+ "tooltip": None,
+ "setup_teardown_type": None,
+ "type": "task",
+ "operator": "EmptyOperator",
+ },
+ {
+ "children": None,
+ "id":
"trigger:external_trigger:test_dag_id:trigger_dag_run_operator",
+ "is_mapped": None,
+ "label": "trigger_dag_run_operator",
+ "tooltip": None,
+ "setup_teardown_type": None,
+ "type": "trigger",
+ "operator": None,
+ },
+ {
+ "children": None,
+ "id": "asset:asset1",
+ "is_mapped": None,
+ "label": "asset1",
+ "tooltip": None,
+ "setup_teardown_type": None,
+ "type": "asset",
+ "operator": None,
+ },
+ {
+ "children": None,
+ "id": "asset:asset2",
+ "is_mapped": None,
+ "label": "asset2",
+ "tooltip": None,
+ "setup_teardown_type": None,
+ "type": "asset",
+ "operator": None,
+ },
+ {
+ "children": None,
+ "id": "asset-alias:example-alias",
+ "is_mapped": None,
+ "label": "example-alias",
+ "tooltip": None,
+ "setup_teardown_type": None,
+ "type": "asset-alias",
+ "operator": None,
+ },
+ {
+ "children": None,
+ "id": "asset:s3://dataset-bucket/example.csv",
+ "is_mapped": None,
+ "label": "s3://dataset-bucket/example.csv",
+ "tooltip": None,
+ "setup_teardown_type": None,
+ "type": "asset",
+ "operator": None,
+ },
+ {
+ "children": None,
+ "id":
"sensor:test_dag_id:test_dag_id:external_task_sensor",
+ "is_mapped": None,
+ "label": "external_task_sensor",
+ "tooltip": None,
+ "setup_teardown_type": None,
+ "type": "sensor",
+ "operator": None,
+ },
+ ],
+ "arrange": "LR",
+ },
+ ),
+ (
+ {"dag_id": DAG_ID_EXTERNAL_TRIGGER, "external_dependencies":
True},
+ {
+ "edges": [
+ {
+ "is_setup_teardown": None,
+ "label": None,
+ "source_id": "trigger_dag_run_operator",
+ "target_id":
"trigger:external_trigger:test_dag_id:trigger_dag_run_operator",
+ }
+ ],
+ "nodes": [
+ {
+ "children": None,
+ "id": "trigger_dag_run_operator",
+ "is_mapped": None,
+ "label": "trigger_dag_run_operator",
+ "tooltip": None,
+ "setup_teardown_type": None,
+ "type": "task",
+ "operator": "TriggerDagRunOperator",
+ },
+ {
+ "children": None,
+ "id":
"trigger:external_trigger:test_dag_id:trigger_dag_run_operator",
+ "is_mapped": None,
+ "label": "trigger_dag_run_operator",
+ "tooltip": None,
+ "setup_teardown_type": None,
+ "type": "trigger",
+ "operator": None,
+ },
+ ],
+ "arrange": "LR",
+ },
+ ),
],
)
@pytest.mark.usefixtures("make_dag")