jason810496 commented on code in PR #58059:
URL: https://github.com/apache/airflow/pull/58059#discussion_r2605815263
##########
airflow-core/src/airflow/api_fastapi/core_api/services/ui/dependencies.py:
##########
@@ -76,3 +82,99 @@ def extract_single_connected_component(
     ]
     return {"nodes": nodes, "edges": edges}
+
+
+def get_data_dependencies(asset_id: int, session: Session) -> dict[str, list[dict]]:
+    """Get full data lineage for an asset."""
+    from sqlalchemy import select
+
+    from airflow.models.asset import TaskInletAssetReference, TaskOutletAssetReference
+
+    SEPARATOR = "__SEPARATOR__"
+
+    nodes: list[dict] = []
+    edges: list[dict] = []
+    node_ids: set[str] = set()
+    edge_set: set[tuple[str, str]] = set()
Review Comment:
It seems we only need the sets here and can convert them back to lists at the
end of the function. Alternatively, we could return the sets directly and change
`edges` and `nodes` on `BaseGraphResponse` from `list` to `Iterable`.
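
To illustrate the first option, here is a minimal sketch that deduplicates in a set and builds the list only once at the end. The helper name `collect_edges` and the `source_id`/`target_id` keys are assumptions for illustration, not taken from the PR:

```python
def collect_edges(pairs):
    """Deduplicate (source, target) pairs, then build the edge dicts once.

    Hypothetical helper: the dict keys are illustrative and may not match
    the actual response schema.
    """
    edge_set: set[tuple[str, str]] = set()
    for source, target in pairs:
        # Adding to a set ignores pairs that were already seen.
        edge_set.add((source, target))
    # sorted() gives a deterministic order, since sets are unordered.
    return [{"source_id": s, "target_id": t} for s, t in sorted(edge_set)]
```

With this shape, the parallel `edges` list and its membership checks are no longer needed while the graph is being traversed.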
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dependencies.py:
##########
@@ -41,8 +46,24 @@
     ),
     dependencies=[Depends(requires_access_dag("GET", DagAccessEntity.DEPENDENCIES))],
 )
-def get_dependencies(session: SessionDep, node_id: str | None = None) -> BaseGraphResponse:
+def get_dependencies(
+    session: SessionDep,
+    node_id: str | None = None,
+    dependency_type: Literal["scheduling", "data"] = "scheduling",
+) -> BaseGraphResponse:
     """Dependencies graph."""
+    if dependency_type == "data":
+        if node_id is None or not node_id.startswith("asset:"):
+            raise HTTPException(400, "Data dependencies require an asset node_id (e.g., 'asset:123')")
+
+        try:
+            asset_id = int(node_id.replace("asset:", ""))
+        except ValueError:
+            raise HTTPException(400, f"Invalid asset node_id: {node_id}")
Review Comment:
Just to align the `HTTPException` usage with the other routers.
```suggestion
            raise HTTPException(status.HTTP_400_BAD_REQUEST, "Data dependencies require an asset node_id (e.g., 'asset:123')")
        try:
            asset_id = int(node_id.replace("asset:", ""))
        except ValueError:
            raise HTTPException(status.HTTP_400_BAD_REQUEST, f"Invalid asset node_id: {node_id}")
```
##########
airflow-core/src/airflow/api_fastapi/core_api/services/ui/dependencies.py:
##########
@@ -76,3 +82,99 @@ def extract_single_connected_component(
     ]
     return {"nodes": nodes, "edges": edges}
+
+
+def get_data_dependencies(asset_id: int, session: Session) -> dict[str, list[dict]]:
+    """Get full data lineage for an asset."""
+    from sqlalchemy import select
+
+    from airflow.models.asset import TaskInletAssetReference, TaskOutletAssetReference
+
+    SEPARATOR = "__SEPARATOR__"
+
+    nodes: list[dict] = []
+    edges: list[dict] = []
+    node_ids: set[str] = set()
+    edge_set: set[tuple[str, str]] = set()
+
+    # BFS to trace full lineage
+    assets_to_process: list[int] = [asset_id]
Review Comment:
Would it be better to use `deque` instead of `list` here for the BFS?
`deque.popleft` is `O(1)`, whereas `list.pop(0)` is `O(n)` because every
remaining element has to shift down by one position.
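
A minimal sketch of the `deque`-based traversal, assuming the lineage can be modelled as a plain adjacency mapping. The function `trace_lineage` and the `neighbors` argument are hypothetical stand-ins for the database lookups in the PR:

```python
from collections import deque


def trace_lineage(start: int, neighbors: dict[int, list[int]]) -> list[int]:
    """Breadth-first traversal returning asset ids in visit order.

    Hypothetical stand-in: `neighbors` replaces the per-asset queries.
    """
    queue: deque[int] = deque([start])
    seen: set[int] = {start}
    order: list[int] = []
    while queue:
        current = queue.popleft()  # O(1), unlike list.pop(0)
        order.append(current)
        for nxt in neighbors.get(current, []):
            if nxt not in seen:
                # Mark on enqueue so each asset is queued at most once.
                seen.add(nxt)
                queue.append(nxt)
    return order
```

For small graphs the difference is negligible, but `deque` keeps the cost per dequeue constant as the lineage grows.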