This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 99130ab3f91 AIP-38 Plug Graph on backend endpoint (#44572)
99130ab3f91 is described below
commit 99130ab3f917d57f0e7daf41ed3a56e6d87eaabd
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Wed Dec 4 23:02:52 2024 +0800
AIP-38 Plug Graph on backend endpoint (#44572)
* WIP functionnaly OK
* Fix CI
---
.../core_api/datamodels/ui/structure.py | 6 +-
.../api_fastapi/core_api/openapi/v1-generated.yaml | 11 +-
airflow/ui/openapi-gen/requests/schemas.gen.ts | 22 +--
airflow/ui/openapi-gen/requests/types.gen.ts | 8 +-
airflow/ui/src/pages/DagsList/Dag/DagVizModal.tsx | 10 +-
airflow/ui/src/pages/DagsList/Dag/Graph/Graph.tsx | 11 +-
.../ui/src/pages/DagsList/Dag/Graph/TaskName.tsx | 4 +-
airflow/ui/src/pages/DagsList/Dag/Graph/data.ts | 216 ---------------------
.../src/pages/DagsList/Dag/Graph/reactflowUtils.ts | 7 +-
.../src/pages/DagsList/Dag/Graph/useGraphLayout.ts | 47 +++--
airflow/ui/src/pages/DagsList/Dag/Tabs.tsx | 3 +-
airflow/utils/task_group.py | 2 +-
tests/utils/test_task_group.py | 6 +-
13 files changed, 69 insertions(+), 284 deletions(-)
diff --git a/airflow/api_fastapi/core_api/datamodels/ui/structure.py b/airflow/api_fastapi/core_api/datamodels/ui/structure.py
index e3df958a1a8..cc234500eb7 100644
--- a/airflow/api_fastapi/core_api/datamodels/ui/structure.py
+++ b/airflow/api_fastapi/core_api/datamodels/ui/structure.py
@@ -34,12 +34,12 @@ class NodeResponse(BaseModel):
"""Node serializer for responses."""
children: list[NodeResponse] | None = None
- id: str | None
+ id: str
is_mapped: bool | None = None
- label: str | None = None
+ label: str
tooltip: str | None = None
setup_teardown_type: Literal["setup", "teardown"] | None = None
- type: Literal["join", "sensor", "task", "task_group"]
+ type: Literal["join", "sensor", "task", "asset_condition"]
class StructureDataResponse(BaseModel):
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index d6ea8dcec50..010a4141e06 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -7782,9 +7782,7 @@ components:
- type: 'null'
title: Children
id:
- anyOf:
- - type: string
- - type: 'null'
+ type: string
title: Id
is_mapped:
anyOf:
@@ -7792,9 +7790,7 @@ components:
- type: 'null'
title: Is Mapped
label:
- anyOf:
- - type: string
- - type: 'null'
+ type: string
title: Label
tooltip:
anyOf:
@@ -7815,11 +7811,12 @@ components:
- join
- sensor
- task
- - task_group
+ - asset_condition
title: Type
type: object
required:
- id
+ - label
- type
title: NodeResponse
description: Node serializer for responses.
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 0f704159c8d..cb8e2e7396e 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -3128,14 +3128,7 @@ export const $NodeResponse = {
title: "Children",
},
id: {
- anyOf: [
- {
- type: "string",
- },
- {
- type: "null",
- },
- ],
+ type: "string",
title: "Id",
},
is_mapped: {
@@ -3150,14 +3143,7 @@ export const $NodeResponse = {
title: "Is Mapped",
},
label: {
- anyOf: [
- {
- type: "string",
- },
- {
- type: "null",
- },
- ],
+ type: "string",
title: "Label",
},
tooltip: {
@@ -3185,12 +3171,12 @@ export const $NodeResponse = {
},
type: {
type: "string",
- enum: ["join", "sensor", "task", "task_group"],
+ enum: ["join", "sensor", "task", "asset_condition"],
title: "Type",
},
},
type: "object",
- required: ["id", "type"],
+ required: ["id", "label", "type"],
title: "NodeResponse",
description: "Node serializer for responses.",
} as const;
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index e5aed24a98c..13321ad4424 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -761,15 +761,15 @@ export type JobResponse = {
*/
export type NodeResponse = {
children?: Array<NodeResponse> | null;
- id: string | null;
+ id: string;
is_mapped?: boolean | null;
- label?: string | null;
+ label: string;
tooltip?: string | null;
setup_teardown_type?: "setup" | "teardown" | null;
- type: "join" | "sensor" | "task" | "task_group";
+ type: "join" | "sensor" | "task" | "asset_condition";
};
-export type type = "join" | "sensor" | "task" | "task_group";
+export type type = "join" | "sensor" | "task" | "asset_condition";
/**
* Request body for Clear Task Instances endpoint.
diff --git a/airflow/ui/src/pages/DagsList/Dag/DagVizModal.tsx b/airflow/ui/src/pages/DagsList/Dag/DagVizModal.tsx
index 726e97527cc..9ab868b8f8d 100644
--- a/airflow/ui/src/pages/DagsList/Dag/DagVizModal.tsx
+++ b/airflow/ui/src/pages/DagsList/Dag/DagVizModal.tsx
@@ -24,24 +24,28 @@ import { Dialog } from "src/components/ui";
import { Graph } from "./Graph";
type TriggerDAGModalProps = {
- dagDisplayName: DAGResponse["dag_display_name"];
+ dagDisplayName?: DAGResponse["dag_display_name"];
+ dagId?: DAGResponse["dag_id"];
onClose: () => void;
open: boolean;
};
export const DagVizModal: React.FC<TriggerDAGModalProps> = ({
dagDisplayName,
+ dagId,
onClose,
open,
}) => (
<Dialog.Root onOpenChange={onClose} open={open} size="full">
<Dialog.Content backdrop>
<Dialog.Header bg="blue.muted">
- <Heading size="xl">{dagDisplayName}</Heading>
+ <Heading size="xl">
+ {Boolean(dagDisplayName) ? dagDisplayName : "Dag Undefined"}
+ </Heading>
<Dialog.CloseTrigger closeButtonProps={{ size: "xl" }} />
</Dialog.Header>
<Dialog.Body display="flex">
- <Graph />
+ {dagId === undefined ? undefined : <Graph dagId={dagId} />}
</Dialog.Body>
</Dialog.Content>
</Dialog.Root>
diff --git a/airflow/ui/src/pages/DagsList/Dag/Graph/Graph.tsx b/airflow/ui/src/pages/DagsList/Dag/Graph/Graph.tsx
index d80f0ba2f1a..84fbc2be114 100644
--- a/airflow/ui/src/pages/DagsList/Dag/Graph/Graph.tsx
+++ b/airflow/ui/src/pages/DagsList/Dag/Graph/Graph.tsx
@@ -20,13 +20,14 @@ import { Flex } from "@chakra-ui/react";
import { ReactFlow, Controls, Background, MiniMap } from "@xyflow/react";
import "@xyflow/react/dist/style.css";
+import { useStructureServiceStructureData } from "openapi/queries";
+import type { DAGResponse } from "openapi/requests/types.gen";
import { useColorMode } from "src/context/colorMode";
import { useOpenGroups } from "src/context/openGroups";
import Edge from "./Edge";
import { JoinNode } from "./JoinNode";
import { TaskNode } from "./TaskNode";
-import { graphData } from "./data";
import { useGraphLayout } from "./useGraphLayout";
const nodeTypes = {
@@ -35,10 +36,16 @@ const nodeTypes = {
};
const edgeTypes = { custom: Edge };
-export const Graph = () => {
+export const Graph = ({ dagId }: { readonly dagId: DAGResponse["dag_id"] }) => {
const { colorMode } = useColorMode();
const { openGroupIds } = useOpenGroups();
+
+ const { data: graphData = { arrange: "LR", edges: [], nodes: [] } } =
+ useStructureServiceStructureData({
+ dagId,
+ });
+
const { data } = useGraphLayout({
...graphData,
openGroupIds,
diff --git a/airflow/ui/src/pages/DagsList/Dag/Graph/TaskName.tsx b/airflow/ui/src/pages/DagsList/Dag/Graph/TaskName.tsx
index 2747d8d1e82..6807225360c 100644
--- a/airflow/ui/src/pages/DagsList/Dag/Graph/TaskName.tsx
+++ b/airflow/ui/src/pages/DagsList/Dag/Graph/TaskName.tsx
@@ -20,7 +20,7 @@ import { Text, type TextProps } from "@chakra-ui/react";
import type { CSSProperties } from "react";
import { FiArrowUpRight, FiArrowDownRight } from "react-icons/fi";
-import type { Node } from "./data";
+import type { NodeResponse } from "openapi/requests/types.gen";
type Props = {
readonly id: string;
@@ -29,7 +29,7 @@ type Props = {
readonly isOpen?: boolean;
readonly isZoomedOut?: boolean;
readonly label: string;
- readonly setupTeardownType?: Node["setup_teardown_type"];
+ readonly setupTeardownType?: NodeResponse["setup_teardown_type"];
} & TextProps;
export const TaskName = ({
diff --git a/airflow/ui/src/pages/DagsList/Dag/Graph/data.ts b/airflow/ui/src/pages/DagsList/Dag/Graph/data.ts
deleted file mode 100644
index 68759fc4ebf..00000000000
--- a/airflow/ui/src/pages/DagsList/Dag/Graph/data.ts
+++ /dev/null
@@ -1,216 +0,0 @@
-/*!
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-export type Edge = {
- is_setup_teardown?: boolean;
- label?: string;
- source_id: string;
- target_id: string;
-};
-
-export type Node = {
- children?: Array<Node>;
- id: string;
- is_mapped?: boolean;
- label: string;
- setup_teardown_type?: "setup" | "teardown";
- tooltip?: string;
- type:
- | "asset_alias"
- | "asset_condition"
- | "asset"
- | "dag"
- | "join"
- | "sensor"
- | "task"
- | "trigger";
-};
-
-export type GraphData = {
- arrange: "BT" | "LR" | "RL" | "TB";
- edges: Array<Edge>;
- nodes: Array<Node>;
-};
-
-export const graphData: GraphData = {
- arrange: "LR",
- edges: [
- {
- source_id: "section_1.upstream_join_id",
- target_id: "section_1.taskgroup_setup",
- },
- {
- source_id: "section_1.downstream_join_id",
- target_id: "section_2.upstream_join_id",
- },
- {
- source_id: "section_1.normal",
- target_id: "section_1.taskgroup_teardown",
- },
- {
- is_setup_teardown: true,
- label: "setup and teardown",
- source_id: "section_1.taskgroup_setup",
- target_id: "section_1.taskgroup_teardown",
- },
- {
- label: "test",
- source_id: "section_1.taskgroup_teardown",
- target_id: "section_1.downstream_join_id",
- },
- {
- source_id: "section_1.taskgroup_setup",
- target_id: "section_1.normal",
- },
- {
- source_id: "section_2.downstream_join_id",
- target_id: "end",
- },
- {
- source_id: "section_2.inner_section_2.task_2",
- target_id: "section_2.inner_section_2.task_4",
- },
- {
- source_id: "section_2.inner_section_2.task_3",
- target_id: "section_2.inner_section_2.task_4",
- },
- {
- source_id: "section_2.inner_section_2.task_4",
- target_id: "section_2.downstream_join_id",
- },
- {
- source_id: "section_2.task_1",
- target_id: "section_2.downstream_join_id",
- },
- {
- source_id: "section_2.upstream_join_id",
- target_id: "section_2.inner_section_2.task_2",
- },
- {
- source_id: "section_2.upstream_join_id",
- target_id: "section_2.inner_section_2.task_3",
- },
- {
- source_id: "section_2.upstream_join_id",
- target_id: "section_2.task_1",
- },
- {
- label: "I am a realllllllllllllllllly long label",
- source_id: "start",
- target_id: "section_1.upstream_join_id",
- },
- ],
- nodes: [
- {
- id: "end",
- label: "end",
- type: "task",
- },
- {
- children: [
- {
- id: "section_1.normal",
- label: "normal",
- type: "task",
- },
- {
- id: "section_1.taskgroup_setup",
- label: "taskgroup_setup",
- setup_teardown_type: "setup",
- type: "task",
- },
- {
- id: "section_1.taskgroup_teardown",
- label: "taskgroup_teardown",
- setup_teardown_type: "teardown",
- type: "task",
- },
- {
- id: "section_1.upstream_join_id",
- label: "",
- type: "join",
- },
- {
- id: "section_1.downstream_join_id",
- label: "",
- type: "join",
- },
- ],
- id: "section_1",
- is_mapped: false,
- label: "section_1",
- tooltip: "Tasks for section_1",
- type: "task",
- },
- {
- children: [
- {
- children: [
- {
- id: "section_2.inner_section_2.task_2",
- label: "task_2",
- type: "task",
- },
- {
- id: "section_2.inner_section_2.task_3",
- is_mapped: true,
- label: "task_3",
- type: "task",
- },
- {
- id: "section_2.inner_section_2.task_4",
- label: "task_4",
- type: "task",
- },
- ],
- id: "section_2.inner_section_2",
- label: "inner_section_2",
- tooltip: "Tasks for inner_section2",
- type: "task",
- },
- {
- id: "section_2.task_1",
- is_mapped: true,
- label: "task_1",
- type: "task",
- },
- {
- id: "section_2.upstream_join_id",
- label: "",
- type: "join",
- },
- {
- id: "section_2.downstream_join_id",
- label: "",
- type: "join",
- },
- ],
- id: "section_2",
- is_mapped: false,
- label: "section_2",
- tooltip: "Tasks for section_2",
- type: "task",
- },
- {
- id: "start",
- label: "start",
- type: "task",
- },
- ],
-};
diff --git a/airflow/ui/src/pages/DagsList/Dag/Graph/reactflowUtils.ts b/airflow/ui/src/pages/DagsList/Dag/Graph/reactflowUtils.ts
index 76eb4c7489f..793eb840c5d 100644
--- a/airflow/ui/src/pages/DagsList/Dag/Graph/reactflowUtils.ts
+++ b/airflow/ui/src/pages/DagsList/Dag/Graph/reactflowUtils.ts
@@ -19,7 +19,8 @@
import type { Node as FlowNodeType, Edge as FlowEdgeType } from "@xyflow/react";
import type { ElkExtendedEdge } from "elkjs";
-import type { Node } from "./data";
+import type { NodeResponse } from "openapi/requests/types.gen";
+
import type { LayoutNode } from "./useGraphLayout";
export type CustomNodeProps = {
@@ -30,7 +31,7 @@ export type CustomNodeProps = {
isMapped?: boolean;
isOpen?: boolean;
label: string;
- setupTeardownType?: Node["setup_teardown_type"];
+ setupTeardownType?: NodeResponse["setup_teardown_type"];
width?: number;
};
@@ -103,7 +104,7 @@ export const flattenGraph = ({
if (node.children) {
const { edges: childEdges, nodes: childNodes } = flattenGraph({
- children: node.children,
+ children: node.children as Array<LayoutNode>,
parent: newNode,
});
diff --git a/airflow/ui/src/pages/DagsList/Dag/Graph/useGraphLayout.ts b/airflow/ui/src/pages/DagsList/Dag/Graph/useGraphLayout.ts
index cd0d1089078..352c9ada008 100644
--- a/airflow/ui/src/pages/DagsList/Dag/Graph/useGraphLayout.ts
+++ b/airflow/ui/src/pages/DagsList/Dag/Graph/useGraphLayout.ts
@@ -19,7 +19,12 @@
import { useQuery } from "@tanstack/react-query";
import ELK, { type ElkNode, type ElkExtendedEdge, type ElkShape } from "elkjs";
-import type { Edge, Node } from "./data";
+import type {
+ EdgeResponse,
+ NodeResponse,
+ StructureDataResponse,
+} from "openapi/requests/types.gen";
+
import { flattenGraph, formatFlowEdges } from "./reactflowUtils";
type EdgeLabel = {
@@ -35,9 +40,9 @@ type FormattedNode = {
isGroup: boolean;
isMapped?: boolean;
isOpen?: boolean;
- setupTeardownType?: Node["setup_teardown_type"];
+ setupTeardownType?: NodeResponse["setup_teardown_type"];
} & ElkShape &
- Node;
+ NodeResponse;
type FormattedEdge = {
id: string;
@@ -46,7 +51,7 @@ type FormattedEdge = {
parentNode?: string;
} & ElkExtendedEdge;
-export type LayoutNode = ElkNode & Node;
+export type LayoutNode = ElkNode & NodeResponse;
// Take text and font to calculate how long each node should be
const getTextWidth = (text: string, font: string) => {
@@ -76,15 +81,16 @@ const getDirection = (arrange: string) => {
};
const formatElkEdge = (
- edge: Edge,
+ edge: EdgeResponse,
font: string,
- node?: Node,
+ node?: NodeResponse,
): FormattedEdge => ({
id: `${edge.source_id}-${edge.target_id}`,
- isSetupTeardown: edge.is_setup_teardown,
+ isSetupTeardown:
+ edge.is_setup_teardown === null ? undefined : edge.is_setup_teardown,
// isSourceAsset: e.isSourceAsset,
labels:
- edge.label === undefined
+ edge.label === undefined || edge.label === null
? []
: [
{
@@ -99,7 +105,7 @@ const formatElkEdge = (
targets: [edge.target_id],
});
-const getNestedChildIds = (children: Array<Node>) => {
+const getNestedChildIds = (children: Array<NodeResponse>) => {
let childIds: Array<string> = [];
children.forEach((child) => {
@@ -116,9 +122,9 @@ const getNestedChildIds = (children: Array<Node>) => {
type GenerateElkProps = {
arrange: string;
- edges: Array<Edge>;
+ edges: Array<EdgeResponse>;
font: string;
- nodes: Array<Node>;
+ nodes: Array<NodeResponse>;
openGroupIds?: Array<string>;
};
@@ -132,15 +138,17 @@ const generateElkGraph = ({
const closedGroupIds: Array<string> = [];
let filteredEdges = unformattedEdges;
- const formatChildNode = (node: Node): FormattedNode => {
+ const formatChildNode = (node: NodeResponse): FormattedNode => {
const isOpen = openGroupIds?.includes(node.id);
const childCount =
node.children?.filter((child) => child.type !== "join").length ?? 0;
const childIds =
- node.children === undefined ? [] : getNestedChildIds(node.children);
+ node.children === null || node.children === undefined
+ ? []
+ : getNestedChildIds(node.children);
- if (isOpen && node.children !== undefined) {
+ if (isOpen && node.children !== null && node.children !== undefined) {
return {
...node,
childCount,
@@ -212,7 +220,7 @@ const generateElkGraph = ({
height,
id: node.id,
isGroup: Boolean(node.children),
- isMapped: node.is_mapped,
+ isMapped: node.is_mapped === null ? undefined : node.is_mapped,
label: node.label,
setupTeardownType: node.setup_teardown_type,
type: node.type,
@@ -225,7 +233,7 @@ const generateElkGraph = ({
const edges = filteredEdges.map((fe) => formatElkEdge(fe, font));
return {
- children,
+ children: children as Array<ElkNode>,
edges,
id: "root",
layoutOptions: {
@@ -238,11 +246,8 @@ const generateElkGraph = ({
};
type LayoutProps = {
- arrange?: string;
- edges: Array<Edge>;
- nodes: Array<Node>;
openGroupIds: Array<string>;
-};
+} & StructureDataResponse;
export const useGraphLayout = ({
arrange = "LR",
@@ -271,7 +276,7 @@ export const useGraphLayout = ({
// 3. Flatten the nodes and edges for xyflow to actually render the graph
const flattenedData = flattenGraph({
- children: data.children,
+ children: (data.children ?? []) as Array<LayoutNode>,
});
// merge & dedupe edges
diff --git a/airflow/ui/src/pages/DagsList/Dag/Tabs.tsx b/airflow/ui/src/pages/DagsList/Dag/Tabs.tsx
index e3447051dbf..5c2d44109ad 100644
--- a/airflow/ui/src/pages/DagsList/Dag/Tabs.tsx
+++ b/airflow/ui/src/pages/DagsList/Dag/Tabs.tsx
@@ -81,7 +81,8 @@ export const DagTabs = ({ dag }: { readonly dag?: DAGResponse }) => {
</Flex>
</Flex>
<DagVizModal
- dagDisplayName={dag?.dag_display_name ?? "graph"}
+ dagDisplayName={dag?.dag_display_name}
+ dagId={dag?.dag_id}
onClose={onClose}
open={isGraphOpen}
/>
diff --git a/airflow/utils/task_group.py b/airflow/utils/task_group.py
index 3d3738f5a8d..2f850826ca9 100644
--- a/airflow/utils/task_group.py
+++ b/airflow/utils/task_group.py
@@ -123,7 +123,7 @@ def task_group_to_dict(task_item_or_group):
"tooltip": task_group.tooltip,
"is_mapped": is_mapped,
"children": children,
- "type": "task_group",
+ "type": "task",
}
diff --git a/tests/utils/test_task_group.py b/tests/utils/test_task_group.py
index 7b5c960f9c2..bd336fa4c94 100644
--- a/tests/utils/test_task_group.py
+++ b/tests/utils/test_task_group.py
@@ -190,17 +190,17 @@ EXPECTED_JSON = {
{"id": "group234.group34.task4", "label": "task4", "type": "task"},
{"id": "group234.group34.downstream_join_id", "label": "", "type": "join"},
],
- "type": "task_group",
+ "type": "task",
},
{"id": "group234.task2", "label": "task2", "type": "task"},
{"id": "group234.upstream_join_id", "label": "", "type": "join"},
],
- "type": "task_group",
+ "type": "task",
},
{"id": "task1", "label": "task1", "type": "task"},
{"id": "task5", "label": "task5", "type": "task"},
],
- "type": "task_group",
+ "type": "task",
}