This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 13a18c34207 AIP-84 add external dependencies asset condition (#44877)
13a18c34207 is described below
commit 13a18c34207a977fa536520621c814146cb126a0
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Tue Dec 17 01:19:34 2024 +0800
AIP-84 add external dependencies asset condition (#44877)
* Structure endpoint add asset-condition
* Fix CI
---
.../core_api/datamodels/ui/structure.py | 2 +
.../api_fastapi/core_api/openapi/v1-generated.yaml | 13 +++
.../api_fastapi/core_api/routes/ui/structure.py | 32 +++++---
airflow/api_fastapi/core_api/services/__init__.py | 16 ++++
airflow/api_fastapi/core_api/services/ui/__init.py | 16 ++++
.../api_fastapi/core_api/services/ui/structure.py | 94 ++++++++++++++++++++++
airflow/ui/openapi-gen/requests/schemas.gen.ts | 23 ++++++
airflow/ui/openapi-gen/requests/types.gen.ts | 2 +
.../core_api/routes/ui/test_structure.py | 82 ++++++++++++++-----
9 files changed, 252 insertions(+), 28 deletions(-)
diff --git a/airflow/api_fastapi/core_api/datamodels/ui/structure.py
b/airflow/api_fastapi/core_api/datamodels/ui/structure.py
index 40c7b9dc077..35874caa70c 100644
--- a/airflow/api_fastapi/core_api/datamodels/ui/structure.py
+++ b/airflow/api_fastapi/core_api/datamodels/ui/structure.py
@@ -28,6 +28,7 @@ class EdgeResponse(BaseModel):
label: str | None = None
source_id: str
target_id: str
+ is_source_asset: bool | None = None
class NodeResponse(BaseModel):
@@ -41,6 +42,7 @@ class NodeResponse(BaseModel):
setup_teardown_type: Literal["setup", "teardown"] | None = None
type: Literal["join", "task", "asset-condition", "asset", "asset-alias",
"dag", "sensor", "trigger"]
operator: str | None = None
+ asset_condition_type: Literal["or-gate", "and-gate"] | None = None
class StructureDataResponse(BaseModel):
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index a36e45d4241..cf1260f5a84 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -7734,6 +7734,11 @@ components:
target_id:
type: string
title: Target Id
+ is_source_asset:
+ anyOf:
+ - type: boolean
+ - type: 'null'
+ title: Is Source Asset
type: object
required:
- source_id
@@ -8079,6 +8084,14 @@ components:
- type: string
- type: 'null'
title: Operator
+ asset_condition_type:
+ anyOf:
+ - type: string
+ enum:
+ - or-gate
+ - and-gate
+ - type: 'null'
+ title: Asset Condition Type
type: object
required:
- id
diff --git a/airflow/api_fastapi/core_api/routes/ui/structure.py
b/airflow/api_fastapi/core_api/routes/ui/structure.py
index c3b914508d4..417b711cf19 100644
--- a/airflow/api_fastapi/core_api/routes/ui/structure.py
+++ b/airflow/api_fastapi/core_api/routes/ui/structure.py
@@ -22,6 +22,7 @@ from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.ui.structure import
StructureDataResponse
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.services.ui.structure import
get_upstream_assets
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.dag_edges import dag_edges
from airflow.utils.task_group import task_group_to_dict
@@ -71,17 +72,16 @@ def structure_data(
for dependency_dag_id, dependencies in
SerializedDagModel.get_dag_dependencies().items():
for dependency in dependencies:
+ # Dependencies not related to `dag_id` are ignored
if dependency_dag_id != dag_id and dependency.target != dag_id:
continue
- # Add nodes
- nodes.append(
- {
- "id": dependency.node_id,
- "label": dependency.dependency_id,
- "type": dependency.dependency_type,
- }
- )
+ # upstream assets are handled by the `get_upstream_assets`
function.
+ if dependency.target != dependency.dependency_type and
dependency.dependency_type in [
+ "asset-alias",
+ "asset",
+ ]:
+ continue
# Add edges
# start dependency
@@ -96,6 +96,20 @@ def structure_data(
) and exit_node_ref:
end_edges.append({"source_id": exit_node_ref["id"],
"target_id": dependency.node_id})
- data["edges"] = start_edges + edges + end_edges
+ # Add nodes
+ nodes.append(
+ {
+ "id": dependency.node_id,
+ "label": dependency.dependency_id,
+ "type": dependency.dependency_type,
+ }
+ )
+
+ upstream_asset_nodes, upstream_asset_edges = get_upstream_assets(
+ dag.timetable.asset_condition, entry_node_ref["id"]
+ )
+
+ data["nodes"] += upstream_asset_nodes
+ data["edges"] = upstream_asset_edges + start_edges + edges + end_edges
return StructureDataResponse(**data)
diff --git a/airflow/api_fastapi/core_api/services/__init__.py
b/airflow/api_fastapi/core_api/services/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/airflow/api_fastapi/core_api/services/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/api_fastapi/core_api/services/ui/__init.py
b/airflow/api_fastapi/core_api/services/ui/__init.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/airflow/api_fastapi/core_api/services/ui/__init.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/api_fastapi/core_api/services/ui/structure.py
b/airflow/api_fastapi/core_api/services/ui/structure.py
new file mode 100644
index 00000000000..72ee000b1f5
--- /dev/null
+++ b/airflow/api_fastapi/core_api/services/ui/structure.py
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Private service for dag structure.
+
+:meta private:
+"""
+
+from __future__ import annotations
+
+from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAll,
AssetAny, BaseAsset
+
+
+def get_upstream_assets(
+ asset_condition: BaseAsset, entry_node_ref: str, level=0
+) -> tuple[list[dict], list[dict]]:
+ edges: list[dict] = []
+ nodes: list[dict] = []
+ asset_condition_type: str | None = None
+
+ assets: list[Asset | AssetAlias] = []
+
+ nested_expression: AssetAll | AssetAny | None = None
+
+ if isinstance(asset_condition, AssetAny):
+ asset_condition_type = "or-gate"
+
+ elif isinstance(asset_condition, AssetAll):
+ asset_condition_type = "and-gate"
+
+ if hasattr(asset_condition, "objects"):
+ for obj in asset_condition.objects:
+ if isinstance(obj, (AssetAll, AssetAny)):
+ nested_expression = obj
+ elif isinstance(obj, (Asset, AssetAlias)):
+ assets.append(obj)
+ else:
+ raise TypeError(f"Unsupported type: {type(obj)}")
+
+ if asset_condition_type and assets:
+ asset_condition_id = f"{asset_condition_type}-{level}"
+ edges.append(
+ {
+ "source_id": asset_condition_id,
+ "target_id": entry_node_ref,
+ "is_source_asset": level == 0,
+ }
+ )
+ nodes.append(
+ {
+ "id": asset_condition_id,
+ "label": asset_condition_id,
+ "type": "asset-condition",
+ "asset_condition_type": asset_condition_type,
+ }
+ )
+
+ for asset in assets:
+ edges.append(
+ {
+ "source_id": asset.name,
+ "target_id": asset_condition_id,
+ }
+ )
+ nodes.append(
+ {
+ "id": asset.name,
+ "label": asset.name,
+ "type": "asset-alias" if isinstance(asset, AssetAlias)
else "asset",
+ }
+ )
+
+ if nested_expression is not None:
+ n, e = get_upstream_assets(nested_expression, asset_condition_id,
level=level + 1)
+
+ nodes = nodes + n
+ edges = edges + e
+
+ return nodes, edges
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 41e309d694c..696a9011d01 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -2664,6 +2664,17 @@ export const $EdgeResponse = {
type: "string",
title: "Target Id",
},
+ is_source_asset: {
+ anyOf: [
+ {
+ type: "boolean",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Is Source Asset",
+ },
},
type: "object",
required: ["source_id", "target_id"],
@@ -3208,6 +3219,18 @@ export const $NodeResponse = {
],
title: "Operator",
},
+ asset_condition_type: {
+ anyOf: [
+ {
+ type: "string",
+ enum: ["or-gate", "and-gate"],
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Asset Condition Type",
+ },
},
type: "object",
required: ["id", "label", "type"],
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index c178f78608e..f667e06de09 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -643,6 +643,7 @@ export type EdgeResponse = {
label?: string | null;
source_id: string;
target_id: string;
+ is_source_asset?: boolean | null;
};
/**
@@ -783,6 +784,7 @@ export type NodeResponse = {
| "sensor"
| "trigger";
operator?: string | null;
+ asset_condition_type?: "or-gate" | "and-gate" | null;
};
export type type =
diff --git a/tests/api_fastapi/core_api/routes/ui/test_structure.py
b/tests/api_fastapi/core_api/routes/ui/test_structure.py
index 0596b0f43e0..8efc6dc9d12 100644
--- a/tests/api_fastapi/core_api/routes/ui/test_structure.py
+++ b/tests/api_fastapi/core_api/routes/ui/test_structure.py
@@ -90,12 +90,14 @@ class TestStructureDataEndpoint:
"edges": [
{
"is_setup_teardown": None,
+ "is_source_asset": None,
"label": None,
"source_id": "external_task_sensor",
"target_id": "task_2",
},
{
"is_setup_teardown": None,
+ "is_source_asset": None,
"label": None,
"source_id": "task_1",
"target_id": "external_task_sensor",
@@ -103,6 +105,7 @@ class TestStructureDataEndpoint:
],
"nodes": [
{
+ "asset_condition_type": None,
"children": None,
"id": "task_1",
"is_mapped": None,
@@ -113,6 +116,7 @@ class TestStructureDataEndpoint:
"operator": "EmptyOperator",
},
{
+ "asset_condition_type": None,
"children": None,
"id": "external_task_sensor",
"is_mapped": None,
@@ -123,6 +127,7 @@ class TestStructureDataEndpoint:
"operator": "ExternalTaskSensor",
},
{
+ "asset_condition_type": None,
"children": None,
"id": "task_2",
"is_mapped": None,
@@ -155,6 +160,7 @@ class TestStructureDataEndpoint:
"edges": [],
"nodes": [
{
+ "asset_condition_type": None,
"children": None,
"id": "task_1",
"is_mapped": None,
@@ -177,50 +183,65 @@ class TestStructureDataEndpoint:
{
"is_setup_teardown": None,
"label": None,
- "source_id":
"trigger:external_trigger:test_dag_id:trigger_dag_run_operator",
+ "source_id": "and-gate-0",
"target_id": "task_1",
+ "is_source_asset": True,
},
{
"is_setup_teardown": None,
"label": None,
- "source_id": "asset:asset1",
- "target_id": "task_1",
+ "source_id": "asset1",
+ "target_id": "and-gate-0",
+ "is_source_asset": None,
},
{
"is_setup_teardown": None,
"label": None,
- "source_id": "asset:asset2",
- "target_id": "task_1",
+ "source_id": "asset2",
+ "target_id": "and-gate-0",
+ "is_source_asset": None,
},
{
"is_setup_teardown": None,
"label": None,
- "source_id": "asset-alias:example-alias",
+ "source_id": "example-alias",
+ "target_id": "and-gate-0",
+ "is_source_asset": None,
+ },
+ {
+ "is_setup_teardown": None,
+ "label": None,
+ "source_id":
"trigger:external_trigger:test_dag_id:trigger_dag_run_operator",
"target_id": "task_1",
+ "is_source_asset": None,
},
{
"is_setup_teardown": None,
"label": None,
"source_id":
"sensor:test_dag_id:test_dag_id:external_task_sensor",
"target_id": "task_1",
+ "is_source_asset": None,
},
{
"is_setup_teardown": None,
"label": None,
"source_id": "external_task_sensor",
"target_id": "task_2",
+ "is_source_asset": None,
},
{
"is_setup_teardown": None,
"label": None,
"source_id": "task_1",
"target_id": "external_task_sensor",
+ "is_source_asset": None,
},
{
"is_setup_teardown": None,
"label": None,
"source_id": "task_2",
"target_id":
"asset:s3://dataset-bucket/example.csv",
+ "is_source_asset": None,
},
],
"nodes": [
@@ -233,6 +254,7 @@ class TestStructureDataEndpoint:
"setup_teardown_type": None,
"type": "task",
"operator": "EmptyOperator",
+ "asset_condition_type": None,
},
{
"children": None,
@@ -243,6 +265,7 @@ class TestStructureDataEndpoint:
"setup_teardown_type": None,
"type": "task",
"operator": "ExternalTaskSensor",
+ "asset_condition_type": None,
},
{
"children": None,
@@ -253,6 +276,7 @@ class TestStructureDataEndpoint:
"setup_teardown_type": None,
"type": "task",
"operator": "EmptyOperator",
+ "asset_condition_type": None,
},
{
"children": None,
@@ -263,56 +287,73 @@ class TestStructureDataEndpoint:
"setup_teardown_type": None,
"type": "trigger",
"operator": None,
+ "asset_condition_type": None,
},
{
"children": None,
- "id": "asset:asset1",
+ "id": "asset:s3://dataset-bucket/example.csv",
"is_mapped": None,
- "label": "asset1",
+ "label": "s3://dataset-bucket/example.csv",
"tooltip": None,
"setup_teardown_type": None,
"type": "asset",
"operator": None,
+ "asset_condition_type": None,
},
{
"children": None,
- "id": "asset:asset2",
+ "id":
"sensor:test_dag_id:test_dag_id:external_task_sensor",
"is_mapped": None,
- "label": "asset2",
+ "label": "external_task_sensor",
"tooltip": None,
"setup_teardown_type": None,
- "type": "asset",
+ "type": "sensor",
"operator": None,
+ "asset_condition_type": None,
},
{
"children": None,
- "id": "asset-alias:example-alias",
+ "id": "and-gate-0",
"is_mapped": None,
- "label": "example-alias",
+ "label": "and-gate-0",
"tooltip": None,
"setup_teardown_type": None,
- "type": "asset-alias",
+ "type": "asset-condition",
"operator": None,
+ "asset_condition_type": "and-gate",
},
{
"children": None,
- "id": "asset:s3://dataset-bucket/example.csv",
+ "id": "asset1",
"is_mapped": None,
- "label": "s3://dataset-bucket/example.csv",
+ "label": "asset1",
"tooltip": None,
"setup_teardown_type": None,
"type": "asset",
"operator": None,
+ "asset_condition_type": None,
},
{
"children": None,
- "id":
"sensor:test_dag_id:test_dag_id:external_task_sensor",
+ "id": "asset2",
"is_mapped": None,
- "label": "external_task_sensor",
+ "label": "asset2",
"tooltip": None,
"setup_teardown_type": None,
- "type": "sensor",
+ "type": "asset",
+ "operator": None,
+ "asset_condition_type": None,
+ },
+ {
+ "children": None,
+ "id": "example-alias",
+ "is_mapped": None,
+ "label": "example-alias",
+ "tooltip": None,
+ "setup_teardown_type": None,
+ "type": "asset-alias",
"operator": None,
+ "asset_condition_type": None,
},
],
"arrange": "LR",
@@ -323,6 +364,7 @@ class TestStructureDataEndpoint:
{
"edges": [
{
+ "is_source_asset": None,
"is_setup_teardown": None,
"label": None,
"source_id": "trigger_dag_run_operator",
@@ -331,6 +373,7 @@ class TestStructureDataEndpoint:
],
"nodes": [
{
+ "asset_condition_type": None,
"children": None,
"id": "trigger_dag_run_operator",
"is_mapped": None,
@@ -341,6 +384,7 @@ class TestStructureDataEndpoint:
"operator": "TriggerDagRunOperator",
},
{
+ "asset_condition_type": None,
"children": None,
"id":
"trigger:external_trigger:test_dag_id:trigger_dag_run_operator",
"is_mapped": None,