pierrejeambrun commented on code in PR #27356:
URL: https://github.com/apache/airflow/pull/27356#discussion_r1014633359


##########
airflow/www/static/js/api/useDatasetDependencies.ts:
##########
@@ -75,15 +82,130 @@ const generateGraph = ({ nodes, edges, font }: 
GenerateProps) => ({
   edges: edges.map((e) => ({ id: `${e.source}-${e.target}`, sources: 
[e.source], targets: [e.target] })),
 });
 
+// Argument of findDownstreamGraph: the edges/nodes not yet assigned to a graph,
+// plus the graphs built so far (each graph is itself a DatasetDependencies).
+interface SeparateGraphsProps extends DatasetDependencies {
+  graphs: DatasetDependencies[];
+}
+
+// Accumulators written by findDownstreamGraph: graph index -> indices of the
+// other graphs it must absorb, and the indices of graphs to drop after merging.
+// NOTE(review): these live at module scope and nothing shown here ever clears
+// them, so entries from one run would leak into the next (stale indices ->
+// wrong graphs merged/removed) if the graph separation runs more than once.
+// Consider creating them per call and threading them through the recursion —
+// confirm how often the caller runs.
+const graphIndicesToMerge: Record<number, number[]> = {};
+const indicesToRemove: number[] = [];
+
+// find the downstream graph of each upstream edge
+//
+// Grows each graph in `graphs` by one hop per call. Any remaining edge whose
+// source is the target of an edge already in graph i is appended to graph i,
+// together with its target node, and is removed from the remaining edges.
+// When no edges remain, graphs flagged for merging are combined and the
+// absorbed graphs are dropped from the returned list.
+// NOTE(review): recursion only stops when `filteredEdges` is empty. An edge
+// whose target has no matching entry in `nodes` is never removed, and neither
+// is an edge never reached from any graph (e.g. part of a cycle with no root
+// edge). Either case makes this recurse without bound; consider also stopping
+// when a pass removes no edges.
+const findDownstreamGraph = (
+  { edges, nodes, graphs = [] }: SeparateGraphsProps,
+): DatasetDependencies[] => {
+  const newGraphs = [...graphs];
+  let filteredEdges = [...edges];
+
+  graphs.forEach((g, i) => {
+    // find downstream edges
+    const downstreamEdges = edges.filter((e) => g.edges.some((ge) => ge.target 
=== e.source));
+    // NOTE(review): downstreamNodes is filled below but never read.
+    const downstreamNodes: DepNode[] = [];
+
+    downstreamEdges.forEach((e) => {
+      const newNode = nodes.find((n) => n.id === e.target);
+      if (newNode) {
+        downstreamNodes.push(newNode);
+
+        // check if the node already exists in a different graph
+        // NOTE(review): the search covers every entry of newGraphs, including
+        // newGraphs[i] itself. That graph may already hold this node, e.g. when
+        // two edges in this pass share a target. A match at index i would push
+        // i into indicesToRemove and drop graph i from the result; consider
+        // also requiring `existingGraphIndex !== i`.
+        const existingGraphIndex = newGraphs
+          .findIndex(((ng) => ng.nodes.some((n) => n.id === newNode.id)));
+
+        // mark if the graph needs to merge with another
+        if (existingGraphIndex > -1) {
+          indicesToRemove.push(existingGraphIndex);
+          graphIndicesToMerge[i] = [...(graphIndicesToMerge[i] || []), 
existingGraphIndex];
+        }
+
+        // add node and edge to the graph
+        // NOTE(review): the node is appended without a duplicate check here,
+        // so a node reached by several edges can appear more than once.
+        newGraphs[i] = {
+          nodes: [...newGraphs[i].nodes, newNode],
+          edges: [...newGraphs[i].edges, e],
+        };
+
+        // remove edge from edge list
+        filteredEdges = filteredEdges
+          .filter((fe) => !(fe.source === e.source && fe.target === e.target));
+      }
+    });
+  });
+
+  // once there are no more filtered edges left, merge relevant graphs
+  // we merge afterwards to make sure we captured all nodes + edges
+  if (!filteredEdges.length) {
+    // NOTE(review): Object.keys yields strings, and the cast below only changes
+    // the static type; Number(key) would be more honest. Merges also run in a
+    // single pass in ascending key order. A graph that absorbs another and is
+    // itself absorbed (e.g. {0: [1], 1: [2]}) may therefore hand over its
+    // contents before its own merge runs, losing graph 2 from the survivor —
+    // worth a test with chained merges.
+    Object.keys(graphIndicesToMerge).forEach((key) => {
+      const realKey = key as unknown as number;
+      const values = graphIndicesToMerge[realKey];
+      values.forEach((v) => {
+        newGraphs[realKey] = {
+          nodes: unionBy(newGraphs[realKey].nodes, newGraphs[v].nodes, 'id'),
+          // concatenate both edge lists, keeping only the first edge for each
+          // (source, target) pair
+          edges: [...newGraphs[realKey].edges, ...newGraphs[v].edges]
+            .filter((e, i, s) => (
+              i === s.findIndex((t) => t.source === e.source && t.target === 
e.target)
+            )),
+        };
+      });
+    });
+    // drop every graph that was marked as absorbed into another one
+    return newGraphs.filter((g, i) => !indicesToRemove.some((j) => i === j));
+  }
+
+  // otherwise extend the graphs by another hop using the remaining edges
+  return findDownstreamGraph({ edges: filteredEdges, nodes, graphs: newGraphs 
});
+};
+
+// separate the list of nodes/edges into distinct dataset pipeline graphs
+const separateGraphs = ({ edges, nodes }: DatasetDependencies): 
DatasetDependencies[] => {
+  const separatedGraphs: DatasetDependencies[] = [];
+  let remainingEdges = [...edges];
+  let remainingNodes = [...nodes];
+
+  edges.forEach((edge) => {
+    const isDownstream = edges.some((e) => e.target === edge.source);
+
+    // if the edge is not downstream of anything, then start building the graph
+    if (!isDownstream) {
+      const connectedNodes = nodes.filter((n) => n.id === edge.source || n.id 
=== edge.target);
+
+      // check if the node is already connected to a separated graph
+      const nodesInUse = separatedGraphs
+        .findIndex((g) => g.nodes.some((n) => connectedNodes.some((nn) => 
nn.id === n.id)));
+
+      if (nodesInUse > -1) {
+        // if the node is in use, merge the graphs

Review Comment:
   ```suggestion
           // if one of the nodes is already in use, merge the graphs
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to