This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e84c032cec Separate dataset deps into individual graphs (#27356)
e84c032cec is described below
commit e84c032cec911fd738ebb0b93a38994a10e3ca27
Author: Brent Bovenzi <[email protected]>
AuthorDate: Wed Nov 9 08:41:41 2022 -0600
Separate dataset deps into individual graphs (#27356)
* separate dataset deps graphs
* fix graph merging, include full graph
* fix edge filtering/merging
* add comments
* fix graph component
* Update airflow/www/static/js/api/useDatasetDependencies.ts
Co-authored-by: Pierre Jeambrun <[email protected]>
* Update airflow/www/static/js/api/useDatasetDependencies.ts
Co-authored-by: Pierre Jeambrun <[email protected]>
Co-authored-by: Pierre Jeambrun <[email protected]>
---
.../www/static/js/api/useDatasetDependencies.ts | 128 ++++++++++++++++++++-
airflow/www/static/js/datasets/Graph/index.tsx | 21 ++--
2 files changed, 138 insertions(+), 11 deletions(-)
diff --git a/airflow/www/static/js/api/useDatasetDependencies.ts b/airflow/www/static/js/api/useDatasetDependencies.ts
index d7ec3260e6..bd61ee9087 100644
--- a/airflow/www/static/js/api/useDatasetDependencies.ts
+++ b/airflow/www/static/js/api/useDatasetDependencies.ts
@@ -24,6 +24,7 @@ import ELK, { ElkShape, ElkExtendedEdge } from 'elkjs';
import { getMetaValue } from 'src/utils';
import type { DepEdge, DepNode } from 'src/types';
import type { NodeType } from 'src/datasets/Graph/Node';
+import { unionBy } from 'lodash';
interface DatasetDependencies {
edges: DepEdge[];
@@ -36,11 +37,16 @@ interface GenerateProps {
font: string;
}
-interface Data extends ElkShape {
+interface Graph extends ElkShape {
children: NodeType[];
edges: ElkExtendedEdge[];
}
+interface Data {
+ fullGraph: Graph;
+ subGraphs: Graph[];
+}
+
// Take text and font to calculate how long each node should be
function getTextWidth(text: string, font: string) {
const context = document.createElement('canvas').getContext('2d');
@@ -59,6 +65,7 @@ const generateGraph = ({ nodes, edges, font }: GenerateProps) => ({
'spacing.edgeNodeBetweenLayers': '10.0',
'layering.strategy': 'INTERACTIVE',
algorithm: 'layered',
+ 'crossingMinimization.semiInteractive': 'true',
'spacing.edgeEdgeBetweenLayers': '10.0',
'spacing.edgeNode': '10.0',
'spacing.edgeEdge': '10.0',
@@ -75,15 +82,130 @@ const generateGraph = ({ nodes, edges, font }: GenerateProps) => ({
edges: edges.map((e) => ({ id: `${e.source}-${e.target}`, sources:
[e.source], targets: [e.target] })),
});
+interface SeparateGraphsProps extends DatasetDependencies { // arguments to findDownstreamGraph
+ graphs: DatasetDependencies[]; // graphs built so far; callers pass the still-unassigned edges and the full node list as edges/nodes
+}
+
+const graphIndicesToMerge: Record<number, number[]> = {}; // NOTE(review): module-level and never reset, so entries from one formatDependencies call carry over to any later call (e.g. a refetch) — consider creating this per call
+const indicesToRemove: number[] = []; // NOTE(review): same lifetime issue; stale indices left here would drop unrelated graphs on later calls
+
+// find the downstream graph of each upstream edge: each pass attaches remaining edges whose source is a target already in graph i, recursing until every edge is assigned; NOTE(review): if some remaining edge is never reachable from any graph (e.g. a cycle with no entry edge), filteredEdges never shrinks and the recursion does not terminate
+const findDownstreamGraph = (
+ { edges, nodes, graphs = [] }: SeparateGraphsProps,
+): DatasetDependencies[] => {
+ const newGraphs = [...graphs];
+ let filteredEdges = [...edges];
+
+ graphs.forEach((g, i) => {
+ // find downstream edges: remaining edges whose source is a target in graph i (g is the snapshot from the start of this pass)
+ const downstreamEdges = edges.filter((e) => g.edges.some((ge) => ge.target
=== e.source));
+ const downstreamNodes: DepNode[] = []; // NOTE(review): written below but never read
+
+ downstreamEdges.forEach((e) => {
+ const newNode = nodes.find((n) => n.id === e.target);
+ if (newNode) {
+ downstreamNodes.push(newNode);
+
+ // check if the node already exists in a graph; NOTE(review): this can be graph i itself (e.g. the target was already added via another edge), in which case graph i is queued for removal and dropped from the result
+ const existingGraphIndex = newGraphs
+ .findIndex(((ng) => ng.nodes.some((n) => n.id === newNode.id)));
+
+ // mark if the graph needs to merge with another
+ if (existingGraphIndex > -1) {
+ indicesToRemove.push(existingGraphIndex);
+ graphIndicesToMerge[i] = [...(graphIndicesToMerge[i] || []),
existingGraphIndex];
+ }
+
+ // add node and edge to the graph
+ newGraphs[i] = {
+ nodes: [...newGraphs[i].nodes, newNode],
+ edges: [...newGraphs[i].edges, e],
+ };
+
+ // remove edge from edge list
+ filteredEdges = filteredEdges
+ .filter((fe) => !(fe.source === e.source && fe.target === e.target));
+ }
+ });
+ });
+
+ // once there are no more filtered edges left, merge relevant graphs
+ // we merge afterwards to make sure we captured all nodes + edges; NOTE(review): keys are visited in ascending order and merges are not transitive, so if graph v is copied into a lower key before v absorbs its own merge targets, those targets' nodes/edges can be lost
+ if (!filteredEdges.length) {
+ Object.keys(graphIndicesToMerge).forEach((key) => {
+ const realKey = key as unknown as number; // NOTE(review): Object.keys returns strings; this cast only silences the checker (indexing still works through key coercion) — Number(key) would be accurate
+ const values = graphIndicesToMerge[realKey];
+ values.forEach((v) => {
+ newGraphs[realKey] = {
+ nodes: unionBy(newGraphs[realKey].nodes, newGraphs[v].nodes, 'id'),
+ edges: [...newGraphs[realKey].edges, ...newGraphs[v].edges]
+ .filter((e, i, s) => (
+ i === s.findIndex((t) => t.source === e.source && t.target ===
e.target)
+ )),
+ };
+ });
+ });
+ return newGraphs.filter((g, i) => !indicesToRemove.some((j) => i === j)); // drop every graph recorded as merged into another
+ }
+
+ return findDownstreamGraph({ edges: filteredEdges, nodes, graphs: newGraphs
});
+};
+
+// separate the list of nodes/edges into distinct dataset pipeline graphs: seed one graph per root edge (an edge whose source is not any edge's target), then hand the remaining edges to findDownstreamGraph
+const separateGraphs = ({ edges, nodes }: DatasetDependencies):
DatasetDependencies[] => {
+ const separatedGraphs: DatasetDependencies[] = [];
+ let remainingEdges = [...edges];
+ let remainingNodes = [...nodes]; // NOTE(review): narrowed below but never read; nodes that lie on no edge end up in no graph
+
+ edges.forEach((edge) => {
+ const isDownstream = edges.some((e) => e.target === edge.source); // NOTE(review): scans all edges for every edge (quadratic in edge count)
+
+ // if the edge is not downstream of anything, then start building the graph
+ if (!isDownstream) {
+ const connectedNodes = nodes.filter((n) => n.id === edge.source || n.id
=== edge.target);
+
+ // check if one of the nodes is already connected to a separated graph; NOTE(review): only the first match is used, so if source and target already sit in two different graphs those graphs are not merged
+ const nodesInUse = separatedGraphs
+ .findIndex((g) => g.nodes.some((n) => connectedNodes.some((nn) =>
nn.id === n.id)));
+
+ if (nodesInUse > -1) {
+ // if one of the nodes is already in use, merge the graphs
+ const { nodes: existingNodes, edges: existingEdges } =
separatedGraphs[nodesInUse];
+ separatedGraphs[nodesInUse] = { nodes: unionBy(existingNodes,
connectedNodes, 'id'), edges: [...existingEdges, edge] };
+ } else {
+ // else just add the new separated graph
+ separatedGraphs.push({ nodes: connectedNodes, edges: [edge] });
+ }
+
+ // filter out used nodes and edges
+ remainingEdges = remainingEdges.filter((e) => e.source !== edge.source); // also drops sibling edges from the same root source; they are roots too and get added by this same loop
+ remainingNodes = remainingNodes.filter((n) => !connectedNodes.some((nn)
=> nn.id === n.id));
+ }
+ });
+
+ if (remainingEdges.length) { // NOTE(review): if some remaining edges are unreachable from every root (e.g. a cycle with no entry edge), findDownstreamGraph never terminates
+ return findDownstreamGraph({ edges: remainingEdges, nodes, graphs:
separatedGraphs });
+ }
+ return separatedGraphs;
+};
+
const formatDependencies = async ({ edges, nodes }: DatasetDependencies) => {
const elk = new ELK();
+ const graphs = separateGraphs({ edges, nodes }); // NOTE(review): nodes that lie on no edge are not placed in any subgraph
+
// get computed style to calculate how large each node should be
const font = `bold ${16}px
${window.getComputedStyle(document.body).fontFamily}`;
// Finally generate the graph data with elk
- const data = await elk.layout(generateGraph({ nodes, edges, font }));
- return data as Data;
+ const subGraphs = await Promise.all(graphs.map(async (g) => ( // one ELK layout per separated graph, awaited together
+ elk.layout(generateGraph({ nodes: g.nodes, edges: g.edges, font }))
+ )));
+ const fullGraph = await elk.layout(generateGraph({ nodes, edges, font })); // layout of every node and edge together
+ return {
+ fullGraph,
+ subGraphs,
+ } as Data; // NOTE(review): type assertion — the ELK layout results are not checked against Data
};
export default function useDatasetDependencies() {
diff --git a/airflow/www/static/js/datasets/Graph/index.tsx b/airflow/www/static/js/datasets/Graph/index.tsx
index 947be32959..f37f7a3487 100644
--- a/airflow/www/static/js/datasets/Graph/index.tsx
+++ b/airflow/www/static/js/datasets/Graph/index.tsx
@@ -41,7 +41,12 @@ const Graph = ({
const { data, isLoading } = useDatasetDependencies();
if (isLoading && !data) return <Spinner />;
- if (!data) return null;
+ if (!data || !data.fullGraph || !data.subGraphs) return null;
+ const graph = selectedUri ? data.subGraphs.find((g) => g.children.some((n)
=> n.id === `dataset:${selectedUri}`)) : data.fullGraph;
+ if (!graph) return null;
+ const {
+ edges, children, width: graphWidth, height: graphHeight,
+ } = graph;
const initialTransform = {
scaleX: 1,
@@ -53,10 +58,10 @@ const Graph = ({
};
const selectedEdges = selectedUri
- ? data.edges?.filter(({ sources, targets }) => (
+ ? edges?.filter(({ sources, targets }) => (
sources[0].includes(selectedUri) || targets[0].includes(selectedUri)))
: [];
- const highlightedNodes = data?.children
+ const highlightedNodes = children
.filter((n) => (
selectedEdges.some(({ sources, targets }) => (
sources[0] === n.id || targets[0] === n.id))));
@@ -81,15 +86,15 @@ const Graph = ({
style={{ cursor: zoom.isDragging ? 'grabbing' : 'grab',
touchAction: 'none' }}
>
<g transform={zoom.toString()}>
- <g height={data.height} width={data.width}>
- {data.edges.map((edge) => (
+ <g height={graphHeight} width={graphWidth}>
+ {edges.map((edge) => (
<Edge
key={edge.id}
edge={edge}
isSelected={selectedEdges.some((e) => e.id === edge.id)}
/>
))}
- {data.children.map((node) => (
+ {children.map((node) => (
<Node
key={node.id}
node={node}
@@ -105,8 +110,8 @@ const Graph = ({
<Legend
zoom={zoom}
center={() => zoom.translateTo({
- x: (width - (data.width ?? 0)) / 2,
- y: (height - (data.height ?? 0)) / 2,
+ x: (width - (graphWidth ?? 0)) / 2,
+ y: (height - (graphHeight ?? 0)) / 2,
})}
/>
</foreignObject>