guan404ming commented on code in PR #58059:
URL: https://github.com/apache/airflow/pull/58059#discussion_r2703733556
##########
airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dependencies.py:
##########
@@ -272,3 +272,35 @@ def test_with_node_id_filter_not_found(self, test_client):
assert response.json() == {
"detail": "Unique connected component not found, got [] for
connected components of node missing_node_id, expected only 1 connected
component.",
}
+
+ @pytest.mark.parametrize(
+ ("dependency_type", "has_dag", "has_task"),
+ [
+ (None, True, False), # default is scheduling
+ ("scheduling", True, False),
+ ("data", False, True),
+ ],
+ )
+ def test_dependency_type_filter(self, test_client, asset1_id,
dependency_type, has_dag, has_task):
+ params = {"node_id": f"asset:{asset1_id}"}
+ if dependency_type is not None:
+ params["dependency_type"] = dependency_type
+
+ response = test_client.get("/dependencies", params=params)
+ assert response.status_code == 200
+
+ result = response.json()
+ node_types = {node["type"] for node in result["nodes"]}
+
+ assert "asset" in node_types
+ assert ("dag" in node_types) == has_dag
+ assert ("task" in node_types) == has_task
Review Comment:
> We probably want to expand this to assert what the graph exactly looks like
and verify, on a complex structure, that the nodes are the ones expected. (Here,
returning a graph with no dag and some tasks is enough to make the test pass)
Sure, I've updated with more detailed tests.
> But we don't know the dag id this task is from. There could be multiple
tasks with the same name across different dags.
I've added a label below the task name for identifying where the task is from.
##########
airflow-core/src/airflow/api_fastapi/core_api/services/ui/dependencies.py:
##########
@@ -76,3 +82,91 @@ def extract_single_connected_component(
]
return {"nodes": nodes, "edges": edges}
+
+
+def get_data_dependencies(asset_id: int, session: Session) -> dict[str,
list[dict]]:
+ """Get full data lineage for an asset."""
+ from sqlalchemy import select
+
+ from airflow.models.asset import TaskInletAssetReference,
TaskOutletAssetReference
+
+ SEPARATOR = "__SEPARATOR__"
+
+ nodes_dict: dict[str, dict] = {}
+ edge_set: set[tuple[str, str]] = set()
+
+ # BFS to trace full lineage
+ assets_to_process: deque[int] = deque([asset_id])
+ processed_assets: set[int] = set()
+ processed_tasks: set[tuple[str, str]] = set() # (dag_id, task_id)
+
+ while assets_to_process:
+ current_asset_id = assets_to_process.popleft()
+ if current_asset_id in processed_assets:
+ continue
+ processed_assets.add(current_asset_id)
+
+ asset = session.get(AssetModel, current_asset_id)
Review Comment:
Updated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]