guan404ming commented on code in PR #58059:
URL: https://github.com/apache/airflow/pull/58059#discussion_r2703733556


##########
airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dependencies.py:
##########
@@ -272,3 +272,35 @@ def test_with_node_id_filter_not_found(self, test_client):
         assert response.json() == {
             "detail": "Unique connected component not found, got [] for 
connected components of node missing_node_id, expected only 1 connected 
component.",
         }
+
+    @pytest.mark.parametrize(
+        ("dependency_type", "has_dag", "has_task"),
+        [
+            (None, True, False),  # default is scheduling
+            ("scheduling", True, False),
+            ("data", False, True),
+        ],
+    )
+    def test_dependency_type_filter(self, test_client, asset1_id, 
dependency_type, has_dag, has_task):
+        params = {"node_id": f"asset:{asset1_id}"}
+        if dependency_type is not None:
+            params["dependency_type"] = dependency_type
+
+        response = test_client.get("/dependencies", params=params)
+        assert response.status_code == 200
+
+        result = response.json()
+        node_types = {node["type"] for node in result["nodes"]}
+
+        assert "asset" in node_types
+        assert ("dag" in node_types) == has_dag
+        assert ("task" in node_types) == has_task

Review Comment:
   > We probably want to expand this to assert what the graph exactly looks like 
and verify that, on a complex structure, the nodes are the ones expected. (Here, 
returning a graph with no dag and some tasks is enough to make the test pass)
   
   Sure, I've updated with more detailed tests.
   
   > But we don't know the dag id this task is from. There could be multiple 
tasks with the same name across different dags.
   
   I've added a label below the task name to identify which dag the task comes from.
   



##########
airflow-core/src/airflow/api_fastapi/core_api/services/ui/dependencies.py:
##########
@@ -76,3 +82,91 @@ def extract_single_connected_component(
     ]
 
     return {"nodes": nodes, "edges": edges}
+
+
+def get_data_dependencies(asset_id: int, session: Session) -> dict[str, 
list[dict]]:
+    """Get full data lineage for an asset."""
+    from sqlalchemy import select
+
+    from airflow.models.asset import TaskInletAssetReference, 
TaskOutletAssetReference
+
+    SEPARATOR = "__SEPARATOR__"
+
+    nodes_dict: dict[str, dict] = {}
+    edge_set: set[tuple[str, str]] = set()
+
+    # BFS to trace full lineage
+    assets_to_process: deque[int] = deque([asset_id])
+    processed_assets: set[int] = set()
+    processed_tasks: set[tuple[str, str]] = set()  # (dag_id, task_id)
+
+    while assets_to_process:
+        current_asset_id = assets_to_process.popleft()
+        if current_asset_id in processed_assets:
+            continue
+        processed_assets.add(current_asset_id)
+
+        asset = session.get(AssetModel, current_asset_id)

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to