This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 13a18c34207 AIP-84 add external dependencies asset condition (#44877)
13a18c34207 is described below

commit 13a18c34207a977fa536520621c814146cb126a0
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Tue Dec 17 01:19:34 2024 +0800

    AIP-84 add external dependencies asset condition (#44877)
    
    * Structure endpoint add asset-condition
    
    * Fix CI
---
 .../core_api/datamodels/ui/structure.py            |  2 +
 .../api_fastapi/core_api/openapi/v1-generated.yaml | 13 +++
 .../api_fastapi/core_api/routes/ui/structure.py    | 32 +++++---
 airflow/api_fastapi/core_api/services/__init__.py  | 16 ++++
 airflow/api_fastapi/core_api/services/ui/__init.py | 16 ++++
 .../api_fastapi/core_api/services/ui/structure.py  | 94 ++++++++++++++++++++++
 airflow/ui/openapi-gen/requests/schemas.gen.ts     | 23 ++++++
 airflow/ui/openapi-gen/requests/types.gen.ts       |  2 +
 .../core_api/routes/ui/test_structure.py           | 82 ++++++++++++++-----
 9 files changed, 252 insertions(+), 28 deletions(-)

diff --git a/airflow/api_fastapi/core_api/datamodels/ui/structure.py b/airflow/api_fastapi/core_api/datamodels/ui/structure.py
index 40c7b9dc077..35874caa70c 100644
--- a/airflow/api_fastapi/core_api/datamodels/ui/structure.py
+++ b/airflow/api_fastapi/core_api/datamodels/ui/structure.py
@@ -28,6 +28,7 @@ class EdgeResponse(BaseModel):
     label: str | None = None
     source_id: str
     target_id: str
+    is_source_asset: bool | None = None
 
 
 class NodeResponse(BaseModel):
@@ -41,6 +42,7 @@ class NodeResponse(BaseModel):
     setup_teardown_type: Literal["setup", "teardown"] | None = None
     type: Literal["join", "task", "asset-condition", "asset", "asset-alias", "dag", "sensor", "trigger"]
     operator: str | None = None
+    asset_condition_type: Literal["or-gate", "and-gate"] | None = None
 
 
 class StructureDataResponse(BaseModel):
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index a36e45d4241..cf1260f5a84 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -7734,6 +7734,11 @@ components:
         target_id:
           type: string
           title: Target Id
+        is_source_asset:
+          anyOf:
+          - type: boolean
+          - type: 'null'
+          title: Is Source Asset
       type: object
       required:
       - source_id
@@ -8079,6 +8084,14 @@ components:
           - type: string
           - type: 'null'
           title: Operator
+        asset_condition_type:
+          anyOf:
+          - type: string
+            enum:
+            - or-gate
+            - and-gate
+          - type: 'null'
+          title: Asset Condition Type
       type: object
       required:
       - id
diff --git a/airflow/api_fastapi/core_api/routes/ui/structure.py b/airflow/api_fastapi/core_api/routes/ui/structure.py
index c3b914508d4..417b711cf19 100644
--- a/airflow/api_fastapi/core_api/routes/ui/structure.py
+++ b/airflow/api_fastapi/core_api/routes/ui/structure.py
@@ -22,6 +22,7 @@ from airflow.api_fastapi.common.db.common import SessionDep
 from airflow.api_fastapi.common.router import AirflowRouter
 from airflow.api_fastapi.core_api.datamodels.ui.structure import StructureDataResponse
 from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.services.ui.structure import get_upstream_assets
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils.dag_edges import dag_edges
 from airflow.utils.task_group import task_group_to_dict
@@ -71,17 +72,16 @@ def structure_data(
 
         for dependency_dag_id, dependencies in SerializedDagModel.get_dag_dependencies().items():
             for dependency in dependencies:
+                # Dependencies not related to `dag_id` are ignored
                 if dependency_dag_id != dag_id and dependency.target != dag_id:
                     continue
 
-                # Add nodes
-                nodes.append(
-                    {
-                        "id": dependency.node_id,
-                        "label": dependency.dependency_id,
-                        "type": dependency.dependency_type,
-                    }
-                )
+                # upstream assets are handled by the `get_upstream_assets` function.
+                if dependency.target != dependency.dependency_type and dependency.dependency_type in [
+                    "asset-alias",
+                    "asset",
+                ]:
+                    continue
 
                 # Add edges
                 # start dependency
@@ -96,6 +96,20 @@ def structure_data(
                 ) and exit_node_ref:
                     end_edges.append({"source_id": exit_node_ref["id"], "target_id": dependency.node_id})
 
-        data["edges"] = start_edges + edges + end_edges
+                # Add nodes
+                nodes.append(
+                    {
+                        "id": dependency.node_id,
+                        "label": dependency.dependency_id,
+                        "type": dependency.dependency_type,
+                    }
+                )
+
+        upstream_asset_nodes, upstream_asset_edges = get_upstream_assets(
+            dag.timetable.asset_condition, entry_node_ref["id"]
+        )
+
+        data["nodes"] += upstream_asset_nodes
+        data["edges"] = upstream_asset_edges + start_edges + edges + end_edges
 
     return StructureDataResponse(**data)
diff --git a/airflow/api_fastapi/core_api/services/__init__.py b/airflow/api_fastapi/core_api/services/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/airflow/api_fastapi/core_api/services/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/api_fastapi/core_api/services/ui/__init.py b/airflow/api_fastapi/core_api/services/ui/__init.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/airflow/api_fastapi/core_api/services/ui/__init.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/api_fastapi/core_api/services/ui/structure.py b/airflow/api_fastapi/core_api/services/ui/structure.py
new file mode 100644
index 00000000000..72ee000b1f5
--- /dev/null
+++ b/airflow/api_fastapi/core_api/services/ui/structure.py
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Private service for dag structure.
+
+:meta private:
+"""
+
+from __future__ import annotations
+
+from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAll, AssetAny, BaseAsset
+
+
+def get_upstream_assets(
+    asset_condition: BaseAsset, entry_node_ref: str, level=0
+) -> tuple[list[dict], list[dict]]:
+    edges: list[dict] = []
+    nodes: list[dict] = []
+    asset_condition_type: str | None = None
+
+    assets: list[Asset | AssetAlias] = []
+
+    nested_expression: AssetAll | AssetAny | None = None
+
+    if isinstance(asset_condition, AssetAny):
+        asset_condition_type = "or-gate"
+
+    elif isinstance(asset_condition, AssetAll):
+        asset_condition_type = "and-gate"
+
+    if hasattr(asset_condition, "objects"):
+        for obj in asset_condition.objects:
+            if isinstance(obj, (AssetAll, AssetAny)):
+                nested_expression = obj
+            elif isinstance(obj, (Asset, AssetAlias)):
+                assets.append(obj)
+            else:
+                raise TypeError(f"Unsupported type: {type(obj)}")
+
+    if asset_condition_type and assets:
+        asset_condition_id = f"{asset_condition_type}-{level}"
+        edges.append(
+            {
+                "source_id": asset_condition_id,
+                "target_id": entry_node_ref,
+                "is_source_asset": level == 0,
+            }
+        )
+        nodes.append(
+            {
+                "id": asset_condition_id,
+                "label": asset_condition_id,
+                "type": "asset-condition",
+                "asset_condition_type": asset_condition_type,
+            }
+        )
+
+        for asset in assets:
+            edges.append(
+                {
+                    "source_id": asset.name,
+                    "target_id": asset_condition_id,
+                }
+            )
+            nodes.append(
+                {
+                    "id": asset.name,
+                    "label": asset.name,
+                    "type": "asset-alias" if isinstance(asset, AssetAlias) else "asset",
+                }
+            )
+
+        if nested_expression is not None:
+            n, e = get_upstream_assets(nested_expression, asset_condition_id, level=level + 1)
+
+            nodes = nodes + n
+            edges = edges + e
+
+    return nodes, edges
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 41e309d694c..696a9011d01 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -2664,6 +2664,17 @@ export const $EdgeResponse = {
       type: "string",
       title: "Target Id",
     },
+    is_source_asset: {
+      anyOf: [
+        {
+          type: "boolean",
+        },
+        {
+          type: "null",
+        },
+      ],
+      title: "Is Source Asset",
+    },
   },
   type: "object",
   required: ["source_id", "target_id"],
@@ -3208,6 +3219,18 @@ export const $NodeResponse = {
       ],
       title: "Operator",
     },
+    asset_condition_type: {
+      anyOf: [
+        {
+          type: "string",
+          enum: ["or-gate", "and-gate"],
+        },
+        {
+          type: "null",
+        },
+      ],
+      title: "Asset Condition Type",
+    },
   },
   type: "object",
   required: ["id", "label", "type"],
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index c178f78608e..f667e06de09 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -643,6 +643,7 @@ export type EdgeResponse = {
   label?: string | null;
   source_id: string;
   target_id: string;
+  is_source_asset?: boolean | null;
 };
 
 /**
@@ -783,6 +784,7 @@ export type NodeResponse = {
     | "sensor"
     | "trigger";
   operator?: string | null;
+  asset_condition_type?: "or-gate" | "and-gate" | null;
 };
 
 export type type =
diff --git a/tests/api_fastapi/core_api/routes/ui/test_structure.py b/tests/api_fastapi/core_api/routes/ui/test_structure.py
index 0596b0f43e0..8efc6dc9d12 100644
--- a/tests/api_fastapi/core_api/routes/ui/test_structure.py
+++ b/tests/api_fastapi/core_api/routes/ui/test_structure.py
@@ -90,12 +90,14 @@ class TestStructureDataEndpoint:
                     "edges": [
                         {
                             "is_setup_teardown": None,
+                            "is_source_asset": None,
                             "label": None,
                             "source_id": "external_task_sensor",
                             "target_id": "task_2",
                         },
                         {
                             "is_setup_teardown": None,
+                            "is_source_asset": None,
                             "label": None,
                             "source_id": "task_1",
                             "target_id": "external_task_sensor",
@@ -103,6 +105,7 @@ class TestStructureDataEndpoint:
                     ],
                     "nodes": [
                         {
+                            "asset_condition_type": None,
                             "children": None,
                             "id": "task_1",
                             "is_mapped": None,
@@ -113,6 +116,7 @@ class TestStructureDataEndpoint:
                             "operator": "EmptyOperator",
                         },
                         {
+                            "asset_condition_type": None,
                             "children": None,
                             "id": "external_task_sensor",
                             "is_mapped": None,
@@ -123,6 +127,7 @@ class TestStructureDataEndpoint:
                             "operator": "ExternalTaskSensor",
                         },
                         {
+                            "asset_condition_type": None,
                             "children": None,
                             "id": "task_2",
                             "is_mapped": None,
@@ -155,6 +160,7 @@ class TestStructureDataEndpoint:
                     "edges": [],
                     "nodes": [
                         {
+                            "asset_condition_type": None,
                             "children": None,
                             "id": "task_1",
                             "is_mapped": None,
@@ -177,50 +183,65 @@ class TestStructureDataEndpoint:
                         {
                             "is_setup_teardown": None,
                             "label": None,
-                            "source_id": "trigger:external_trigger:test_dag_id:trigger_dag_run_operator",
+                            "source_id": "and-gate-0",
                             "target_id": "task_1",
+                            "is_source_asset": True,
                         },
                         {
                             "is_setup_teardown": None,
                             "label": None,
-                            "source_id": "asset:asset1",
-                            "target_id": "task_1",
+                            "source_id": "asset1",
+                            "target_id": "and-gate-0",
+                            "is_source_asset": None,
                         },
                         {
                             "is_setup_teardown": None,
                             "label": None,
-                            "source_id": "asset:asset2",
-                            "target_id": "task_1",
+                            "source_id": "asset2",
+                            "target_id": "and-gate-0",
+                            "is_source_asset": None,
                         },
                         {
                             "is_setup_teardown": None,
                             "label": None,
-                            "source_id": "asset-alias:example-alias",
+                            "source_id": "example-alias",
+                            "target_id": "and-gate-0",
+                            "is_source_asset": None,
+                        },
+                        {
+                            "is_setup_teardown": None,
+                            "label": None,
+                            "source_id": "trigger:external_trigger:test_dag_id:trigger_dag_run_operator",
                             "target_id": "task_1",
+                            "is_source_asset": None,
                         },
                         {
                             "is_setup_teardown": None,
                             "label": None,
                             "source_id": "sensor:test_dag_id:test_dag_id:external_task_sensor",
                             "target_id": "task_1",
+                            "is_source_asset": None,
                         },
                         {
                             "is_setup_teardown": None,
                             "label": None,
                             "source_id": "external_task_sensor",
                             "target_id": "task_2",
+                            "is_source_asset": None,
                         },
                         {
                             "is_setup_teardown": None,
                             "label": None,
                             "source_id": "task_1",
                             "target_id": "external_task_sensor",
+                            "is_source_asset": None,
                         },
                         {
                             "is_setup_teardown": None,
                             "label": None,
                             "source_id": "task_2",
                             "target_id": "asset:s3://dataset-bucket/example.csv",
+                            "is_source_asset": None,
                         },
                     ],
                     "nodes": [
@@ -233,6 +254,7 @@ class TestStructureDataEndpoint:
                             "setup_teardown_type": None,
                             "type": "task",
                             "operator": "EmptyOperator",
+                            "asset_condition_type": None,
                         },
                         {
                             "children": None,
@@ -243,6 +265,7 @@ class TestStructureDataEndpoint:
                             "setup_teardown_type": None,
                             "type": "task",
                             "operator": "ExternalTaskSensor",
+                            "asset_condition_type": None,
                         },
                         {
                             "children": None,
@@ -253,6 +276,7 @@ class TestStructureDataEndpoint:
                             "setup_teardown_type": None,
                             "type": "task",
                             "operator": "EmptyOperator",
+                            "asset_condition_type": None,
                         },
                         {
                             "children": None,
@@ -263,56 +287,73 @@ class TestStructureDataEndpoint:
                             "setup_teardown_type": None,
                             "type": "trigger",
                             "operator": None,
+                            "asset_condition_type": None,
                         },
                         {
                             "children": None,
-                            "id": "asset:asset1",
+                            "id": "asset:s3://dataset-bucket/example.csv",
                             "is_mapped": None,
-                            "label": "asset1",
+                            "label": "s3://dataset-bucket/example.csv",
                             "tooltip": None,
                             "setup_teardown_type": None,
                             "type": "asset",
                             "operator": None,
+                            "asset_condition_type": None,
                         },
                         {
                             "children": None,
-                            "id": "asset:asset2",
+                            "id": "sensor:test_dag_id:test_dag_id:external_task_sensor",
                             "is_mapped": None,
-                            "label": "asset2",
+                            "label": "external_task_sensor",
                             "tooltip": None,
                             "setup_teardown_type": None,
-                            "type": "asset",
+                            "type": "sensor",
                             "operator": None,
+                            "asset_condition_type": None,
                         },
                         {
                             "children": None,
-                            "id": "asset-alias:example-alias",
+                            "id": "and-gate-0",
                             "is_mapped": None,
-                            "label": "example-alias",
+                            "label": "and-gate-0",
                             "tooltip": None,
                             "setup_teardown_type": None,
-                            "type": "asset-alias",
+                            "type": "asset-condition",
                             "operator": None,
+                            "asset_condition_type": "and-gate",
                         },
                         {
                             "children": None,
-                            "id": "asset:s3://dataset-bucket/example.csv",
+                            "id": "asset1",
                             "is_mapped": None,
-                            "label": "s3://dataset-bucket/example.csv",
+                            "label": "asset1",
                             "tooltip": None,
                             "setup_teardown_type": None,
                             "type": "asset",
                             "operator": None,
+                            "asset_condition_type": None,
                         },
                         {
                             "children": None,
-                            "id": "sensor:test_dag_id:test_dag_id:external_task_sensor",
+                            "id": "asset2",
                             "is_mapped": None,
-                            "label": "external_task_sensor",
+                            "label": "asset2",
                             "tooltip": None,
                             "setup_teardown_type": None,
-                            "type": "sensor",
+                            "type": "asset",
+                            "operator": None,
+                            "asset_condition_type": None,
+                        },
+                        {
+                            "children": None,
+                            "id": "example-alias",
+                            "is_mapped": None,
+                            "label": "example-alias",
+                            "tooltip": None,
+                            "setup_teardown_type": None,
+                            "type": "asset-alias",
                             "operator": None,
+                            "asset_condition_type": None,
                         },
                     ],
                     "arrange": "LR",
@@ -323,6 +364,7 @@ class TestStructureDataEndpoint:
                 {
                     "edges": [
                         {
+                            "is_source_asset": None,
                             "is_setup_teardown": None,
                             "label": None,
                             "source_id": "trigger_dag_run_operator",
@@ -331,6 +373,7 @@ class TestStructureDataEndpoint:
                     ],
                     "nodes": [
                         {
+                            "asset_condition_type": None,
                             "children": None,
                             "id": "trigger_dag_run_operator",
                             "is_mapped": None,
@@ -341,6 +384,7 @@ class TestStructureDataEndpoint:
                             "operator": "TriggerDagRunOperator",
                         },
                         {
+                            "asset_condition_type": None,
                             "children": None,
                             "id": "trigger:external_trigger:test_dag_id:trigger_dag_run_operator",
                             "is_mapped": None,

Reply via email to