This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c0f282643e4 AIP-84 Graph Data update datamodel (#44459)
c0f282643e4 is described below
commit c0f282643e4cd1c4f55f1c505f6407c50bf658dd
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Mon Dec 2 23:27:27 2024 +0800
AIP-84 Graph Data update datamodel (#44459)
* AIP-84 Update Structure Datamodel
* Update task sdk task_group_to_dict
---
.../datamodels/ui/{graph.py => structure.py} | 26 +--
.../api_fastapi/core_api/openapi/v1-generated.yaml | 118 +++++-------
airflow/api_fastapi/core_api/routes/ui/__init__.py | 4 +-
.../core_api/routes/ui/{graph.py => structure.py} | 20 ++-
airflow/ui/openapi-gen/queries/common.ts | 17 +-
airflow/ui/openapi-gen/queries/prefetch.ts | 14 +-
airflow/ui/openapi-gen/queries/queries.ts | 16 +-
airflow/ui/openapi-gen/queries/suspense.ts | 16 +-
airflow/ui/openapi-gen/requests/schemas.gen.ts | 124 +++++--------
airflow/ui/openapi-gen/requests/services.gen.ts | 20 +--
airflow/ui/openapi-gen/requests/types.gen.ts | 51 +++---
airflow/utils/task_group.py | 57 +++++-
airflow/www/views.py | 4 +-
task_sdk/src/airflow/sdk/definitions/taskgroup.py | 85 ++++-----
tests/api_fastapi/core_api/routes/ui/test_graph.py | 198 ---------------------
.../core_api/routes/ui/test_structure.py | 134 ++++++++++++++
tests/utils/test_task_group.py | 96 ++++++++--
17 files changed, 485 insertions(+), 515 deletions(-)
diff --git a/airflow/api_fastapi/core_api/datamodels/ui/graph.py
b/airflow/api_fastapi/core_api/datamodels/ui/structure.py
similarity index 75%
rename from airflow/api_fastapi/core_api/datamodels/ui/graph.py
rename to airflow/api_fastapi/core_api/datamodels/ui/structure.py
index b4d7587b35e..e3df958a1a8 100644
--- a/airflow/api_fastapi/core_api/datamodels/ui/graph.py
+++ b/airflow/api_fastapi/core_api/datamodels/ui/structure.py
@@ -30,31 +30,21 @@ class EdgeResponse(BaseModel):
target_id: str
-class NodeValueResponse(BaseModel):
- """Graph Node Value responses."""
-
- isMapped: bool | None = None
- label: str | None = None
- labelStyle: str | None = None
- style: str | None = None
- tooltip: str | None = None
- rx: int
- ry: int
- clusterLabelPos: str | None = None
- setupTeardownType: Literal["setup", "teardown"] | None = None
-
-
class NodeResponse(BaseModel):
"""Node serializer for responses."""
children: list[NodeResponse] | None = None
id: str | None
- value: NodeValueResponse
+ is_mapped: bool | None = None
+ label: str | None = None
+ tooltip: str | None = None
+ setup_teardown_type: Literal["setup", "teardown"] | None = None
+ type: Literal["join", "sensor", "task", "task_group"]
-class GraphDataResponse(BaseModel):
- """Graph Data serializer for responses."""
+class StructureDataResponse(BaseModel):
+ """Structure Data serializer for responses."""
edges: list[EdgeResponse]
- nodes: NodeResponse
+ nodes: list[NodeResponse]
arrange: Literal["BT", "LR", "RL", "TB"]
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 20c450cf0a2..87a63a3c479 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -194,13 +194,13 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
- /ui/graph/graph_data:
+ /ui/structure/structure_data:
get:
tags:
- - Graph
- summary: Graph Data
- description: Get Graph Data.
- operationId: graph_data
+ - Structure
+ summary: Structure Data
+ description: Get Structure Data.
+ operationId: structure_data
parameters:
- name: dag_id
in: query
@@ -236,7 +236,7 @@ paths:
content:
application/json:
schema:
- $ref: '#/components/schemas/GraphDataResponse'
+ $ref: '#/components/schemas/StructureDataResponse'
'400':
content:
application/json:
@@ -7594,30 +7594,6 @@ components:
- name
title: FastAPIAppResponse
description: Serializer for Plugin FastAPI App responses.
- GraphDataResponse:
- properties:
- edges:
- items:
- $ref: '#/components/schemas/EdgeResponse'
- type: array
- title: Edges
- nodes:
- $ref: '#/components/schemas/NodeResponse'
- arrange:
- type: string
- enum:
- - BT
- - LR
- - RL
- - TB
- title: Arrange
- type: object
- required:
- - edges
- - nodes
- - arrange
- title: GraphDataResponse
- description: Graph Data serializer for responses.
HTTPExceptionResponse:
properties:
detail:
@@ -7810,66 +7786,43 @@ components:
- type: string
- type: 'null'
title: Id
- value:
- $ref: '#/components/schemas/NodeValueResponse'
- type: object
- required:
- - id
- - value
- title: NodeResponse
- description: Node serializer for responses.
- NodeValueResponse:
- properties:
- isMapped:
+ is_mapped:
anyOf:
- type: boolean
- type: 'null'
- title: Ismapped
+ title: Is Mapped
label:
anyOf:
- type: string
- type: 'null'
title: Label
- labelStyle:
- anyOf:
- - type: string
- - type: 'null'
- title: Labelstyle
- style:
- anyOf:
- - type: string
- - type: 'null'
- title: Style
tooltip:
anyOf:
- type: string
- type: 'null'
title: Tooltip
- rx:
- type: integer
- title: Rx
- ry:
- type: integer
- title: Ry
- clusterLabelPos:
- anyOf:
- - type: string
- - type: 'null'
- title: Clusterlabelpos
- setupTeardownType:
+ setup_teardown_type:
anyOf:
- type: string
enum:
- setup
- teardown
- type: 'null'
- title: Setupteardowntype
+ title: Setup Teardown Type
+ type:
+ type: string
+ enum:
+ - join
+ - sensor
+ - task
+ - task_group
+ title: Type
type: object
required:
- - rx
- - ry
- title: NodeValueResponse
- description: Graph Node Value responses.
+ - id
+ - type
+ title: NodeResponse
+ description: Node serializer for responses.
PatchTaskInstanceBody:
properties:
dry_run:
@@ -8219,6 +8172,33 @@ components:
- latest_scheduler_heartbeat
title: SchedulerInfoResponse
description: Scheduler info serializer for responses.
+ StructureDataResponse:
+ properties:
+ edges:
+ items:
+ $ref: '#/components/schemas/EdgeResponse'
+ type: array
+ title: Edges
+ nodes:
+ items:
+ $ref: '#/components/schemas/NodeResponse'
+ type: array
+ title: Nodes
+ arrange:
+ type: string
+ enum:
+ - BT
+ - LR
+ - RL
+ - TB
+ title: Arrange
+ type: object
+ required:
+ - edges
+ - nodes
+ - arrange
+ title: StructureDataResponse
+ description: Structure Data serializer for responses.
TaskCollectionResponse:
properties:
tasks:
diff --git a/airflow/api_fastapi/core_api/routes/ui/__init__.py
b/airflow/api_fastapi/core_api/routes/ui/__init__.py
index 0fa150b4653..2b22cc54120 100644
--- a/airflow/api_fastapi/core_api/routes/ui/__init__.py
+++ b/airflow/api_fastapi/core_api/routes/ui/__init__.py
@@ -21,7 +21,7 @@ from airflow.api_fastapi.core_api.routes.ui.assets import
assets_router
from airflow.api_fastapi.core_api.routes.ui.config import config_router
from airflow.api_fastapi.core_api.routes.ui.dags import dags_router
from airflow.api_fastapi.core_api.routes.ui.dashboard import dashboard_router
-from airflow.api_fastapi.core_api.routes.ui.graph import graph_data_router
+from airflow.api_fastapi.core_api.routes.ui.structure import structure_router
ui_router = AirflowRouter(prefix="/ui")
@@ -29,4 +29,4 @@ ui_router.include_router(assets_router)
ui_router.include_router(config_router)
ui_router.include_router(dags_router)
ui_router.include_router(dashboard_router)
-ui_router.include_router(graph_data_router)
+ui_router.include_router(structure_router)
diff --git a/airflow/api_fastapi/core_api/routes/ui/graph.py
b/airflow/api_fastapi/core_api/routes/ui/structure.py
similarity index 79%
rename from airflow/api_fastapi/core_api/routes/ui/graph.py
rename to airflow/api_fastapi/core_api/routes/ui/structure.py
index 10dbfb150c7..a6ed936b92d 100644
--- a/airflow/api_fastapi/core_api/routes/ui/graph.py
+++ b/airflow/api_fastapi/core_api/routes/ui/structure.py
@@ -20,35 +20,37 @@ from fastapi import Request, status
from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.router import AirflowRouter
-from airflow.api_fastapi.core_api.datamodels.ui.graph import GraphDataResponse
+from airflow.api_fastapi.core_api.datamodels.ui.structure import
StructureDataResponse
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
from airflow.utils.dag_edges import dag_edges
from airflow.utils.task_group import task_group_to_dict
-graph_data_router = AirflowRouter(tags=["Graph"], prefix="/graph")
+structure_router = AirflowRouter(tags=["Structure"], prefix="/structure")
-@graph_data_router.get(
- "/graph_data",
+@structure_router.get(
+ "/structure_data",
include_in_schema=False,
responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST]),
)
-def graph_data(
+def structure_data(
session: SessionDep,
dag_id: str,
request: Request,
root: str | None = None,
include_upstream: bool = False,
include_downstream: bool = False,
-) -> GraphDataResponse:
- """Get Graph Data."""
+) -> StructureDataResponse:
+ """Get Structure Data."""
dag = request.app.state.dag_bag.get_dag(dag_id)
if root:
dag = dag.partial_subset(
task_ids_or_regex=root, include_upstream=include_upstream,
include_downstream=include_downstream
)
- nodes = task_group_to_dict(dag.task_group)
+ nodes = [
+ task_group_to_dict(child) for child in
sorted(dag.task_group.children.values(), key=lambda t: t.label)
+ ]
edges = dag_edges(dag)
data = {
@@ -57,4 +59,4 @@ def graph_data(
"edges": edges,
}
- return GraphDataResponse(**data)
+ return StructureDataResponse(**data)
diff --git a/airflow/ui/openapi-gen/queries/common.ts
b/airflow/ui/openapi-gen/queries/common.ts
index f7e3576019e..766c08ae909 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -16,13 +16,13 @@ import {
DashboardService,
EventLogService,
ExtraLinksService,
- GraphService,
ImportErrorService,
JobService,
MonitorService,
PluginService,
PoolService,
ProviderService,
+ StructureService,
TaskInstanceService,
TaskService,
VariableService,
@@ -327,15 +327,16 @@ export const UseDashboardServiceHistoricalMetricsKeyFn = (
useDashboardServiceHistoricalMetricsKey,
...(queryKey ?? [{ endDate, startDate }]),
];
-export type GraphServiceGraphDataDefaultResponse = Awaited<
- ReturnType<typeof GraphService.graphData>
+export type StructureServiceStructureDataDefaultResponse = Awaited<
+ ReturnType<typeof StructureService.structureData>
>;
-export type GraphServiceGraphDataQueryResult<
- TData = GraphServiceGraphDataDefaultResponse,
+export type StructureServiceStructureDataQueryResult<
+ TData = StructureServiceStructureDataDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
-export const useGraphServiceGraphDataKey = "GraphServiceGraphData";
-export const UseGraphServiceGraphDataKeyFn = (
+export const useStructureServiceStructureDataKey =
+ "StructureServiceStructureData";
+export const UseStructureServiceStructureDataKeyFn = (
{
dagId,
includeDownstream,
@@ -349,7 +350,7 @@ export const UseGraphServiceGraphDataKeyFn = (
},
queryKey?: Array<unknown>,
) => [
- useGraphServiceGraphDataKey,
+ useStructureServiceStructureDataKey,
...(queryKey ?? [{ dagId, includeDownstream, includeUpstream, root }]),
];
export type BackfillServiceListBackfillsDefaultResponse = Awaited<
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts
b/airflow/ui/openapi-gen/queries/prefetch.ts
index 54603f0b160..2bf0cf3695c 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -15,13 +15,13 @@ import {
DashboardService,
EventLogService,
ExtraLinksService,
- GraphService,
ImportErrorService,
JobService,
MonitorService,
PluginService,
PoolService,
ProviderService,
+ StructureService,
TaskInstanceService,
TaskService,
VariableService,
@@ -407,17 +407,17 @@ export const prefetchUseDashboardServiceHistoricalMetrics
= (
queryFn: () => DashboardService.historicalMetrics({ endDate, startDate }),
});
/**
- * Graph Data
- * Get Graph Data.
+ * Structure Data
+ * Get Structure Data.
* @param data The data for the request.
* @param data.dagId
* @param data.root
* @param data.includeUpstream
* @param data.includeDownstream
- * @returns GraphDataResponse Successful Response
+ * @returns StructureDataResponse Successful Response
* @throws ApiError
*/
-export const prefetchUseGraphServiceGraphData = (
+export const prefetchUseStructureServiceStructureData = (
queryClient: QueryClient,
{
dagId,
@@ -432,14 +432,14 @@ export const prefetchUseGraphServiceGraphData = (
},
) =>
queryClient.prefetchQuery({
- queryKey: Common.UseGraphServiceGraphDataKeyFn({
+ queryKey: Common.UseStructureServiceStructureDataKeyFn({
dagId,
includeDownstream,
includeUpstream,
root,
}),
queryFn: () =>
- GraphService.graphData({
+ StructureService.structureData({
dagId,
includeDownstream,
includeUpstream,
diff --git a/airflow/ui/openapi-gen/queries/queries.ts
b/airflow/ui/openapi-gen/queries/queries.ts
index 3992c3a2edf..6b9645fcb5e 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -21,13 +21,13 @@ import {
DashboardService,
EventLogService,
ExtraLinksService,
- GraphService,
ImportErrorService,
JobService,
MonitorService,
PluginService,
PoolService,
ProviderService,
+ StructureService,
TaskInstanceService,
TaskService,
VariableService,
@@ -523,18 +523,18 @@ export const useDashboardServiceHistoricalMetrics = <
...options,
});
/**
- * Graph Data
- * Get Graph Data.
+ * Structure Data
+ * Get Structure Data.
* @param data The data for the request.
* @param data.dagId
* @param data.root
* @param data.includeUpstream
* @param data.includeDownstream
- * @returns GraphDataResponse Successful Response
+ * @returns StructureDataResponse Successful Response
* @throws ApiError
*/
-export const useGraphServiceGraphData = <
- TData = Common.GraphServiceGraphDataDefaultResponse,
+export const useStructureServiceStructureData = <
+ TData = Common.StructureServiceStructureDataDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
@@ -553,12 +553,12 @@ export const useGraphServiceGraphData = <
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
- queryKey: Common.UseGraphServiceGraphDataKeyFn(
+ queryKey: Common.UseStructureServiceStructureDataKeyFn(
{ dagId, includeDownstream, includeUpstream, root },
queryKey,
),
queryFn: () =>
- GraphService.graphData({
+ StructureService.structureData({
dagId,
includeDownstream,
includeUpstream,
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts
b/airflow/ui/openapi-gen/queries/suspense.ts
index 185272ef0c2..8bc0c526b2f 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -15,13 +15,13 @@ import {
DashboardService,
EventLogService,
ExtraLinksService,
- GraphService,
ImportErrorService,
JobService,
MonitorService,
PluginService,
PoolService,
ProviderService,
+ StructureService,
TaskInstanceService,
TaskService,
VariableService,
@@ -498,18 +498,18 @@ export const useDashboardServiceHistoricalMetricsSuspense
= <
...options,
});
/**
- * Graph Data
- * Get Graph Data.
+ * Structure Data
+ * Get Structure Data.
* @param data The data for the request.
* @param data.dagId
* @param data.root
* @param data.includeUpstream
* @param data.includeDownstream
- * @returns GraphDataResponse Successful Response
+ * @returns StructureDataResponse Successful Response
* @throws ApiError
*/
-export const useGraphServiceGraphDataSuspense = <
- TData = Common.GraphServiceGraphDataDefaultResponse,
+export const useStructureServiceStructureDataSuspense = <
+ TData = Common.StructureServiceStructureDataDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
@@ -528,12 +528,12 @@ export const useGraphServiceGraphDataSuspense = <
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
- queryKey: Common.UseGraphServiceGraphDataKeyFn(
+ queryKey: Common.UseStructureServiceStructureDataKeyFn(
{ dagId, includeDownstream, includeUpstream, root },
queryKey,
),
queryFn: () =>
- GraphService.graphData({
+ StructureService.structureData({
dagId,
includeDownstream,
includeUpstream,
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index ae2eeaf1687..0f704159c8d 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -2838,30 +2838,6 @@ export const $FastAPIAppResponse = {
description: "Serializer for Plugin FastAPI App responses.",
} as const;
-export const $GraphDataResponse = {
- properties: {
- edges: {
- items: {
- $ref: "#/components/schemas/EdgeResponse",
- },
- type: "array",
- title: "Edges",
- },
- nodes: {
- $ref: "#/components/schemas/NodeResponse",
- },
- arrange: {
- type: "string",
- enum: ["BT", "LR", "RL", "TB"],
- title: "Arrange",
- },
- },
- type: "object",
- required: ["edges", "nodes", "arrange"],
- title: "GraphDataResponse",
- description: "Graph Data serializer for responses.",
-} as const;
-
export const $HTTPExceptionResponse = {
properties: {
detail: {
@@ -3162,19 +3138,7 @@ export const $NodeResponse = {
],
title: "Id",
},
- value: {
- $ref: "#/components/schemas/NodeValueResponse",
- },
- },
- type: "object",
- required: ["id", "value"],
- title: "NodeResponse",
- description: "Node serializer for responses.",
-} as const;
-
-export const $NodeValueResponse = {
- properties: {
- isMapped: {
+ is_mapped: {
anyOf: [
{
type: "boolean",
@@ -3183,7 +3147,7 @@ export const $NodeValueResponse = {
type: "null",
},
],
- title: "Ismapped",
+ title: "Is Mapped",
},
label: {
anyOf: [
@@ -3196,28 +3160,6 @@ export const $NodeValueResponse = {
],
title: "Label",
},
- labelStyle: {
- anyOf: [
- {
- type: "string",
- },
- {
- type: "null",
- },
- ],
- title: "Labelstyle",
- },
- style: {
- anyOf: [
- {
- type: "string",
- },
- {
- type: "null",
- },
- ],
- title: "Style",
- },
tooltip: {
anyOf: [
{
@@ -3229,26 +3171,7 @@ export const $NodeValueResponse = {
],
title: "Tooltip",
},
- rx: {
- type: "integer",
- title: "Rx",
- },
- ry: {
- type: "integer",
- title: "Ry",
- },
- clusterLabelPos: {
- anyOf: [
- {
- type: "string",
- },
- {
- type: "null",
- },
- ],
- title: "Clusterlabelpos",
- },
- setupTeardownType: {
+ setup_teardown_type: {
anyOf: [
{
type: "string",
@@ -3258,13 +3181,18 @@ export const $NodeValueResponse = {
type: "null",
},
],
- title: "Setupteardowntype",
+ title: "Setup Teardown Type",
+ },
+ type: {
+ type: "string",
+ enum: ["join", "sensor", "task", "task_group"],
+ title: "Type",
},
},
type: "object",
- required: ["rx", "ry"],
- title: "NodeValueResponse",
- description: "Graph Node Value responses.",
+ required: ["id", "type"],
+ title: "NodeResponse",
+ description: "Node serializer for responses.",
} as const;
export const $PatchTaskInstanceBody = {
@@ -3755,6 +3683,34 @@ export const $SchedulerInfoResponse = {
description: "Scheduler info serializer for responses.",
} as const;
+export const $StructureDataResponse = {
+ properties: {
+ edges: {
+ items: {
+ $ref: "#/components/schemas/EdgeResponse",
+ },
+ type: "array",
+ title: "Edges",
+ },
+ nodes: {
+ items: {
+ $ref: "#/components/schemas/NodeResponse",
+ },
+ type: "array",
+ title: "Nodes",
+ },
+ arrange: {
+ type: "string",
+ enum: ["BT", "LR", "RL", "TB"],
+ title: "Arrange",
+ },
+ },
+ type: "object",
+ required: ["edges", "nodes", "arrange"],
+ title: "StructureDataResponse",
+ description: "Structure Data serializer for responses.",
+} as const;
+
export const $TaskCollectionResponse = {
properties: {
tasks: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow/ui/openapi-gen/requests/services.gen.ts
index c5e494fe119..0536c0afa80 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -34,8 +34,8 @@ import type {
RecentDagRunsResponse,
HistoricalMetricsData,
HistoricalMetricsResponse,
- GraphDataData,
- GraphDataResponse2,
+ StructureDataData,
+ StructureDataResponse2,
ListBackfillsData,
ListBackfillsResponse,
CreateBackfillData,
@@ -665,24 +665,24 @@ export class DashboardService {
}
}
-export class GraphService {
+export class StructureService {
/**
- * Graph Data
- * Get Graph Data.
+ * Structure Data
+ * Get Structure Data.
* @param data The data for the request.
* @param data.dagId
* @param data.root
* @param data.includeUpstream
* @param data.includeDownstream
- * @returns GraphDataResponse Successful Response
+ * @returns StructureDataResponse Successful Response
* @throws ApiError
*/
- public static graphData(
- data: GraphDataData,
- ): CancelablePromise<GraphDataResponse2> {
+ public static structureData(
+ data: StructureDataData,
+ ): CancelablePromise<StructureDataResponse2> {
return __request(OpenAPI, {
method: "GET",
- url: "/ui/graph/graph_data",
+ url: "/ui/structure/structure_data",
query: {
dag_id: data.dagId,
root: data.root,
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 1f1838d0870..6977b4de636 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -680,17 +680,6 @@ export type FastAPIAppResponse = {
[key: string]: unknown | string;
};
-/**
- * Graph Data serializer for responses.
- */
-export type GraphDataResponse = {
- edges: Array<EdgeResponse>;
- nodes: NodeResponse;
- arrange: "BT" | "LR" | "RL" | "TB";
-};
-
-export type arrange = "BT" | "LR" | "RL" | "TB";
-
/**
* HTTPException Model used for error response.
*/
@@ -773,24 +762,15 @@ export type JobResponse = {
export type NodeResponse = {
children?: Array<NodeResponse> | null;
id: string | null;
- value: NodeValueResponse;
-};
-
-/**
- * Graph Node Value responses.
- */
-export type NodeValueResponse = {
- isMapped?: boolean | null;
+ is_mapped?: boolean | null;
label?: string | null;
- labelStyle?: string | null;
- style?: string | null;
tooltip?: string | null;
- rx: number;
- ry: number;
- clusterLabelPos?: string | null;
- setupTeardownType?: "setup" | "teardown" | null;
+ setup_teardown_type?: "setup" | "teardown" | null;
+ type: "join" | "sensor" | "task" | "task_group";
};
+export type type = "join" | "sensor" | "task" | "task_group";
+
/**
* Request body for Clear Task Instances endpoint.
*/
@@ -930,6 +910,17 @@ export type SchedulerInfoResponse = {
latest_scheduler_heartbeat: string | null;
};
+/**
+ * Structure Data serializer for responses.
+ */
+export type StructureDataResponse = {
+ edges: Array<EdgeResponse>;
+ nodes: Array<NodeResponse>;
+ arrange: "BT" | "LR" | "RL" | "TB";
+};
+
+export type arrange = "BT" | "LR" | "RL" | "TB";
+
/**
* Task collection serializer for responses.
*/
@@ -1425,14 +1416,14 @@ export type HistoricalMetricsData = {
export type HistoricalMetricsResponse = HistoricalMetricDataResponse;
-export type GraphDataData = {
+export type StructureDataData = {
dagId: string;
includeDownstream?: boolean;
includeUpstream?: boolean;
root?: string | null;
};
-export type GraphDataResponse2 = GraphDataResponse;
+export type StructureDataResponse2 = StructureDataResponse;
export type ListBackfillsData = {
dagId: string;
@@ -2456,14 +2447,14 @@ export type $OpenApiTs = {
};
};
};
- "/ui/graph/graph_data": {
+ "/ui/structure/structure_data": {
get: {
- req: GraphDataData;
+ req: StructureDataData;
res: {
/**
* Successful Response
*/
- 200: GraphDataResponse;
+ 200: StructureDataResponse;
/**
* Bad Request
*/
diff --git a/airflow/utils/task_group.py b/airflow/utils/task_group.py
index 3597c7f893c..3d3738f5a8d 100644
--- a/airflow/utils/task_group.py
+++ b/airflow/utils/task_group.py
@@ -81,6 +81,60 @@ def task_group_to_dict(task_item_or_group):
"""Create a nested dict representation of this TaskGroup and its children
used to construct the Graph."""
from airflow.models.abstractoperator import AbstractOperator
from airflow.models.mappedoperator import MappedOperator
+ from airflow.sensors.base import BaseSensorOperator
+
+ if isinstance(task := task_item_or_group, AbstractOperator):
+ setup_teardown_type = {}
+ is_mapped = {}
+ node_type = {"type": "task"}
+ if task.is_setup is True:
+ setup_teardown_type["setup_teardown_type"] = "setup"
+ elif task.is_teardown is True:
+ setup_teardown_type["setup_teardown_type"] = "teardown"
+ if isinstance(task, MappedOperator):
+ is_mapped["is_mapped"] = True
+ if isinstance(task, BaseSensorOperator):
+ node_type["type"] = "sensor"
+ return {
+ "id": task.task_id,
+ "label": task.label,
+ **is_mapped,
+ **setup_teardown_type,
+ **node_type,
+ }
+
+ task_group = task_item_or_group
+ is_mapped = isinstance(task_group, MappedTaskGroup)
+ children = [
+ task_group_to_dict(child) for child in
sorted(task_group.children.values(), key=lambda t: t.label)
+ ]
+
+ if task_group.upstream_group_ids or task_group.upstream_task_ids:
+ # This is the join node used to reduce the number of edges between two
TaskGroup.
+ children.append({"id": task_group.upstream_join_id, "label": "",
"type": "join"})
+
+ if task_group.downstream_group_ids or task_group.downstream_task_ids:
+ # This is the join node used to reduce the number of edges between two
TaskGroup.
+ children.append({"id": task_group.downstream_join_id, "label": "",
"type": "join"})
+
+ return {
+ "id": task_group.group_id,
+ "label": task_group.label,
+ "tooltip": task_group.tooltip,
+ "is_mapped": is_mapped,
+ "children": children,
+ "type": "task_group",
+ }
+
+
+def task_group_to_dict_legacy(task_item_or_group):
+ """
+ Legacy function to create a nested dict representation of this TaskGroup
and its children used to construct the Graph.
+
+ TODO: To remove for airflow 3 once the legacy UI is deleted.
+ """
+ from airflow.models.abstractoperator import AbstractOperator
+ from airflow.models.mappedoperator import MappedOperator
if isinstance(task := task_item_or_group, AbstractOperator):
setup_teardown_type = {}
@@ -106,7 +160,8 @@ def task_group_to_dict(task_item_or_group):
task_group = task_item_or_group
is_mapped = isinstance(task_group, MappedTaskGroup)
children = [
- task_group_to_dict(child) for child in
sorted(task_group.children.values(), key=lambda t: t.label)
+ task_group_to_dict_legacy(child)
+ for child in sorted(task_group.children.values(), key=lambda t:
t.label)
]
if task_group.upstream_group_ids or task_group.upstream_task_ids:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 57f7b3c204f..5fc189800f6 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -132,7 +132,7 @@ from airflow.utils.net import get_hostname
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.strings import to_boolean
-from airflow.utils.task_group import TaskGroup, task_group_to_dict
+from airflow.utils.task_group import TaskGroup, task_group_to_dict_legacy
from airflow.utils.timezone import td_format, utcnow
from airflow.utils.types import NOTSET, DagRunTriggeredByType
from airflow.version import version
@@ -3243,7 +3243,7 @@ class Airflow(AirflowBaseView):
task_ids_or_regex=root, include_upstream=filter_upstream,
include_downstream=filter_downstream
)
- nodes = task_group_to_dict(dag.task_group)
+ nodes = task_group_to_dict_legacy(dag.task_group)
edges = dag_edges(dag)
data = {
diff --git a/task_sdk/src/airflow/sdk/definitions/taskgroup.py
b/task_sdk/src/airflow/sdk/definitions/taskgroup.py
index 7395b341740..fd02a4c94e7 100644
--- a/task_sdk/src/airflow/sdk/definitions/taskgroup.py
+++ b/task_sdk/src/airflow/sdk/definitions/taskgroup.py
@@ -146,7 +146,10 @@ class TaskGroup(DAGNode):
if self.parent_group:
self.parent_group.add(self)
if self.parent_group.default_args:
- self.default_args = {**self.parent_group.default_args,
**self.default_args}
+ self.default_args = {
+ **self.parent_group.default_args,
+ **self.default_args,
+ }
if self._group_id:
self.used_group_ids.add(self.group_id)
@@ -235,7 +238,9 @@ class TaskGroup(DAGNode):
if self.dag:
if task.dag is not None and self.dag is not task.dag:
raise RuntimeError(
- "Cannot mix TaskGroups from different DAGs: %s and
%s", self.dag, task.dag
+ "Cannot mix TaskGroups from different DAGs: %s and %s",
+ self.dag,
+ task.dag,
)
task.dag = self.dag
if task.children:
@@ -268,7 +273,10 @@ class TaskGroup(DAGNode):
return self._group_id
def update_relative(
- self, other: DependencyMixin, upstream: bool = True, edge_modifier:
EdgeModifier | None = None
+ self,
+ other: DependencyMixin,
+ upstream: bool = True,
+ edge_modifier: EdgeModifier | None = None,
) -> None:
"""
Override TaskMixin.update_relative.
@@ -463,7 +471,10 @@ class TaskGroup(DAGNode):
from airflow.serialization.enums import DagAttributeTypes
from airflow.serialization.serialized_objects import
TaskGroupSerialization
- return DagAttributeTypes.TASK_GROUP,
TaskGroupSerialization.serialize_task_group(self)
+ return (
+ DagAttributeTypes.TASK_GROUP,
+ TaskGroupSerialization.serialize_task_group(self),
+ )
def hierarchical_alphabetical_sort(self):
"""
@@ -475,7 +486,8 @@ class TaskGroup(DAGNode):
:return: list of tasks in hierarchical alphabetical order
"""
return sorted(
- self.children.values(), key=lambda node: (not isinstance(node,
TaskGroup), node.node_id)
+ self.children.values(),
+ key=lambda node: (not isinstance(node, TaskGroup), node.node_id),
)
def topological_sort(self):
@@ -626,28 +638,28 @@ def task_group_to_dict(task_item_or_group):
"""Create a nested dict representation of this TaskGroup and its children
used to construct the Graph."""
from airflow.models.abstractoperator import AbstractOperator
from airflow.models.mappedoperator import MappedOperator
+ from airflow.sensors.base import BaseSensorOperator
if isinstance(task := task_item_or_group, AbstractOperator):
setup_teardown_type = {}
is_mapped = {}
+ node_type = {"type": "task"}
if task.is_setup is True:
- setup_teardown_type["setupTeardownType"] = "setup"
+ setup_teardown_type["setup_teardown_type"] = "setup"
elif task.is_teardown is True:
- setup_teardown_type["setupTeardownType"] = "teardown"
+ setup_teardown_type["setup_teardown_type"] = "teardown"
if isinstance(task, MappedOperator):
- is_mapped["isMapped"] = True
+ is_mapped["is_mapped"] = True
+ if isinstance(task, BaseSensorOperator):
+ node_type["type"] = "sensor"
return {
"id": task.task_id,
- "value": {
- "label": task.label,
- "labelStyle": f"fill:{task.ui_fgcolor};",
- "style": f"fill:{task.ui_color};",
- "rx": 5,
- "ry": 5,
- **is_mapped,
- **setup_teardown_type,
- },
+ "label": task.label,
+ **is_mapped,
+ **setup_teardown_type,
+ **node_type,
}
+
task_group = task_item_or_group
is_mapped = isinstance(task_group, MappedTaskGroup)
children = [
@@ -655,43 +667,18 @@ def task_group_to_dict(task_item_or_group):
]
if task_group.upstream_group_ids or task_group.upstream_task_ids:
- children.append(
- {
- "id": task_group.upstream_join_id,
- "value": {
- "label": "",
- "labelStyle": f"fill:{task_group.ui_fgcolor};",
- "style": f"fill:{task_group.ui_color};",
- "shape": "circle",
- },
- }
- )
+ # This is the join node used to reduce the number of edges between two
TaskGroup.
+ children.append({"id": task_group.upstream_join_id, "label": "",
"type": "join"})
if task_group.downstream_group_ids or task_group.downstream_task_ids:
# This is the join node used to reduce the number of edges between two
TaskGroup.
- children.append(
- {
- "id": task_group.downstream_join_id,
- "value": {
- "label": "",
- "labelStyle": f"fill:{task_group.ui_fgcolor};",
- "style": f"fill:{task_group.ui_color};",
- "shape": "circle",
- },
- }
- )
+ children.append({"id": task_group.downstream_join_id, "label": "",
"type": "join"})
return {
"id": task_group.group_id,
- "value": {
- "label": task_group.label,
- "labelStyle": f"fill:{task_group.ui_fgcolor};",
- "style": f"fill:{task_group.ui_color}",
- "rx": 5,
- "ry": 5,
- "clusterLabelPos": "top",
- "tooltip": task_group.tooltip,
- "isMapped": is_mapped,
- },
+ "label": task_group.label,
+ "tooltip": task_group.tooltip,
+ "is_mapped": is_mapped,
"children": children,
+ "type": "task_group",
}
diff --git a/tests/api_fastapi/core_api/routes/ui/test_graph.py
b/tests/api_fastapi/core_api/routes/ui/test_graph.py
deleted file mode 100644
index 85dabd3ac6a..00000000000
--- a/tests/api_fastapi/core_api/routes/ui/test_graph.py
+++ /dev/null
@@ -1,198 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import pendulum
-import pytest
-
-from airflow.models import DagBag
-from airflow.operators.empty import EmptyOperator
-
-from tests_common.test_utils.db import clear_db_runs
-
-pytestmark = pytest.mark.db_test
-
-DAG_ID = "test_dag_id"
-
-
[email protected](autouse=True, scope="module")
-def examples_dag_bag():
- # Speed up: We don't want example dags for this module
-
- return DagBag(include_examples=False, read_dags_from_db=True)
-
-
[email protected](autouse=True)
-def clean():
- clear_db_runs()
- yield
- clear_db_runs()
-
-
[email protected]
-def make_dag(dag_maker, session, time_machine):
- with dag_maker(
- dag_id=DAG_ID,
- serialized=True,
- session=session,
- start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC),
- ):
- EmptyOperator(task_id="task_1") >> EmptyOperator(task_id="task_2")
-
- dag_maker.dagbag.sync_to_db()
-
-
-class TestGraphDataEndpoint:
- @pytest.mark.parametrize(
- "params, expected",
- [
- (
- {"dag_id": DAG_ID},
- {
- "arrange": "LR",
- "edges": [
- {
- "is_setup_teardown": None,
- "label": None,
- "source_id": "task_1",
- "target_id": "task_2",
- },
- ],
- "nodes": {
- "children": [
- {
- "children": None,
- "id": "task_1",
- "value": {
- "clusterLabelPos": None,
- "isMapped": None,
- "label": "task_1",
- "labelStyle": "fill:#000;",
- "rx": 5,
- "ry": 5,
- "setupTeardownType": None,
- "style": "fill:#e8f7e4;",
- "tooltip": None,
- },
- },
- {
- "children": None,
- "id": "task_2",
- "value": {
- "clusterLabelPos": None,
- "isMapped": None,
- "label": "task_2",
- "labelStyle": "fill:#000;",
- "rx": 5,
- "ry": 5,
- "setupTeardownType": None,
- "style": "fill:#e8f7e4;",
- "tooltip": None,
- },
- },
- ],
- "id": None,
- "value": {
- "clusterLabelPos": "top",
- "isMapped": False,
- "label": None,
- "labelStyle": "fill:#000;",
- "rx": 5,
- "ry": 5,
- "setupTeardownType": None,
- "style": "fill:CornflowerBlue",
- "tooltip": "",
- },
- },
- },
- ),
- (
- {
- "dag_id": DAG_ID,
- "root": "unknown_task",
- },
- {
- "arrange": "LR",
- "edges": [],
- "nodes": {
- "children": [],
- "id": None,
- "value": {
- "clusterLabelPos": "top",
- "isMapped": False,
- "label": None,
- "labelStyle": "fill:#000;",
- "rx": 5,
- "ry": 5,
- "setupTeardownType": None,
- "style": "fill:CornflowerBlue",
- "tooltip": "",
- },
- },
- },
- ),
- (
- {
- "dag_id": DAG_ID,
- "root": "task_1",
- "filter_upstream": False,
- "filter_downstream": False,
- },
- {
- "arrange": "LR",
- "edges": [],
- "nodes": {
- "children": [
- {
- "children": None,
- "id": "task_1",
- "value": {
- "clusterLabelPos": None,
- "isMapped": None,
- "label": "task_1",
- "labelStyle": "fill:#000;",
- "rx": 5,
- "ry": 5,
- "setupTeardownType": None,
- "style": "fill:#e8f7e4;",
- "tooltip": None,
- },
- },
- ],
- "id": None,
- "value": {
- "clusterLabelPos": "top",
- "isMapped": False,
- "label": None,
- "labelStyle": "fill:#000;",
- "rx": 5,
- "ry": 5,
- "setupTeardownType": None,
- "style": "fill:CornflowerBlue",
- "tooltip": "",
- },
- },
- },
- ),
- ],
- )
- @pytest.mark.usefixtures("make_dag")
- def test_historical_metrics_data(self, test_client, params, expected):
- response = test_client.get("/ui/graph/graph_data", params=params)
- assert response.status_code == 200
- assert response.json() == expected
diff --git a/tests/api_fastapi/core_api/routes/ui/test_structure.py b/tests/api_fastapi/core_api/routes/ui/test_structure.py
new file mode 100644
index 00000000000..202f8b207a7
--- /dev/null
+++ b/tests/api_fastapi/core_api/routes/ui/test_structure.py
@@ -0,0 +1,134 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pendulum
+import pytest
+
+from airflow.models import DagBag
+from airflow.operators.empty import EmptyOperator
+
+from tests_common.test_utils.db import clear_db_runs
+
+pytestmark = pytest.mark.db_test
+
+DAG_ID = "test_dag_id"
+
+
[email protected](autouse=True, scope="module")
+def examples_dag_bag():
+ # Speed up: We don't want example dags for this module
+
+ return DagBag(include_examples=False, read_dags_from_db=True)
+
+
[email protected](autouse=True)
+def clean():
+ clear_db_runs()
+ yield
+ clear_db_runs()
+
+
[email protected]
+def make_dag(dag_maker, session, time_machine):
+ with dag_maker(
+ dag_id=DAG_ID,
+ serialized=True,
+ session=session,
+ start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC),
+ ):
+ EmptyOperator(task_id="task_1") >> EmptyOperator(task_id="task_2")
+
+ dag_maker.dagbag.sync_to_db()
+
+
+class TestStructureDataEndpoint:
+ @pytest.mark.parametrize(
+ "params, expected",
+ [
+ (
+ {"dag_id": DAG_ID},
+ {
+ "arrange": "LR",
+ "edges": [
+ {
+ "is_setup_teardown": None,
+ "label": None,
+ "source_id": "task_1",
+ "target_id": "task_2",
+ },
+ ],
+ "nodes": [
+ {
+ "children": None,
+ "id": "task_1",
+ "is_mapped": None,
+ "label": "task_1",
+ "setup_teardown_type": None,
+ "tooltip": None,
+ "type": "task",
+ },
+ {
+ "children": None,
+ "id": "task_2",
+ "is_mapped": None,
+ "label": "task_2",
+ "setup_teardown_type": None,
+ "tooltip": None,
+ "type": "task",
+ },
+ ],
+ },
+ ),
+ (
+ {
+ "dag_id": DAG_ID,
+ "root": "unknown_task",
+ },
+ {"arrange": "LR", "edges": [], "nodes": []},
+ ),
+ (
+ {
+ "dag_id": DAG_ID,
+ "root": "task_1",
+ "filter_upstream": False,
+ "filter_downstream": False,
+ },
+ {
+ "arrange": "LR",
+ "edges": [],
+ "nodes": [
+ {
+ "children": None,
+ "id": "task_1",
+ "is_mapped": None,
+ "label": "task_1",
+ "setup_teardown_type": None,
+ "tooltip": None,
+ "type": "task",
+ },
+ ],
+ },
+ ),
+ ],
+ )
+ @pytest.mark.usefixtures("make_dag")
+ def test_structure_data(self, test_client, params, expected):
+ response = test_client.get("/ui/structure/structure_data", params=params)
+ assert response.status_code == 200
+ assert response.json() == expected
diff --git a/tests/utils/test_task_group.py b/tests/utils/test_task_group.py
index 00caac703a4..7b5c960f9c2 100644
--- a/tests/utils/test_task_group.py
+++ b/tests/utils/test_task_group.py
@@ -35,7 +35,7 @@ from airflow.models.dag import DAG
from airflow.models.xcom_arg import XComArg
from airflow.operators.empty import EmptyOperator
from airflow.utils.dag_edges import dag_edges
-from airflow.utils.task_group import TaskGroup, task_group_to_dict
+from airflow.utils.task_group import TaskGroup, task_group_to_dict, task_group_to_dict_legacy
from tests.models import DEFAULT_DATE
from tests_common.test_utils.compat import BashOperator, PythonOperator
@@ -54,7 +54,7 @@ def make_task(name, type_="classic"):
return my_task.override(task_id=name)()
-EXPECTED_JSON = {
+EXPECTED_JSON_LEGACY = {
"id": None,
"value": {
"label": None,
@@ -168,6 +168,41 @@ EXPECTED_JSON = {
],
}
+EXPECTED_JSON = {
+ "id": None,
+ "label": None,
+ "tooltip": "",
+ "is_mapped": False,
+ "children": [
+ {
+ "id": "group234",
+ "label": "group234",
+ "tooltip": "",
+ "is_mapped": False,
+ "children": [
+ {
+ "id": "group234.group34",
+ "label": "group34",
+ "tooltip": "",
+ "is_mapped": False,
+ "children": [
+ {"id": "group234.group34.task3", "label": "task3", "type": "task"},
+ {"id": "group234.group34.task4", "label": "task4", "type": "task"},
+ {"id": "group234.group34.downstream_join_id", "label": "", "type": "join"},
+ ],
+ "type": "task_group",
+ },
+ {"id": "group234.task2", "label": "task2", "type": "task"},
+ {"id": "group234.upstream_join_id", "label": "", "type": "join"},
+ ],
+ "type": "task_group",
+ },
+ {"id": "task1", "label": "task1", "type": "task"},
+ {"id": "task5", "label": "task5", "type": "task"},
+ ],
+ "type": "task_group",
+}
+
def test_build_task_group_context_manager():
logical_date = pendulum.parse("20200101")
@@ -199,6 +234,7 @@ def test_build_task_group_context_manager():
assert set(dag.task_group.children.keys()) == {"task1", "group234", "task5"}
assert group34.group_id == "group234.group34"
+ assert task_group_to_dict_legacy(dag.task_group) == EXPECTED_JSON_LEGACY
assert task_group_to_dict(dag.task_group) == EXPECTED_JSON
@@ -220,17 +256,21 @@ def test_build_task_group():
task1 >> group234
group34 >> task5
+ assert task_group_to_dict_legacy(dag.task_group) == EXPECTED_JSON_LEGACY
assert task_group_to_dict(dag.task_group) == EXPECTED_JSON
-def extract_node_id(node, include_label=False):
+def extract_node_id(node, include_label=False, from_legacy=False):
ret = {"id": node["id"]}
if include_label:
- ret["label"] = node["value"]["label"]
+ if from_legacy:
+ ret["label"] = node["value"]["label"]
+ else:
+ ret["label"] = node["label"]
if "children" in node:
children = []
for child in node["children"]:
- children.append(extract_node_id(child, include_label=include_label))
+ children.append(extract_node_id(child, include_label=include_label, from_legacy=from_legacy))
ret["children"] = children
@@ -267,7 +307,7 @@ def test_build_task_group_with_prefix():
assert group234.get_child_by_label("group34") == group34
assert group4.get_child_by_label("task4") == task4
- assert extract_node_id(task_group_to_dict(dag.task_group), include_label=True) == {
+ expected_node_id = {
"id": None,
"label": None,
"children": [
@@ -297,6 +337,12 @@ def test_build_task_group_with_prefix():
],
}
+ assert (
+ extract_node_id(task_group_to_dict_legacy(dag.task_group), include_label=True, from_legacy=True)
+ == expected_node_id
+ )
+ assert extract_node_id(task_group_to_dict(dag.task_group), include_label=True) == expected_node_id
+
def test_build_task_group_with_task_decorator():
"""
@@ -341,7 +387,7 @@ def test_build_task_group_with_task_decorator():
assert tsk_1.operator in tsk_3.operator.upstream_list
assert tsk_5.operator in tsk_4.operator.downstream_list
- assert extract_node_id(task_group_to_dict(dag.task_group)) == {
+ expected_node_id = {
"id": None,
"children": [
{
@@ -359,6 +405,9 @@ def test_build_task_group_with_task_decorator():
],
}
+ assert extract_node_id(task_group_to_dict_legacy(dag.task_group), from_legacy=True) == expected_node_id
+ assert extract_node_id(task_group_to_dict(dag.task_group)) == expected_node_id
+
edges = dag_edges(dag)
assert sorted((e["source_id"], e["target_id"]) for e in edges) == [
("group234.downstream_join_id", "task_5"),
@@ -398,7 +447,7 @@ def test_sub_dag_task_group():
subdag = dag.partial_subset(task_ids_or_regex="task5", include_upstream=True, include_downstream=False)
- assert extract_node_id(task_group_to_dict(subdag.task_group)) == {
+ expected_node_id = {
"id": None,
"children": [
{
@@ -420,6 +469,9 @@ def test_sub_dag_task_group():
],
}
+ assert extract_node_id(task_group_to_dict_legacy(subdag.task_group), from_legacy=True) == expected_node_id
+ assert extract_node_id(task_group_to_dict(subdag.task_group)) == expected_node_id
+
edges = dag_edges(subdag)
assert sorted((e["source_id"], e["target_id"]) for e in edges) == [
("group234.group34.downstream_join_id", "task5"),
@@ -485,10 +537,11 @@ def test_dag_edges():
group_d << group_c
- nodes = task_group_to_dict(dag.task_group)
+ nodes_legacy = task_group_to_dict_legacy(dag.task_group)
+ nodes = task_group_to_dict(dag.task_group)
edges = dag_edges(dag)
- assert extract_node_id(nodes) == {
+ expected_node_id = {
"id": None,
"children": [
{
@@ -532,6 +585,9 @@ def test_dag_edges():
],
}
+ assert extract_node_id(nodes_legacy) == expected_node_id
+ assert extract_node_id(nodes, from_legacy=False) == expected_node_id
+
assert sorted((e["source_id"], e["target_id"]) for e in edges) == [
("group_a.downstream_join_id", "group_c.upstream_join_id"),
("group_a.group_b.downstream_join_id", "group_a.task5"),
@@ -787,6 +843,7 @@ def test_build_task_group_deco_context_manager():
],
}
+ assert extract_node_id(task_group_to_dict_legacy(dag.task_group), from_legacy=True) == node_ids
assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
@@ -968,6 +1025,7 @@ def test_task_group_context_mix():
],
}
+ assert extract_node_id(task_group_to_dict_legacy(dag.task_group), from_legacy=True) == node_ids
assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
@@ -1062,6 +1120,7 @@ def test_duplicate_task_group_id():
],
}
+ assert extract_node_id(task_group_to_dict_legacy(dag.task_group), from_legacy=True) == node_ids
assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
@@ -1124,6 +1183,7 @@ def test_call_taskgroup_twice():
],
}
+ assert extract_node_id(task_group_to_dict_legacy(dag.task_group), from_legacy=True) == node_ids
assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
@@ -1593,13 +1653,25 @@ def test_task_group_arrow_with_setup_group():
assert set(t1.operator.downstream_task_ids) == set()
assert set(t2.operator.downstream_task_ids) == set()
- def get_nodes(group):
- d = task_group_to_dict(group)
+ def get_nodes(group, from_legacy=False):
+ if from_legacy:
+ d = task_group_to_dict_legacy(group)
+ else:
+ d = task_group_to_dict(group)
new_d = {}
new_d["id"] = d["id"]
new_d["children"] = [{"id": x["id"]} for x in d["children"]]
return new_d
+ assert get_nodes(g1, from_legacy=True) == {
+ "id": "group_1",
+ "children": [
+ {"id": "group_1.setup_1"},
+ {"id": "group_1.setup_2"},
+ {"id": "group_1.downstream_join_id"},
+ ],
+ }
+
assert get_nodes(g1) == {
"id": "group_1",
"children": [