This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c0f282643e4 AIP-84 Graph Data update datamodel (#44459)
c0f282643e4 is described below

commit c0f282643e4cd1c4f55f1c505f6407c50bf658dd
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Mon Dec 2 23:27:27 2024 +0800

    AIP-84 Graph Data update datamodel (#44459)
    
    * AIP-84 Update Structure Datamodel
    
    * Update task sdk task_group_to_dict
---
 .../datamodels/ui/{graph.py => structure.py}       |  26 +--
 .../api_fastapi/core_api/openapi/v1-generated.yaml | 118 +++++-------
 airflow/api_fastapi/core_api/routes/ui/__init__.py |   4 +-
 .../core_api/routes/ui/{graph.py => structure.py}  |  20 ++-
 airflow/ui/openapi-gen/queries/common.ts           |  17 +-
 airflow/ui/openapi-gen/queries/prefetch.ts         |  14 +-
 airflow/ui/openapi-gen/queries/queries.ts          |  16 +-
 airflow/ui/openapi-gen/queries/suspense.ts         |  16 +-
 airflow/ui/openapi-gen/requests/schemas.gen.ts     | 124 +++++--------
 airflow/ui/openapi-gen/requests/services.gen.ts    |  20 +--
 airflow/ui/openapi-gen/requests/types.gen.ts       |  51 +++---
 airflow/utils/task_group.py                        |  57 +++++-
 airflow/www/views.py                               |   4 +-
 task_sdk/src/airflow/sdk/definitions/taskgroup.py  |  85 ++++-----
 tests/api_fastapi/core_api/routes/ui/test_graph.py | 198 ---------------------
 .../core_api/routes/ui/test_structure.py           | 134 ++++++++++++++
 tests/utils/test_task_group.py                     |  96 ++++++++--
 17 files changed, 485 insertions(+), 515 deletions(-)

diff --git a/airflow/api_fastapi/core_api/datamodels/ui/graph.py b/airflow/api_fastapi/core_api/datamodels/ui/structure.py
similarity index 75%
rename from airflow/api_fastapi/core_api/datamodels/ui/graph.py
rename to airflow/api_fastapi/core_api/datamodels/ui/structure.py
index b4d7587b35e..e3df958a1a8 100644
--- a/airflow/api_fastapi/core_api/datamodels/ui/graph.py
+++ b/airflow/api_fastapi/core_api/datamodels/ui/structure.py
@@ -30,31 +30,21 @@ class EdgeResponse(BaseModel):
     target_id: str
 
 
-class NodeValueResponse(BaseModel):
-    """Graph Node Value responses."""
-
-    isMapped: bool | None = None
-    label: str | None = None
-    labelStyle: str | None = None
-    style: str | None = None
-    tooltip: str | None = None
-    rx: int
-    ry: int
-    clusterLabelPos: str | None = None
-    setupTeardownType: Literal["setup", "teardown"] | None = None
-
-
 class NodeResponse(BaseModel):
     """Node serializer for responses."""
 
     children: list[NodeResponse] | None = None
     id: str | None
-    value: NodeValueResponse
+    is_mapped: bool | None = None
+    label: str | None = None
+    tooltip: str | None = None
+    setup_teardown_type: Literal["setup", "teardown"] | None = None
+    type: Literal["join", "sensor", "task", "task_group"]
 
 
-class GraphDataResponse(BaseModel):
-    """Graph Data serializer for responses."""
+class StructureDataResponse(BaseModel):
+    """Structure Data serializer for responses."""
 
     edges: list[EdgeResponse]
-    nodes: NodeResponse
+    nodes: list[NodeResponse]
     arrange: Literal["BT", "LR", "RL", "TB"]
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 20c450cf0a2..87a63a3c479 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -194,13 +194,13 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
-  /ui/graph/graph_data:
+  /ui/structure/structure_data:
     get:
       tags:
-      - Graph
-      summary: Graph Data
-      description: Get Graph Data.
-      operationId: graph_data
+      - Structure
+      summary: Structure Data
+      description: Get Structure Data.
+      operationId: structure_data
       parameters:
       - name: dag_id
         in: query
@@ -236,7 +236,7 @@ paths:
           content:
             application/json:
               schema:
-                $ref: '#/components/schemas/GraphDataResponse'
+                $ref: '#/components/schemas/StructureDataResponse'
         '400':
           content:
             application/json:
@@ -7594,30 +7594,6 @@ components:
       - name
       title: FastAPIAppResponse
       description: Serializer for Plugin FastAPI App responses.
-    GraphDataResponse:
-      properties:
-        edges:
-          items:
-            $ref: '#/components/schemas/EdgeResponse'
-          type: array
-          title: Edges
-        nodes:
-          $ref: '#/components/schemas/NodeResponse'
-        arrange:
-          type: string
-          enum:
-          - BT
-          - LR
-          - RL
-          - TB
-          title: Arrange
-      type: object
-      required:
-      - edges
-      - nodes
-      - arrange
-      title: GraphDataResponse
-      description: Graph Data serializer for responses.
     HTTPExceptionResponse:
       properties:
         detail:
@@ -7810,66 +7786,43 @@ components:
           - type: string
           - type: 'null'
           title: Id
-        value:
-          $ref: '#/components/schemas/NodeValueResponse'
-      type: object
-      required:
-      - id
-      - value
-      title: NodeResponse
-      description: Node serializer for responses.
-    NodeValueResponse:
-      properties:
-        isMapped:
+        is_mapped:
           anyOf:
           - type: boolean
           - type: 'null'
-          title: Ismapped
+          title: Is Mapped
         label:
           anyOf:
           - type: string
           - type: 'null'
           title: Label
-        labelStyle:
-          anyOf:
-          - type: string
-          - type: 'null'
-          title: Labelstyle
-        style:
-          anyOf:
-          - type: string
-          - type: 'null'
-          title: Style
         tooltip:
           anyOf:
           - type: string
           - type: 'null'
           title: Tooltip
-        rx:
-          type: integer
-          title: Rx
-        ry:
-          type: integer
-          title: Ry
-        clusterLabelPos:
-          anyOf:
-          - type: string
-          - type: 'null'
-          title: Clusterlabelpos
-        setupTeardownType:
+        setup_teardown_type:
           anyOf:
           - type: string
             enum:
             - setup
             - teardown
           - type: 'null'
-          title: Setupteardowntype
+          title: Setup Teardown Type
+        type:
+          type: string
+          enum:
+          - join
+          - sensor
+          - task
+          - task_group
+          title: Type
       type: object
       required:
-      - rx
-      - ry
-      title: NodeValueResponse
-      description: Graph Node Value responses.
+      - id
+      - type
+      title: NodeResponse
+      description: Node serializer for responses.
     PatchTaskInstanceBody:
       properties:
         dry_run:
@@ -8219,6 +8172,33 @@ components:
       - latest_scheduler_heartbeat
       title: SchedulerInfoResponse
       description: Scheduler info serializer for responses.
+    StructureDataResponse:
+      properties:
+        edges:
+          items:
+            $ref: '#/components/schemas/EdgeResponse'
+          type: array
+          title: Edges
+        nodes:
+          items:
+            $ref: '#/components/schemas/NodeResponse'
+          type: array
+          title: Nodes
+        arrange:
+          type: string
+          enum:
+          - BT
+          - LR
+          - RL
+          - TB
+          title: Arrange
+      type: object
+      required:
+      - edges
+      - nodes
+      - arrange
+      title: StructureDataResponse
+      description: Structure Data serializer for responses.
     TaskCollectionResponse:
       properties:
         tasks:
diff --git a/airflow/api_fastapi/core_api/routes/ui/__init__.py b/airflow/api_fastapi/core_api/routes/ui/__init__.py
index 0fa150b4653..2b22cc54120 100644
--- a/airflow/api_fastapi/core_api/routes/ui/__init__.py
+++ b/airflow/api_fastapi/core_api/routes/ui/__init__.py
@@ -21,7 +21,7 @@ from airflow.api_fastapi.core_api.routes.ui.assets import assets_router
 from airflow.api_fastapi.core_api.routes.ui.config import config_router
 from airflow.api_fastapi.core_api.routes.ui.dags import dags_router
 from airflow.api_fastapi.core_api.routes.ui.dashboard import dashboard_router
-from airflow.api_fastapi.core_api.routes.ui.graph import graph_data_router
+from airflow.api_fastapi.core_api.routes.ui.structure import structure_router
 
 ui_router = AirflowRouter(prefix="/ui")
 
@@ -29,4 +29,4 @@ ui_router.include_router(assets_router)
 ui_router.include_router(config_router)
 ui_router.include_router(dags_router)
 ui_router.include_router(dashboard_router)
-ui_router.include_router(graph_data_router)
+ui_router.include_router(structure_router)
diff --git a/airflow/api_fastapi/core_api/routes/ui/graph.py b/airflow/api_fastapi/core_api/routes/ui/structure.py
similarity index 79%
rename from airflow/api_fastapi/core_api/routes/ui/graph.py
rename to airflow/api_fastapi/core_api/routes/ui/structure.py
index 10dbfb150c7..a6ed936b92d 100644
--- a/airflow/api_fastapi/core_api/routes/ui/graph.py
+++ b/airflow/api_fastapi/core_api/routes/ui/structure.py
@@ -20,35 +20,37 @@ from fastapi import Request, status
 
 from airflow.api_fastapi.common.db.common import SessionDep
 from airflow.api_fastapi.common.router import AirflowRouter
-from airflow.api_fastapi.core_api.datamodels.ui.graph import GraphDataResponse
+from airflow.api_fastapi.core_api.datamodels.ui.structure import StructureDataResponse
 from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
 from airflow.utils.dag_edges import dag_edges
 from airflow.utils.task_group import task_group_to_dict
 
-graph_data_router = AirflowRouter(tags=["Graph"], prefix="/graph")
+structure_router = AirflowRouter(tags=["Structure"], prefix="/structure")
 
 
-@graph_data_router.get(
-    "/graph_data",
+@structure_router.get(
+    "/structure_data",
     include_in_schema=False,
     responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST]),
 )
-def graph_data(
+def structure_data(
     session: SessionDep,
     dag_id: str,
     request: Request,
     root: str | None = None,
     include_upstream: bool = False,
     include_downstream: bool = False,
-) -> GraphDataResponse:
-    """Get Graph Data."""
+) -> StructureDataResponse:
+    """Get Structure Data."""
     dag = request.app.state.dag_bag.get_dag(dag_id)
     if root:
         dag = dag.partial_subset(
             task_ids_or_regex=root, include_upstream=include_upstream, include_downstream=include_downstream
         )
 
-    nodes = task_group_to_dict(dag.task_group)
+    nodes = [
+        task_group_to_dict(child) for child in sorted(dag.task_group.children.values(), key=lambda t: t.label)
+    ]
     edges = dag_edges(dag)
 
     data = {
@@ -57,4 +59,4 @@ def graph_data(
         "edges": edges,
     }
 
-    return GraphDataResponse(**data)
+    return StructureDataResponse(**data)
diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts
index f7e3576019e..766c08ae909 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -16,13 +16,13 @@ import {
   DashboardService,
   EventLogService,
   ExtraLinksService,
-  GraphService,
   ImportErrorService,
   JobService,
   MonitorService,
   PluginService,
   PoolService,
   ProviderService,
+  StructureService,
   TaskInstanceService,
   TaskService,
   VariableService,
@@ -327,15 +327,16 @@ export const UseDashboardServiceHistoricalMetricsKeyFn = (
   useDashboardServiceHistoricalMetricsKey,
   ...(queryKey ?? [{ endDate, startDate }]),
 ];
-export type GraphServiceGraphDataDefaultResponse = Awaited<
-  ReturnType<typeof GraphService.graphData>
+export type StructureServiceStructureDataDefaultResponse = Awaited<
+  ReturnType<typeof StructureService.structureData>
 >;
-export type GraphServiceGraphDataQueryResult<
-  TData = GraphServiceGraphDataDefaultResponse,
+export type StructureServiceStructureDataQueryResult<
+  TData = StructureServiceStructureDataDefaultResponse,
   TError = unknown,
 > = UseQueryResult<TData, TError>;
-export const useGraphServiceGraphDataKey = "GraphServiceGraphData";
-export const UseGraphServiceGraphDataKeyFn = (
+export const useStructureServiceStructureDataKey =
+  "StructureServiceStructureData";
+export const UseStructureServiceStructureDataKeyFn = (
   {
     dagId,
     includeDownstream,
@@ -349,7 +350,7 @@ export const UseGraphServiceGraphDataKeyFn = (
   },
   queryKey?: Array<unknown>,
 ) => [
-  useGraphServiceGraphDataKey,
+  useStructureServiceStructureDataKey,
   ...(queryKey ?? [{ dagId, includeDownstream, includeUpstream, root }]),
 ];
 export type BackfillServiceListBackfillsDefaultResponse = Awaited<
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts
index 54603f0b160..2bf0cf3695c 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -15,13 +15,13 @@ import {
   DashboardService,
   EventLogService,
   ExtraLinksService,
-  GraphService,
   ImportErrorService,
   JobService,
   MonitorService,
   PluginService,
   PoolService,
   ProviderService,
+  StructureService,
   TaskInstanceService,
   TaskService,
   VariableService,
@@ -407,17 +407,17 @@ export const prefetchUseDashboardServiceHistoricalMetrics = (
     queryFn: () => DashboardService.historicalMetrics({ endDate, startDate }),
   });
 /**
- * Graph Data
- * Get Graph Data.
+ * Structure Data
+ * Get Structure Data.
  * @param data The data for the request.
  * @param data.dagId
  * @param data.root
  * @param data.includeUpstream
  * @param data.includeDownstream
- * @returns GraphDataResponse Successful Response
+ * @returns StructureDataResponse Successful Response
  * @throws ApiError
  */
-export const prefetchUseGraphServiceGraphData = (
+export const prefetchUseStructureServiceStructureData = (
   queryClient: QueryClient,
   {
     dagId,
@@ -432,14 +432,14 @@ export const prefetchUseGraphServiceGraphData = (
   },
 ) =>
   queryClient.prefetchQuery({
-    queryKey: Common.UseGraphServiceGraphDataKeyFn({
+    queryKey: Common.UseStructureServiceStructureDataKeyFn({
       dagId,
       includeDownstream,
       includeUpstream,
       root,
     }),
     queryFn: () =>
-      GraphService.graphData({
+      StructureService.structureData({
         dagId,
         includeDownstream,
         includeUpstream,
diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts
index 3992c3a2edf..6b9645fcb5e 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -21,13 +21,13 @@ import {
   DashboardService,
   EventLogService,
   ExtraLinksService,
-  GraphService,
   ImportErrorService,
   JobService,
   MonitorService,
   PluginService,
   PoolService,
   ProviderService,
+  StructureService,
   TaskInstanceService,
   TaskService,
   VariableService,
@@ -523,18 +523,18 @@ export const useDashboardServiceHistoricalMetrics = <
     ...options,
   });
 /**
- * Graph Data
- * Get Graph Data.
+ * Structure Data
+ * Get Structure Data.
  * @param data The data for the request.
  * @param data.dagId
  * @param data.root
  * @param data.includeUpstream
  * @param data.includeDownstream
- * @returns GraphDataResponse Successful Response
+ * @returns StructureDataResponse Successful Response
  * @throws ApiError
  */
-export const useGraphServiceGraphData = <
-  TData = Common.GraphServiceGraphDataDefaultResponse,
+export const useStructureServiceStructureData = <
+  TData = Common.StructureServiceStructureDataDefaultResponse,
   TError = unknown,
   TQueryKey extends Array<unknown> = unknown[],
 >(
@@ -553,12 +553,12 @@ export const useGraphServiceGraphData = <
   options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
 ) =>
   useQuery<TData, TError>({
-    queryKey: Common.UseGraphServiceGraphDataKeyFn(
+    queryKey: Common.UseStructureServiceStructureDataKeyFn(
       { dagId, includeDownstream, includeUpstream, root },
       queryKey,
     ),
     queryFn: () =>
-      GraphService.graphData({
+      StructureService.structureData({
         dagId,
         includeDownstream,
         includeUpstream,
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts
index 185272ef0c2..8bc0c526b2f 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -15,13 +15,13 @@ import {
   DashboardService,
   EventLogService,
   ExtraLinksService,
-  GraphService,
   ImportErrorService,
   JobService,
   MonitorService,
   PluginService,
   PoolService,
   ProviderService,
+  StructureService,
   TaskInstanceService,
   TaskService,
   VariableService,
@@ -498,18 +498,18 @@ export const useDashboardServiceHistoricalMetricsSuspense = <
     ...options,
   });
 /**
- * Graph Data
- * Get Graph Data.
+ * Structure Data
+ * Get Structure Data.
  * @param data The data for the request.
  * @param data.dagId
  * @param data.root
  * @param data.includeUpstream
  * @param data.includeDownstream
- * @returns GraphDataResponse Successful Response
+ * @returns StructureDataResponse Successful Response
  * @throws ApiError
  */
-export const useGraphServiceGraphDataSuspense = <
-  TData = Common.GraphServiceGraphDataDefaultResponse,
+export const useStructureServiceStructureDataSuspense = <
+  TData = Common.StructureServiceStructureDataDefaultResponse,
   TError = unknown,
   TQueryKey extends Array<unknown> = unknown[],
 >(
@@ -528,12 +528,12 @@ export const useGraphServiceGraphDataSuspense = <
   options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
 ) =>
   useSuspenseQuery<TData, TError>({
-    queryKey: Common.UseGraphServiceGraphDataKeyFn(
+    queryKey: Common.UseStructureServiceStructureDataKeyFn(
       { dagId, includeDownstream, includeUpstream, root },
       queryKey,
     ),
     queryFn: () =>
-      GraphService.graphData({
+      StructureService.structureData({
         dagId,
         includeDownstream,
         includeUpstream,
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index ae2eeaf1687..0f704159c8d 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -2838,30 +2838,6 @@ export const $FastAPIAppResponse = {
   description: "Serializer for Plugin FastAPI App responses.",
 } as const;
 
-export const $GraphDataResponse = {
-  properties: {
-    edges: {
-      items: {
-        $ref: "#/components/schemas/EdgeResponse",
-      },
-      type: "array",
-      title: "Edges",
-    },
-    nodes: {
-      $ref: "#/components/schemas/NodeResponse",
-    },
-    arrange: {
-      type: "string",
-      enum: ["BT", "LR", "RL", "TB"],
-      title: "Arrange",
-    },
-  },
-  type: "object",
-  required: ["edges", "nodes", "arrange"],
-  title: "GraphDataResponse",
-  description: "Graph Data serializer for responses.",
-} as const;
-
 export const $HTTPExceptionResponse = {
   properties: {
     detail: {
@@ -3162,19 +3138,7 @@ export const $NodeResponse = {
       ],
       title: "Id",
     },
-    value: {
-      $ref: "#/components/schemas/NodeValueResponse",
-    },
-  },
-  type: "object",
-  required: ["id", "value"],
-  title: "NodeResponse",
-  description: "Node serializer for responses.",
-} as const;
-
-export const $NodeValueResponse = {
-  properties: {
-    isMapped: {
+    is_mapped: {
       anyOf: [
         {
           type: "boolean",
@@ -3183,7 +3147,7 @@ export const $NodeValueResponse = {
           type: "null",
         },
       ],
-      title: "Ismapped",
+      title: "Is Mapped",
     },
     label: {
       anyOf: [
@@ -3196,28 +3160,6 @@ export const $NodeValueResponse = {
       ],
       title: "Label",
     },
-    labelStyle: {
-      anyOf: [
-        {
-          type: "string",
-        },
-        {
-          type: "null",
-        },
-      ],
-      title: "Labelstyle",
-    },
-    style: {
-      anyOf: [
-        {
-          type: "string",
-        },
-        {
-          type: "null",
-        },
-      ],
-      title: "Style",
-    },
     tooltip: {
       anyOf: [
         {
@@ -3229,26 +3171,7 @@ export const $NodeValueResponse = {
       ],
       title: "Tooltip",
     },
-    rx: {
-      type: "integer",
-      title: "Rx",
-    },
-    ry: {
-      type: "integer",
-      title: "Ry",
-    },
-    clusterLabelPos: {
-      anyOf: [
-        {
-          type: "string",
-        },
-        {
-          type: "null",
-        },
-      ],
-      title: "Clusterlabelpos",
-    },
-    setupTeardownType: {
+    setup_teardown_type: {
       anyOf: [
         {
           type: "string",
@@ -3258,13 +3181,18 @@ export const $NodeValueResponse = {
           type: "null",
         },
       ],
-      title: "Setupteardowntype",
+      title: "Setup Teardown Type",
+    },
+    type: {
+      type: "string",
+      enum: ["join", "sensor", "task", "task_group"],
+      title: "Type",
     },
   },
   type: "object",
-  required: ["rx", "ry"],
-  title: "NodeValueResponse",
-  description: "Graph Node Value responses.",
+  required: ["id", "type"],
+  title: "NodeResponse",
+  description: "Node serializer for responses.",
 } as const;
 
 export const $PatchTaskInstanceBody = {
@@ -3755,6 +3683,34 @@ export const $SchedulerInfoResponse = {
   description: "Scheduler info serializer for responses.",
 } as const;
 
+export const $StructureDataResponse = {
+  properties: {
+    edges: {
+      items: {
+        $ref: "#/components/schemas/EdgeResponse",
+      },
+      type: "array",
+      title: "Edges",
+    },
+    nodes: {
+      items: {
+        $ref: "#/components/schemas/NodeResponse",
+      },
+      type: "array",
+      title: "Nodes",
+    },
+    arrange: {
+      type: "string",
+      enum: ["BT", "LR", "RL", "TB"],
+      title: "Arrange",
+    },
+  },
+  type: "object",
+  required: ["edges", "nodes", "arrange"],
+  title: "StructureDataResponse",
+  description: "Structure Data serializer for responses.",
+} as const;
+
 export const $TaskCollectionResponse = {
   properties: {
     tasks: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts
index c5e494fe119..0536c0afa80 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -34,8 +34,8 @@ import type {
   RecentDagRunsResponse,
   HistoricalMetricsData,
   HistoricalMetricsResponse,
-  GraphDataData,
-  GraphDataResponse2,
+  StructureDataData,
+  StructureDataResponse2,
   ListBackfillsData,
   ListBackfillsResponse,
   CreateBackfillData,
@@ -665,24 +665,24 @@ export class DashboardService {
   }
 }
 
-export class GraphService {
+export class StructureService {
   /**
-   * Graph Data
-   * Get Graph Data.
+   * Structure Data
+   * Get Structure Data.
    * @param data The data for the request.
    * @param data.dagId
    * @param data.root
    * @param data.includeUpstream
    * @param data.includeDownstream
-   * @returns GraphDataResponse Successful Response
+   * @returns StructureDataResponse Successful Response
    * @throws ApiError
    */
-  public static graphData(
-    data: GraphDataData,
-  ): CancelablePromise<GraphDataResponse2> {
+  public static structureData(
+    data: StructureDataData,
+  ): CancelablePromise<StructureDataResponse2> {
     return __request(OpenAPI, {
       method: "GET",
-      url: "/ui/graph/graph_data",
+      url: "/ui/structure/structure_data",
       query: {
         dag_id: data.dagId,
         root: data.root,
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index 1f1838d0870..6977b4de636 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -680,17 +680,6 @@ export type FastAPIAppResponse = {
   [key: string]: unknown | string;
 };
 
-/**
- * Graph Data serializer for responses.
- */
-export type GraphDataResponse = {
-  edges: Array<EdgeResponse>;
-  nodes: NodeResponse;
-  arrange: "BT" | "LR" | "RL" | "TB";
-};
-
-export type arrange = "BT" | "LR" | "RL" | "TB";
-
 /**
  * HTTPException Model used for error response.
  */
@@ -773,24 +762,15 @@ export type JobResponse = {
 export type NodeResponse = {
   children?: Array<NodeResponse> | null;
   id: string | null;
-  value: NodeValueResponse;
-};
-
-/**
- * Graph Node Value responses.
- */
-export type NodeValueResponse = {
-  isMapped?: boolean | null;
+  is_mapped?: boolean | null;
   label?: string | null;
-  labelStyle?: string | null;
-  style?: string | null;
   tooltip?: string | null;
-  rx: number;
-  ry: number;
-  clusterLabelPos?: string | null;
-  setupTeardownType?: "setup" | "teardown" | null;
+  setup_teardown_type?: "setup" | "teardown" | null;
+  type: "join" | "sensor" | "task" | "task_group";
 };
 
+export type type = "join" | "sensor" | "task" | "task_group";
+
 /**
  * Request body for Clear Task Instances endpoint.
  */
@@ -930,6 +910,17 @@ export type SchedulerInfoResponse = {
   latest_scheduler_heartbeat: string | null;
 };
 
+/**
+ * Structure Data serializer for responses.
+ */
+export type StructureDataResponse = {
+  edges: Array<EdgeResponse>;
+  nodes: Array<NodeResponse>;
+  arrange: "BT" | "LR" | "RL" | "TB";
+};
+
+export type arrange = "BT" | "LR" | "RL" | "TB";
+
 /**
  * Task collection serializer for responses.
  */
@@ -1425,14 +1416,14 @@ export type HistoricalMetricsData = {
 
 export type HistoricalMetricsResponse = HistoricalMetricDataResponse;
 
-export type GraphDataData = {
+export type StructureDataData = {
   dagId: string;
   includeDownstream?: boolean;
   includeUpstream?: boolean;
   root?: string | null;
 };
 
-export type GraphDataResponse2 = GraphDataResponse;
+export type StructureDataResponse2 = StructureDataResponse;
 
 export type ListBackfillsData = {
   dagId: string;
@@ -2456,14 +2447,14 @@ export type $OpenApiTs = {
       };
     };
   };
-  "/ui/graph/graph_data": {
+  "/ui/structure/structure_data": {
     get: {
-      req: GraphDataData;
+      req: StructureDataData;
       res: {
         /**
          * Successful Response
          */
-        200: GraphDataResponse;
+        200: StructureDataResponse;
         /**
          * Bad Request
          */
diff --git a/airflow/utils/task_group.py b/airflow/utils/task_group.py
index 3597c7f893c..3d3738f5a8d 100644
--- a/airflow/utils/task_group.py
+++ b/airflow/utils/task_group.py
@@ -81,6 +81,60 @@ def task_group_to_dict(task_item_or_group):
     """Create a nested dict representation of this TaskGroup and its children used to construct the Graph."""
     from airflow.models.abstractoperator import AbstractOperator
     from airflow.models.mappedoperator import MappedOperator
+    from airflow.sensors.base import BaseSensorOperator
+
+    if isinstance(task := task_item_or_group, AbstractOperator):
+        setup_teardown_type = {}
+        is_mapped = {}
+        node_type = {"type": "task"}
+        if task.is_setup is True:
+            setup_teardown_type["setup_teardown_type"] = "setup"
+        elif task.is_teardown is True:
+            setup_teardown_type["setup_teardown_type"] = "teardown"
+        if isinstance(task, MappedOperator):
+            is_mapped["is_mapped"] = True
+        if isinstance(task, BaseSensorOperator):
+            node_type["type"] = "sensor"
+        return {
+            "id": task.task_id,
+            "label": task.label,
+            **is_mapped,
+            **setup_teardown_type,
+            **node_type,
+        }
+
+    task_group = task_item_or_group
+    is_mapped = isinstance(task_group, MappedTaskGroup)
+    children = [
+        task_group_to_dict(child) for child in sorted(task_group.children.values(), key=lambda t: t.label)
+    ]
+
+    if task_group.upstream_group_ids or task_group.upstream_task_ids:
+        # This is the join node used to reduce the number of edges between two TaskGroup.
+        children.append({"id": task_group.upstream_join_id, "label": "", "type": "join"})
+
+    if task_group.downstream_group_ids or task_group.downstream_task_ids:
+        # This is the join node used to reduce the number of edges between two TaskGroup.
+        children.append({"id": task_group.downstream_join_id, "label": "", "type": "join"})
+
+    return {
+        "id": task_group.group_id,
+        "label": task_group.label,
+        "tooltip": task_group.tooltip,
+        "is_mapped": is_mapped,
+        "children": children,
+        "type": "task_group",
+    }
+
+
+def task_group_to_dict_legacy(task_item_or_group):
+    """
+    Legacy function to create a nested dict representation of this TaskGroup and its children used to construct the Graph.
+
+    TODO: To remove for airflow 3 once the legacy UI is deleted.
+    """
+    from airflow.models.abstractoperator import AbstractOperator
+    from airflow.models.mappedoperator import MappedOperator
 
     if isinstance(task := task_item_or_group, AbstractOperator):
         setup_teardown_type = {}
@@ -106,7 +160,8 @@ def task_group_to_dict(task_item_or_group):
     task_group = task_item_or_group
     is_mapped = isinstance(task_group, MappedTaskGroup)
     children = [
-        task_group_to_dict(child) for child in sorted(task_group.children.values(), key=lambda t: t.label)
+        task_group_to_dict_legacy(child)
+        for child in sorted(task_group.children.values(), key=lambda t: t.label)
     ]
 
     if task_group.upstream_group_ids or task_group.upstream_task_ids:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 57f7b3c204f..5fc189800f6 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -132,7 +132,7 @@ from airflow.utils.net import get_hostname
 from airflow.utils.session import NEW_SESSION, create_session, provide_session
 from airflow.utils.state import DagRunState, State, TaskInstanceState
 from airflow.utils.strings import to_boolean
-from airflow.utils.task_group import TaskGroup, task_group_to_dict
+from airflow.utils.task_group import TaskGroup, task_group_to_dict_legacy
 from airflow.utils.timezone import td_format, utcnow
 from airflow.utils.types import NOTSET, DagRunTriggeredByType
 from airflow.version import version
@@ -3243,7 +3243,7 @@ class Airflow(AirflowBaseView):
                 task_ids_or_regex=root, include_upstream=filter_upstream, include_downstream=filter_downstream
             )
 
-        nodes = task_group_to_dict(dag.task_group)
+        nodes = task_group_to_dict_legacy(dag.task_group)
         edges = dag_edges(dag)
 
         data = {
diff --git a/task_sdk/src/airflow/sdk/definitions/taskgroup.py b/task_sdk/src/airflow/sdk/definitions/taskgroup.py
index 7395b341740..fd02a4c94e7 100644
--- a/task_sdk/src/airflow/sdk/definitions/taskgroup.py
+++ b/task_sdk/src/airflow/sdk/definitions/taskgroup.py
@@ -146,7 +146,10 @@ class TaskGroup(DAGNode):
         if self.parent_group:
             self.parent_group.add(self)
             if self.parent_group.default_args:
-                self.default_args = {**self.parent_group.default_args, **self.default_args}
+                self.default_args = {
+                    **self.parent_group.default_args,
+                    **self.default_args,
+                }
 
         if self._group_id:
             self.used_group_ids.add(self.group_id)
@@ -235,7 +238,9 @@ class TaskGroup(DAGNode):
             if self.dag:
                 if task.dag is not None and self.dag is not task.dag:
                     raise RuntimeError(
-                        "Cannot mix TaskGroups from different DAGs: %s and 
%s", self.dag, task.dag
+                        "Cannot mix TaskGroups from different DAGs: %s and %s",
+                        self.dag,
+                        task.dag,
                     )
                 task.dag = self.dag
             if task.children:
@@ -268,7 +273,10 @@ class TaskGroup(DAGNode):
         return self._group_id
 
     def update_relative(
-        self, other: DependencyMixin, upstream: bool = True, edge_modifier: 
EdgeModifier | None = None
+        self,
+        other: DependencyMixin,
+        upstream: bool = True,
+        edge_modifier: EdgeModifier | None = None,
     ) -> None:
         """
         Override TaskMixin.update_relative.
@@ -463,7 +471,10 @@ class TaskGroup(DAGNode):
         from airflow.serialization.enums import DagAttributeTypes
         from airflow.serialization.serialized_objects import 
TaskGroupSerialization
 
-        return DagAttributeTypes.TASK_GROUP, 
TaskGroupSerialization.serialize_task_group(self)
+        return (
+            DagAttributeTypes.TASK_GROUP,
+            TaskGroupSerialization.serialize_task_group(self),
+        )
 
     def hierarchical_alphabetical_sort(self):
         """
@@ -475,7 +486,8 @@ class TaskGroup(DAGNode):
         :return: list of tasks in hierarchical alphabetical order
         """
         return sorted(
-            self.children.values(), key=lambda node: (not isinstance(node, 
TaskGroup), node.node_id)
+            self.children.values(),
+            key=lambda node: (not isinstance(node, TaskGroup), node.node_id),
         )
 
     def topological_sort(self):
@@ -626,28 +638,28 @@ def task_group_to_dict(task_item_or_group):
     """Create a nested dict representation of this TaskGroup and its children 
used to construct the Graph."""
     from airflow.models.abstractoperator import AbstractOperator
     from airflow.models.mappedoperator import MappedOperator
+    from airflow.sensors.base import BaseSensorOperator
 
     if isinstance(task := task_item_or_group, AbstractOperator):
         setup_teardown_type = {}
         is_mapped = {}
+        node_type = {"type": "task"}
         if task.is_setup is True:
-            setup_teardown_type["setupTeardownType"] = "setup"
+            setup_teardown_type["setup_teardown_type"] = "setup"
         elif task.is_teardown is True:
-            setup_teardown_type["setupTeardownType"] = "teardown"
+            setup_teardown_type["setup_teardown_type"] = "teardown"
         if isinstance(task, MappedOperator):
-            is_mapped["isMapped"] = True
+            is_mapped["is_mapped"] = True
+        if isinstance(task, BaseSensorOperator):
+            node_type["type"] = "sensor"
         return {
             "id": task.task_id,
-            "value": {
-                "label": task.label,
-                "labelStyle": f"fill:{task.ui_fgcolor};",
-                "style": f"fill:{task.ui_color};",
-                "rx": 5,
-                "ry": 5,
-                **is_mapped,
-                **setup_teardown_type,
-            },
+            "label": task.label,
+            **is_mapped,
+            **setup_teardown_type,
+            **node_type,
         }
+
     task_group = task_item_or_group
     is_mapped = isinstance(task_group, MappedTaskGroup)
     children = [
@@ -655,43 +667,18 @@ def task_group_to_dict(task_item_or_group):
     ]
 
     if task_group.upstream_group_ids or task_group.upstream_task_ids:
-        children.append(
-            {
-                "id": task_group.upstream_join_id,
-                "value": {
-                    "label": "",
-                    "labelStyle": f"fill:{task_group.ui_fgcolor};",
-                    "style": f"fill:{task_group.ui_color};",
-                    "shape": "circle",
-                },
-            }
-        )
+        # This is the join node used to reduce the number of edges between two 
TaskGroup.
+        children.append({"id": task_group.upstream_join_id, "label": "", 
"type": "join"})
 
     if task_group.downstream_group_ids or task_group.downstream_task_ids:
         # This is the join node used to reduce the number of edges between two 
TaskGroup.
-        children.append(
-            {
-                "id": task_group.downstream_join_id,
-                "value": {
-                    "label": "",
-                    "labelStyle": f"fill:{task_group.ui_fgcolor};",
-                    "style": f"fill:{task_group.ui_color};",
-                    "shape": "circle",
-                },
-            }
-        )
+        children.append({"id": task_group.downstream_join_id, "label": "", 
"type": "join"})
 
     return {
         "id": task_group.group_id,
-        "value": {
-            "label": task_group.label,
-            "labelStyle": f"fill:{task_group.ui_fgcolor};",
-            "style": f"fill:{task_group.ui_color}",
-            "rx": 5,
-            "ry": 5,
-            "clusterLabelPos": "top",
-            "tooltip": task_group.tooltip,
-            "isMapped": is_mapped,
-        },
+        "label": task_group.label,
+        "tooltip": task_group.tooltip,
+        "is_mapped": is_mapped,
         "children": children,
+        "type": "task_group",
     }
diff --git a/tests/api_fastapi/core_api/routes/ui/test_graph.py 
b/tests/api_fastapi/core_api/routes/ui/test_graph.py
deleted file mode 100644
index 85dabd3ac6a..00000000000
--- a/tests/api_fastapi/core_api/routes/ui/test_graph.py
+++ /dev/null
@@ -1,198 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import pendulum
-import pytest
-
-from airflow.models import DagBag
-from airflow.operators.empty import EmptyOperator
-
-from tests_common.test_utils.db import clear_db_runs
-
-pytestmark = pytest.mark.db_test
-
-DAG_ID = "test_dag_id"
-
-
-@pytest.fixture(autouse=True, scope="module")
-def examples_dag_bag():
-    # Speed up: We don't want example dags for this module
-
-    return DagBag(include_examples=False, read_dags_from_db=True)
-
-
-@pytest.fixture(autouse=True)
-def clean():
-    clear_db_runs()
-    yield
-    clear_db_runs()
-
-
-@pytest.fixture
-def make_dag(dag_maker, session, time_machine):
-    with dag_maker(
-        dag_id=DAG_ID,
-        serialized=True,
-        session=session,
-        start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC),
-    ):
-        EmptyOperator(task_id="task_1") >> EmptyOperator(task_id="task_2")
-
-    dag_maker.dagbag.sync_to_db()
-
-
-class TestGraphDataEndpoint:
-    @pytest.mark.parametrize(
-        "params, expected",
-        [
-            (
-                {"dag_id": DAG_ID},
-                {
-                    "arrange": "LR",
-                    "edges": [
-                        {
-                            "is_setup_teardown": None,
-                            "label": None,
-                            "source_id": "task_1",
-                            "target_id": "task_2",
-                        },
-                    ],
-                    "nodes": {
-                        "children": [
-                            {
-                                "children": None,
-                                "id": "task_1",
-                                "value": {
-                                    "clusterLabelPos": None,
-                                    "isMapped": None,
-                                    "label": "task_1",
-                                    "labelStyle": "fill:#000;",
-                                    "rx": 5,
-                                    "ry": 5,
-                                    "setupTeardownType": None,
-                                    "style": "fill:#e8f7e4;",
-                                    "tooltip": None,
-                                },
-                            },
-                            {
-                                "children": None,
-                                "id": "task_2",
-                                "value": {
-                                    "clusterLabelPos": None,
-                                    "isMapped": None,
-                                    "label": "task_2",
-                                    "labelStyle": "fill:#000;",
-                                    "rx": 5,
-                                    "ry": 5,
-                                    "setupTeardownType": None,
-                                    "style": "fill:#e8f7e4;",
-                                    "tooltip": None,
-                                },
-                            },
-                        ],
-                        "id": None,
-                        "value": {
-                            "clusterLabelPos": "top",
-                            "isMapped": False,
-                            "label": None,
-                            "labelStyle": "fill:#000;",
-                            "rx": 5,
-                            "ry": 5,
-                            "setupTeardownType": None,
-                            "style": "fill:CornflowerBlue",
-                            "tooltip": "",
-                        },
-                    },
-                },
-            ),
-            (
-                {
-                    "dag_id": DAG_ID,
-                    "root": "unknown_task",
-                },
-                {
-                    "arrange": "LR",
-                    "edges": [],
-                    "nodes": {
-                        "children": [],
-                        "id": None,
-                        "value": {
-                            "clusterLabelPos": "top",
-                            "isMapped": False,
-                            "label": None,
-                            "labelStyle": "fill:#000;",
-                            "rx": 5,
-                            "ry": 5,
-                            "setupTeardownType": None,
-                            "style": "fill:CornflowerBlue",
-                            "tooltip": "",
-                        },
-                    },
-                },
-            ),
-            (
-                {
-                    "dag_id": DAG_ID,
-                    "root": "task_1",
-                    "filter_upstream": False,
-                    "filter_downstream": False,
-                },
-                {
-                    "arrange": "LR",
-                    "edges": [],
-                    "nodes": {
-                        "children": [
-                            {
-                                "children": None,
-                                "id": "task_1",
-                                "value": {
-                                    "clusterLabelPos": None,
-                                    "isMapped": None,
-                                    "label": "task_1",
-                                    "labelStyle": "fill:#000;",
-                                    "rx": 5,
-                                    "ry": 5,
-                                    "setupTeardownType": None,
-                                    "style": "fill:#e8f7e4;",
-                                    "tooltip": None,
-                                },
-                            },
-                        ],
-                        "id": None,
-                        "value": {
-                            "clusterLabelPos": "top",
-                            "isMapped": False,
-                            "label": None,
-                            "labelStyle": "fill:#000;",
-                            "rx": 5,
-                            "ry": 5,
-                            "setupTeardownType": None,
-                            "style": "fill:CornflowerBlue",
-                            "tooltip": "",
-                        },
-                    },
-                },
-            ),
-        ],
-    )
-    @pytest.mark.usefixtures("make_dag")
-    def test_historical_metrics_data(self, test_client, params, expected):
-        response = test_client.get("/ui/graph/graph_data", params=params)
-        assert response.status_code == 200
-        assert response.json() == expected
diff --git a/tests/api_fastapi/core_api/routes/ui/test_structure.py 
b/tests/api_fastapi/core_api/routes/ui/test_structure.py
new file mode 100644
index 00000000000..202f8b207a7
--- /dev/null
+++ b/tests/api_fastapi/core_api/routes/ui/test_structure.py
@@ -0,0 +1,134 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pendulum
+import pytest
+
+from airflow.models import DagBag
+from airflow.operators.empty import EmptyOperator
+
+from tests_common.test_utils.db import clear_db_runs
+
+pytestmark = pytest.mark.db_test
+
+DAG_ID = "test_dag_id"
+
+
+@pytest.fixture(autouse=True, scope="module")
+def examples_dag_bag():
+    # Speed up: We don't want example dags for this module
+
+    return DagBag(include_examples=False, read_dags_from_db=True)
+
+
+@pytest.fixture(autouse=True)
+def clean():
+    clear_db_runs()
+    yield
+    clear_db_runs()
+
+
+@pytest.fixture
+def make_dag(dag_maker, session, time_machine):
+    with dag_maker(
+        dag_id=DAG_ID,
+        serialized=True,
+        session=session,
+        start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC),
+    ):
+        EmptyOperator(task_id="task_1") >> EmptyOperator(task_id="task_2")
+
+    dag_maker.dagbag.sync_to_db()
+
+
+class TestStructureDataEndpoint:
+    @pytest.mark.parametrize(
+        "params, expected",
+        [
+            (
+                {"dag_id": DAG_ID},
+                {
+                    "arrange": "LR",
+                    "edges": [
+                        {
+                            "is_setup_teardown": None,
+                            "label": None,
+                            "source_id": "task_1",
+                            "target_id": "task_2",
+                        },
+                    ],
+                    "nodes": [
+                        {
+                            "children": None,
+                            "id": "task_1",
+                            "is_mapped": None,
+                            "label": "task_1",
+                            "setup_teardown_type": None,
+                            "tooltip": None,
+                            "type": "task",
+                        },
+                        {
+                            "children": None,
+                            "id": "task_2",
+                            "is_mapped": None,
+                            "label": "task_2",
+                            "setup_teardown_type": None,
+                            "tooltip": None,
+                            "type": "task",
+                        },
+                    ],
+                },
+            ),
+            (
+                {
+                    "dag_id": DAG_ID,
+                    "root": "unknown_task",
+                },
+                {"arrange": "LR", "edges": [], "nodes": []},
+            ),
+            (
+                {
+                    "dag_id": DAG_ID,
+                    "root": "task_1",
+                    "filter_upstream": False,
+                    "filter_downstream": False,
+                },
+                {
+                    "arrange": "LR",
+                    "edges": [],
+                    "nodes": [
+                        {
+                            "children": None,
+                            "id": "task_1",
+                            "is_mapped": None,
+                            "label": "task_1",
+                            "setup_teardown_type": None,
+                            "tooltip": None,
+                            "type": "task",
+                        },
+                    ],
+                },
+            ),
+        ],
+    )
+    @pytest.mark.usefixtures("make_dag")
+    def test_historical_metrics_data(self, test_client, params, expected):
+        response = test_client.get("/ui/structure/structure_data", 
params=params)
+        assert response.status_code == 200
+        assert response.json() == expected
diff --git a/tests/utils/test_task_group.py b/tests/utils/test_task_group.py
index 00caac703a4..7b5c960f9c2 100644
--- a/tests/utils/test_task_group.py
+++ b/tests/utils/test_task_group.py
@@ -35,7 +35,7 @@ from airflow.models.dag import DAG
 from airflow.models.xcom_arg import XComArg
 from airflow.operators.empty import EmptyOperator
 from airflow.utils.dag_edges import dag_edges
-from airflow.utils.task_group import TaskGroup, task_group_to_dict
+from airflow.utils.task_group import TaskGroup, task_group_to_dict, 
task_group_to_dict_legacy
 
 from tests.models import DEFAULT_DATE
 from tests_common.test_utils.compat import BashOperator, PythonOperator
@@ -54,7 +54,7 @@ def make_task(name, type_="classic"):
         return my_task.override(task_id=name)()
 
 
-EXPECTED_JSON = {
+EXPECTED_JSON_LEGACY = {
     "id": None,
     "value": {
         "label": None,
@@ -168,6 +168,41 @@ EXPECTED_JSON = {
     ],
 }
 
+EXPECTED_JSON = {
+    "id": None,
+    "label": None,
+    "tooltip": "",
+    "is_mapped": False,
+    "children": [
+        {
+            "id": "group234",
+            "label": "group234",
+            "tooltip": "",
+            "is_mapped": False,
+            "children": [
+                {
+                    "id": "group234.group34",
+                    "label": "group34",
+                    "tooltip": "",
+                    "is_mapped": False,
+                    "children": [
+                        {"id": "group234.group34.task3", "label": "task3", 
"type": "task"},
+                        {"id": "group234.group34.task4", "label": "task4", 
"type": "task"},
+                        {"id": "group234.group34.downstream_join_id", "label": 
"", "type": "join"},
+                    ],
+                    "type": "task_group",
+                },
+                {"id": "group234.task2", "label": "task2", "type": "task"},
+                {"id": "group234.upstream_join_id", "label": "", "type": 
"join"},
+            ],
+            "type": "task_group",
+        },
+        {"id": "task1", "label": "task1", "type": "task"},
+        {"id": "task5", "label": "task5", "type": "task"},
+    ],
+    "type": "task_group",
+}
+
 
 def test_build_task_group_context_manager():
     logical_date = pendulum.parse("20200101")
@@ -199,6 +234,7 @@ def test_build_task_group_context_manager():
     assert set(dag.task_group.children.keys()) == {"task1", "group234", 
"task5"}
     assert group34.group_id == "group234.group34"
 
+    assert task_group_to_dict_legacy(dag.task_group) == EXPECTED_JSON_LEGACY
     assert task_group_to_dict(dag.task_group) == EXPECTED_JSON
 
 
@@ -220,17 +256,21 @@ def test_build_task_group():
     task1 >> group234
     group34 >> task5
 
+    assert task_group_to_dict_legacy(dag.task_group) == EXPECTED_JSON_LEGACY
     assert task_group_to_dict(dag.task_group) == EXPECTED_JSON
 
 
-def extract_node_id(node, include_label=False):
+def extract_node_id(node, include_label=False, from_legacy=False):
     ret = {"id": node["id"]}
     if include_label:
-        ret["label"] = node["value"]["label"]
+        if from_legacy:
+            ret["label"] = node["value"]["label"]
+        else:
+            ret["label"] = node["label"]
     if "children" in node:
         children = []
         for child in node["children"]:
-            children.append(extract_node_id(child, 
include_label=include_label))
+            children.append(extract_node_id(child, 
include_label=include_label, from_legacy=from_legacy))
 
         ret["children"] = children
 
@@ -267,7 +307,7 @@ def test_build_task_group_with_prefix():
     assert group234.get_child_by_label("group34") == group34
     assert group4.get_child_by_label("task4") == task4
 
-    assert extract_node_id(task_group_to_dict(dag.task_group), 
include_label=True) == {
+    expected_node_id = {
         "id": None,
         "label": None,
         "children": [
@@ -297,6 +337,12 @@ def test_build_task_group_with_prefix():
         ],
     }
 
+    assert (
+        extract_node_id(task_group_to_dict_legacy(dag.task_group), 
include_label=True, from_legacy=True)
+        == expected_node_id
+    )
+    assert extract_node_id(task_group_to_dict(dag.task_group), 
include_label=True) == expected_node_id
+
 
 def test_build_task_group_with_task_decorator():
     """
@@ -341,7 +387,7 @@ def test_build_task_group_with_task_decorator():
     assert tsk_1.operator in tsk_3.operator.upstream_list
     assert tsk_5.operator in tsk_4.operator.downstream_list
 
-    assert extract_node_id(task_group_to_dict(dag.task_group)) == {
+    expected_node_id = {
         "id": None,
         "children": [
             {
@@ -359,6 +405,9 @@ def test_build_task_group_with_task_decorator():
         ],
     }
 
+    assert extract_node_id(task_group_to_dict_legacy(dag.task_group), 
from_legacy=True) == expected_node_id
+    assert extract_node_id(task_group_to_dict(dag.task_group)) == 
expected_node_id
+
     edges = dag_edges(dag)
     assert sorted((e["source_id"], e["target_id"]) for e in edges) == [
         ("group234.downstream_join_id", "task_5"),
@@ -398,7 +447,7 @@ def test_sub_dag_task_group():
 
     subdag = dag.partial_subset(task_ids_or_regex="task5", 
include_upstream=True, include_downstream=False)
 
-    assert extract_node_id(task_group_to_dict(subdag.task_group)) == {
+    expected_node_id = {
         "id": None,
         "children": [
             {
@@ -420,6 +469,9 @@ def test_sub_dag_task_group():
         ],
     }
 
+    assert extract_node_id(task_group_to_dict_legacy(subdag.task_group), 
from_legacy=True) == expected_node_id
+    assert extract_node_id(task_group_to_dict(subdag.task_group)) == 
expected_node_id
+
     edges = dag_edges(subdag)
     assert sorted((e["source_id"], e["target_id"]) for e in edges) == [
         ("group234.group34.downstream_join_id", "task5"),
@@ -485,10 +537,11 @@ def test_dag_edges():
 
         group_d << group_c
 
-    nodes = task_group_to_dict(dag.task_group)
+    nodes_legacy = task_group_to_dict_legacy(dag.task_group)
+    nodes = task_group_to_dict_legacy(dag.task_group)
     edges = dag_edges(dag)
 
-    assert extract_node_id(nodes) == {
+    expected_node_id = {
         "id": None,
         "children": [
             {
@@ -532,6 +585,9 @@ def test_dag_edges():
         ],
     }
 
+    assert extract_node_id(nodes_legacy) == expected_node_id
+    assert extract_node_id(nodes, from_legacy=False) == expected_node_id
+
     assert sorted((e["source_id"], e["target_id"]) for e in edges) == [
         ("group_a.downstream_join_id", "group_c.upstream_join_id"),
         ("group_a.group_b.downstream_join_id", "group_a.task5"),
@@ -787,6 +843,7 @@ def test_build_task_group_deco_context_manager():
         ],
     }
 
+    assert extract_node_id(task_group_to_dict_legacy(dag.task_group), 
from_legacy=True) == node_ids
     assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
 
 
@@ -968,6 +1025,7 @@ def test_task_group_context_mix():
         ],
     }
 
+    assert extract_node_id(task_group_to_dict_legacy(dag.task_group), 
from_legacy=True) == node_ids
     assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
 
 
@@ -1062,6 +1120,7 @@ def test_duplicate_task_group_id():
         ],
     }
 
+    assert extract_node_id(task_group_to_dict_legacy(dag.task_group), 
from_legacy=True) == node_ids
     assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
 
 
@@ -1124,6 +1183,7 @@ def test_call_taskgroup_twice():
         ],
     }
 
+    assert extract_node_id(task_group_to_dict_legacy(dag.task_group), 
from_legacy=True) == node_ids
     assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
 
 
@@ -1593,13 +1653,25 @@ def test_task_group_arrow_with_setup_group():
     assert set(t1.operator.downstream_task_ids) == set()
     assert set(t2.operator.downstream_task_ids) == set()
 
-    def get_nodes(group):
-        d = task_group_to_dict(group)
+    def get_nodes(group, from_legacy=False):
+        if from_legacy:
+            d = task_group_to_dict_legacy(group)
+        else:
+            d = task_group_to_dict(group)
         new_d = {}
         new_d["id"] = d["id"]
         new_d["children"] = [{"id": x["id"]} for x in d["children"]]
         return new_d
 
+    assert get_nodes(g1, from_legacy=True) == {
+        "id": "group_1",
+        "children": [
+            {"id": "group_1.setup_1"},
+            {"id": "group_1.setup_2"},
+            {"id": "group_1.downstream_join_id"},
+        ],
+    }
+
     assert get_nodes(g1) == {
         "id": "group_1",
         "children": [

Reply via email to