This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e84c032cec Separate dataset deps into individual graphs (#27356)
e84c032cec is described below

commit e84c032cec911fd738ebb0b93a38994a10e3ca27
Author: Brent Bovenzi <[email protected]>
AuthorDate: Wed Nov 9 08:41:41 2022 -0600

    Separate dataset deps into individual graphs (#27356)
    
    * separate dataset deps graphs
    
    * fix graph merging, include full graph
    
    * fix edge filtering/merging
    
    * add comments
    
    * fix graph component
    
    * Update airflow/www/static/js/api/useDatasetDependencies.ts
    
    Co-authored-by: Pierre Jeambrun <[email protected]>
    
    * Update airflow/www/static/js/api/useDatasetDependencies.ts
    
    Co-authored-by: Pierre Jeambrun <[email protected]>
    
    Co-authored-by: Pierre Jeambrun <[email protected]>
---
 .../www/static/js/api/useDatasetDependencies.ts    | 128 ++++++++++++++++++++-
 airflow/www/static/js/datasets/Graph/index.tsx     |  21 ++--
 2 files changed, 138 insertions(+), 11 deletions(-)

diff --git a/airflow/www/static/js/api/useDatasetDependencies.ts 
b/airflow/www/static/js/api/useDatasetDependencies.ts
index d7ec3260e6..bd61ee9087 100644
--- a/airflow/www/static/js/api/useDatasetDependencies.ts
+++ b/airflow/www/static/js/api/useDatasetDependencies.ts
@@ -24,6 +24,7 @@ import ELK, { ElkShape, ElkExtendedEdge } from 'elkjs';
 import { getMetaValue } from 'src/utils';
 import type { DepEdge, DepNode } from 'src/types';
 import type { NodeType } from 'src/datasets/Graph/Node';
+import { unionBy } from 'lodash';
 
 interface DatasetDependencies {
   edges: DepEdge[];
@@ -36,11 +37,16 @@ interface GenerateProps {
   font: string;
 }
 
/**
 * One dataset dependency graph as rendered by the Graph component:
 * its nodes live in `children`, its connections in `edges`.
 */
interface Graph extends ElkShape {
  edges: ElkExtendedEdge[];
  children: NodeType[];
}

/**
 * Everything the datasets page needs to draw dependencies:
 * `subGraphs` holds one graph per distinct dataset pipeline, while
 * `fullGraph` holds every node and edge together.
 */
interface Data {
  subGraphs: Graph[];
  fullGraph: Graph;
}
+
 // Take text and font to calculate how long each node should be
 function getTextWidth(text: string, font: string) {
   const context = document.createElement('canvas').getContext('2d');
@@ -59,6 +65,7 @@ const generateGraph = ({ nodes, edges, font }: GenerateProps) 
=> ({
     'spacing.edgeNodeBetweenLayers': '10.0',
     'layering.strategy': 'INTERACTIVE',
     algorithm: 'layered',
+    'crossingMinimization.semiInteractive': 'true',
     'spacing.edgeEdgeBetweenLayers': '10.0',
     'spacing.edgeNode': '10.0',
     'spacing.edgeEdge': '10.0',
@@ -75,15 +82,130 @@ const generateGraph = ({ nodes, edges, font }: 
GenerateProps) => ({
   edges: edges.map((e) => ({ id: `${e.source}-${e.target}`, sources: 
[e.source], targets: [e.target] })),
 });
 
/**
 * Return the representative id of the set containing `id` in a union-find
 * forest stored as child -> parent entries in `parents`.
 * Ids with no entry are their own representative. Every id visited on the way
 * is re-pointed straight at the representative so later lookups are short.
 */
const findRoot = (parents: Map<string, string>, id: string): string => {
  const visited: string[] = [];
  let root = id;
  let parent = parents.get(root);
  while (parent !== undefined && parent !== root) {
    visited.push(root);
    root = parent;
    parent = parents.get(root);
  }
  visited.forEach((v) => parents.set(v, root));
  return root;
};

/**
 * Separate the list of nodes/edges into distinct dataset pipeline graphs.
 *
 * Each returned graph is one weakly connected component of the dependency
 * graph: two nodes belong together when any chain of edges links them,
 * regardless of edge direction. Because components are computed with a single
 * union-find pass, diamonds, pipelines sharing a downstream node and cycles
 * without a root edge are all handled and the function always terminates.
 *
 * - Nodes that are not an endpoint of any edge are not placed in any graph.
 * - Edges whose source or target is missing from `nodes` are dropped, so each
 *   graph only references nodes it contains.
 * - Repeated source/target pairs are kept once.
 *
 * Graphs are returned in the order their first edge appears in `edges`.
 * No state is kept between calls.
 */
const separateGraphs = ({ edges, nodes }: DatasetDependencies): DatasetDependencies[] => {
  const nodesById = new Map(nodes.map((n): [string, DepNode] => [n.id, n]));

  // JSON keys avoid collisions between ids that themselves contain separators.
  const seenEdges = new Set<string>();
  const validEdges = edges.filter(({ source, target }) => {
    const key = JSON.stringify([source, target]);
    if (seenEdges.has(key) || !nodesById.has(source) || !nodesById.has(target)) return false;
    seenEdges.add(key);
    return true;
  });

  // Join the two endpoints of every edge into the same set.
  const parents = new Map<string, string>();
  validEdges.forEach(({ source, target }) => {
    const sourceRoot = findRoot(parents, source);
    const targetRoot = findRoot(parents, target);
    if (sourceRoot !== targetRoot) parents.set(targetRoot, sourceRoot);
  });

  // Bucket edges (and their endpoint nodes, once each) by set representative.
  const graphsByRoot = new Map<string, DatasetDependencies>();
  const placedNodeIds = new Set<string>();
  validEdges.forEach((edge) => {
    const root = findRoot(parents, edge.source);
    const graph: DatasetDependencies = graphsByRoot.get(root) ?? { nodes: [], edges: [] };
    graphsByRoot.set(root, graph);
    graph.edges.push(edge);
    [edge.source, edge.target].forEach((id) => {
      const node = nodesById.get(id);
      if (node && !placedNodeIds.has(id)) {
        placedNodeIds.add(id);
        graph.nodes.push(node);
      }
    });
  });

  return Array.from(graphsByRoot.values());
};
+
 const formatDependencies = async ({ edges, nodes }: DatasetDependencies) => {
   const elk = new ELK();
 
+  const graphs = separateGraphs({ edges, nodes });
+
   // get computed style to calculate how large each node should be
   const font = `bold ${16}px 
${window.getComputedStyle(document.body).fontFamily}`;
 
   // Finally generate the graph data with elk
-  const data = await elk.layout(generateGraph({ nodes, edges, font }));
-  return data as Data;
+  const subGraphs = await Promise.all(graphs.map(async (g) => (
+    elk.layout(generateGraph({ nodes: g.nodes, edges: g.edges, font }))
+  )));
+  const fullGraph = await elk.layout(generateGraph({ nodes, edges, font }));
+  return {
+    fullGraph,
+    subGraphs,
+  } as Data;
 };
 
 export default function useDatasetDependencies() {
diff --git a/airflow/www/static/js/datasets/Graph/index.tsx 
b/airflow/www/static/js/datasets/Graph/index.tsx
index 947be32959..f37f7a3487 100644
--- a/airflow/www/static/js/datasets/Graph/index.tsx
+++ b/airflow/www/static/js/datasets/Graph/index.tsx
@@ -41,7 +41,12 @@ const Graph = ({
   const { data, isLoading } = useDatasetDependencies();
 
   if (isLoading && !data) return <Spinner />;
-  if (!data) return null;
+  if (!data || !data.fullGraph || !data.subGraphs) return null;
+  const graph = selectedUri ? data.subGraphs.find((g) => g.children.some((n) 
=> n.id === `dataset:${selectedUri}`)) : data.fullGraph;
+  if (!graph) return null;
+  const {
+    edges, children, width: graphWidth, height: graphHeight,
+  } = graph;
 
   const initialTransform = {
     scaleX: 1,
@@ -53,10 +58,10 @@ const Graph = ({
   };
 
   const selectedEdges = selectedUri
-    ? data.edges?.filter(({ sources, targets }) => (
+    ? edges?.filter(({ sources, targets }) => (
       sources[0].includes(selectedUri) || targets[0].includes(selectedUri)))
     : [];
-  const highlightedNodes = data?.children
+  const highlightedNodes = children
     .filter((n) => (
       selectedEdges.some(({ sources, targets }) => (
         sources[0] === n.id || targets[0] === n.id))));
@@ -81,15 +86,15 @@ const Graph = ({
             style={{ cursor: zoom.isDragging ? 'grabbing' : 'grab', 
touchAction: 'none' }}
           >
             <g transform={zoom.toString()}>
-              <g height={data.height} width={data.width}>
-                {data.edges.map((edge) => (
+              <g height={graphHeight} width={graphWidth}>
+                {edges.map((edge) => (
                   <Edge
                     key={edge.id}
                     edge={edge}
                     isSelected={selectedEdges.some((e) => e.id === edge.id)}
                   />
                 ))}
-                {data.children.map((node) => (
+                {children.map((node) => (
                   <Node
                     key={node.id}
                     node={node}
@@ -105,8 +110,8 @@ const Graph = ({
                 <Legend
                   zoom={zoom}
                   center={() => zoom.translateTo({
-                    x: (width - (data.width ?? 0)) / 2,
-                    y: (height - (data.height ?? 0)) / 2,
+                    x: (width - (graphWidth ?? 0)) / 2,
+                    y: (height - (graphHeight ?? 0)) / 2,
                   })}
                 />
               </foreignObject>

Reply via email to