This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c1f70a3b588 Display all task tries in Gantt chart (#61058)
c1f70a3b588 is described below
commit c1f70a3b588ca641a3141b56dc587334ad5248c5
Author: Guan-Ming (Wesley) Chiu <[email protected]>
AuthorDate: Sat Jan 31 05:31:40 2026 +0800
Display all task tries in Gantt chart (#61058)
* Display all task tries in Gantt chart
* Add try_number on handleBarClick
* Refactor Gantt chart data transformation
---
.../api_fastapi/core_api/datamodels/ui/gantt.py | 43 +++
.../api_fastapi/core_api/openapi/_private_ui.yaml | 103 +++++++
.../api_fastapi/core_api/routes/ui/__init__.py | 2 +
.../api_fastapi/core_api/routes/ui/gantt.py | 112 ++++++++
.../src/airflow/ui/openapi-gen/queries/common.ts | 9 +-
.../ui/openapi-gen/queries/ensureQueryData.ts | 15 +-
.../src/airflow/ui/openapi-gen/queries/prefetch.ts | 15 +-
.../src/airflow/ui/openapi-gen/queries/queries.ts | 15 +-
.../src/airflow/ui/openapi-gen/queries/suspense.ts | 15 +-
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 85 ++++++
.../ui/openapi-gen/requests/services.gen.ts | 29 +-
.../airflow/ui/openapi-gen/requests/types.gen.ts | 48 ++++
.../airflow/ui/src/layouts/Details/Gantt/Gantt.tsx | 67 +----
.../airflow/ui/src/layouts/Details/Gantt/utils.ts | 170 ++++++++---
.../src/airflow/ui/src/queries/useClearRun.ts | 2 +
.../ui/src/queries/useClearTaskInstances.ts | 2 +
.../api_fastapi/core_api/routes/ui/test_gantt.py | 314 +++++++++++++++++++++
17 files changed, 951 insertions(+), 95 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/gantt.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/gantt.py
new file mode 100644
index 00000000000..7634854e59b
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/gantt.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow.api_fastapi.core_api.base import BaseModel
+from airflow.utils.state import TaskInstanceState
+
+
+class GanttTaskInstance(BaseModel):
+ """Task instance data for Gantt chart."""
+
+ task_id: str
+ try_number: int
+ state: TaskInstanceState | None
+ start_date: datetime | None
+ end_date: datetime | None
+ is_group: bool = False
+ is_mapped: bool = False
+
+
+class GanttResponse(BaseModel):
+ """Response for Gantt chart endpoint."""
+
+ dag_id: str
+ run_id: str
+ task_instances: list[GanttTaskInstance]
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index 7d11ce58776..b5b68d79fec 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -1010,6 +1010,48 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ /ui/gantt/{dag_id}/{run_id}:
+ get:
+ tags:
+ - Gantt
+ summary: Get Gantt Data
+ description: Get all task instance tries for Gantt chart.
+ operationId: get_gantt_data
+ security:
+ - OAuth2PasswordBearer: []
+ - HTTPBearer: []
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: run_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Run Id
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/GanttResponse'
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
/ui/calendar/{dag_id}:
get:
tags:
@@ -1907,6 +1949,67 @@ components:
- text
- href
title: ExtraMenuItem
+ GanttResponse:
+ properties:
+ dag_id:
+ type: string
+ title: Dag Id
+ run_id:
+ type: string
+ title: Run Id
+ task_instances:
+ items:
+ $ref: '#/components/schemas/GanttTaskInstance'
+ type: array
+ title: Task Instances
+ type: object
+ required:
+ - dag_id
+ - run_id
+ - task_instances
+ title: GanttResponse
+ description: Response for Gantt chart endpoint.
+ GanttTaskInstance:
+ properties:
+ task_id:
+ type: string
+ title: Task Id
+ try_number:
+ type: integer
+ title: Try Number
+ state:
+ anyOf:
+ - $ref: '#/components/schemas/TaskInstanceState'
+ - type: 'null'
+ start_date:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Start Date
+ end_date:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: End Date
+ is_group:
+ type: boolean
+ title: Is Group
+ default: false
+ is_mapped:
+ type: boolean
+ title: Is Mapped
+ default: false
+ type: object
+ required:
+ - task_id
+ - try_number
+ - state
+ - start_date
+ - end_date
+ title: GanttTaskInstance
+ description: Task instance data for Gantt chart.
GridNodeResponse:
properties:
id:
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/__init__.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/__init__.py
index 71cf161028a..3a3cadc46f9 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/__init__.py
@@ -26,6 +26,7 @@ from airflow.api_fastapi.core_api.routes.ui.connections import connections_route
from airflow.api_fastapi.core_api.routes.ui.dags import dags_router
from airflow.api_fastapi.core_api.routes.ui.dashboard import dashboard_router
from airflow.api_fastapi.core_api.routes.ui.dependencies import dependencies_router
+from airflow.api_fastapi.core_api.routes.ui.gantt import gantt_router
from airflow.api_fastapi.core_api.routes.ui.grid import grid_router
from airflow.api_fastapi.core_api.routes.ui.structure import structure_router
from airflow.api_fastapi.core_api.routes.ui.teams import teams_router
@@ -42,5 +43,6 @@ ui_router.include_router(dashboard_router)
ui_router.include_router(structure_router)
ui_router.include_router(backfills_router)
ui_router.include_router(grid_router)
+ui_router.include_router(gantt_router)
ui_router.include_router(calendar_router)
ui_router.include_router(teams_router)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/gantt.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/gantt.py
new file mode 100644
index 00000000000..a85ee89fe2f
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/gantt.py
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from fastapi import Depends, HTTPException, status
+from sqlalchemy import or_, select, union_all
+
+from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
+from airflow.api_fastapi.common.db.common import SessionDep
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.gantt import GanttResponse, GanttTaskInstance
+from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.security import requires_access_dag
+from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstancehistory import TaskInstanceHistory
+from airflow.utils.state import TaskInstanceState
+
+gantt_router = AirflowRouter(prefix="/gantt", tags=["Gantt"])
+
+
+@gantt_router.get(
+ "/{dag_id}/{run_id}",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+ dependencies=[
+ Depends(
+ requires_access_dag(
+ method="GET",
+ access_entity=DagAccessEntity.TASK_INSTANCE,
+ )
+ ),
+ Depends(
+ requires_access_dag(
+ method="GET",
+ access_entity=DagAccessEntity.RUN,
+ )
+ ),
+ ],
+)
+def get_gantt_data(
+ dag_id: str,
+ run_id: str,
+ session: SessionDep,
+) -> GanttResponse:
+ """Get all task instance tries for Gantt chart."""
+ # Exclude mapped tasks (use grid summaries) and UP_FOR_RETRY (already in history)
+ current_tis = select(
+ TaskInstance.task_id.label("task_id"),
+ TaskInstance.try_number.label("try_number"),
+ TaskInstance.state.label("state"),
+ TaskInstance.start_date.label("start_date"),
+ TaskInstance.end_date.label("end_date"),
+ ).where(
+ TaskInstance.dag_id == dag_id,
+ TaskInstance.run_id == run_id,
+ TaskInstance.map_index == -1,
+ or_(TaskInstance.state != TaskInstanceState.UP_FOR_RETRY, TaskInstance.state.is_(None)),
+ )
+
+ history_tis = select(
+ TaskInstanceHistory.task_id.label("task_id"),
+ TaskInstanceHistory.try_number.label("try_number"),
+ TaskInstanceHistory.state.label("state"),
+ TaskInstanceHistory.start_date.label("start_date"),
+ TaskInstanceHistory.end_date.label("end_date"),
+ ).where(
+ TaskInstanceHistory.dag_id == dag_id,
+ TaskInstanceHistory.run_id == run_id,
+ TaskInstanceHistory.map_index == -1,
+ )
+
+ combined = union_all(current_tis, history_tis).subquery()
+ query = select(combined).order_by(combined.c.task_id, combined.c.try_number)
+
+ results = session.execute(query).fetchall()
+
+ if not results:
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND,
+ f"No task instances for dag_id={dag_id} run_id={run_id}",
+ )
+
+ task_instances = [
+ GanttTaskInstance(
+ task_id=row.task_id,
+ try_number=row.try_number,
+ state=row.state,
+ start_date=row.start_date,
+ end_date=row.end_date,
+ )
+ for row in results
+ ]
+
+ return GanttResponse(dag_id=dag_id, run_id=run_id, task_instances=task_instances)
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
index 4157583cd3f..f6a3cccdf94 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
@@ -1,7 +1,7 @@
// generated with @7nohe/[email protected]
import { UseQueryResult } from "@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagParsingService, DagRunService, DagService,
DagSourceService, DagStatsService, DagVersionService, DagWarningService,
DashboardService, DependenciesService, EventLogService, ExperimentalService,
ExtraLinksService, GridService, ImportErrorService, JobService, LoginService,
MonitorService, PluginService, PoolService, ProviderService, StructureService,
TaskInstanceService, TaskServi [...]
+import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagParsingService, DagRunService, DagService,
DagSourceService, DagStatsService, DagVersionService, DagWarningService,
DashboardService, DependenciesService, EventLogService, ExperimentalService,
ExtraLinksService, GanttService, GridService, ImportErrorService, JobService,
LoginService, MonitorService, PluginService, PoolService, ProviderService,
StructureService, TaskInstanceServ [...]
import { DagRunState, DagWarningType } from "../requests/types.gen";
export type AssetServiceGetAssetsDefaultResponse = Awaited<ReturnType<typeof
AssetService.getAssets>>;
export type AssetServiceGetAssetsQueryResult<TData =
AssetServiceGetAssetsDefaultResponse, TError = unknown> = UseQueryResult<TData,
TError>;
@@ -860,6 +860,13 @@ export const UseGridServiceGetGridTiSummariesKeyFn = ({
dagId, runId }: {
dagId: string;
runId: string;
}, queryKey?: Array<unknown>) => [useGridServiceGetGridTiSummariesKey,
...(queryKey ?? [{ dagId, runId }])];
+export type GanttServiceGetGanttDataDefaultResponse =
Awaited<ReturnType<typeof GanttService.getGanttData>>;
+export type GanttServiceGetGanttDataQueryResult<TData =
GanttServiceGetGanttDataDefaultResponse, TError = unknown> =
UseQueryResult<TData, TError>;
+export const useGanttServiceGetGanttDataKey = "GanttServiceGetGanttData";
+export const UseGanttServiceGetGanttDataKeyFn = ({ dagId, runId }: {
+ dagId: string;
+ runId: string;
+}, queryKey?: Array<unknown>) => [useGanttServiceGetGanttDataKey, ...(queryKey
?? [{ dagId, runId }])];
export type CalendarServiceGetCalendarDefaultResponse =
Awaited<ReturnType<typeof CalendarService.getCalendar>>;
export type CalendarServiceGetCalendarQueryResult<TData =
CalendarServiceGetCalendarDefaultResponse, TError = unknown> =
UseQueryResult<TData, TError>;
export const useCalendarServiceGetCalendarKey = "CalendarServiceGetCalendar";
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
index 8851b0be65d..1c42363cf41 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
@@ -1,7 +1,7 @@
// generated with @7nohe/[email protected]
import { type QueryClient } from "@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService,
DagStatsService, DagVersionService, DagWarningService, DashboardService,
DependenciesService, EventLogService, ExperimentalService, ExtraLinksService,
GridService, ImportErrorService, JobService, LoginService, MonitorService,
PluginService, PoolService, ProviderService, StructureService,
TaskInstanceService, TaskService, TeamsService, V [...]
+import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService,
DagStatsService, DagVersionService, DagWarningService, DashboardService,
DependenciesService, EventLogService, ExperimentalService, ExtraLinksService,
GanttService, GridService, ImportErrorService, JobService, LoginService,
MonitorService, PluginService, PoolService, ProviderService, StructureService,
TaskInstanceService, TaskService, T [...]
import { DagRunState, DagWarningType } from "../requests/types.gen";
import * as Common from "./common";
/**
@@ -1645,6 +1645,19 @@ export const ensureUseGridServiceGetGridTiSummariesData
= (queryClient: QueryCli
runId: string;
}) => queryClient.ensureQueryData({ queryKey:
Common.UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId }), queryFn: () =>
GridService.getGridTiSummaries({ dagId, runId }) });
/**
+* Get Gantt Data
+* Get all task instance tries for Gantt chart.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.runId
+* @returns GanttResponse Successful Response
+* @throws ApiError
+*/
+export const ensureUseGanttServiceGetGanttDataData = (queryClient:
QueryClient, { dagId, runId }: {
+ dagId: string;
+ runId: string;
+}) => queryClient.ensureQueryData({ queryKey:
Common.UseGanttServiceGetGanttDataKeyFn({ dagId, runId }), queryFn: () =>
GanttService.getGanttData({ dagId, runId }) });
+/**
* Get Calendar
* Get calendar data for a DAG including historical and planned DAG runs.
* @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
index e940b64f20d..2379f68b668 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -1,7 +1,7 @@
// generated with @7nohe/[email protected]
import { type QueryClient } from "@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService,
DagStatsService, DagVersionService, DagWarningService, DashboardService,
DependenciesService, EventLogService, ExperimentalService, ExtraLinksService,
GridService, ImportErrorService, JobService, LoginService, MonitorService,
PluginService, PoolService, ProviderService, StructureService,
TaskInstanceService, TaskService, TeamsService, V [...]
+import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService,
DagStatsService, DagVersionService, DagWarningService, DashboardService,
DependenciesService, EventLogService, ExperimentalService, ExtraLinksService,
GanttService, GridService, ImportErrorService, JobService, LoginService,
MonitorService, PluginService, PoolService, ProviderService, StructureService,
TaskInstanceService, TaskService, T [...]
import { DagRunState, DagWarningType } from "../requests/types.gen";
import * as Common from "./common";
/**
@@ -1645,6 +1645,19 @@ export const prefetchUseGridServiceGetGridTiSummaries =
(queryClient: QueryClien
runId: string;
}) => queryClient.prefetchQuery({ queryKey:
Common.UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId }), queryFn: () =>
GridService.getGridTiSummaries({ dagId, runId }) });
/**
+* Get Gantt Data
+* Get all task instance tries for Gantt chart.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.runId
+* @returns GanttResponse Successful Response
+* @throws ApiError
+*/
+export const prefetchUseGanttServiceGetGanttData = (queryClient: QueryClient,
{ dagId, runId }: {
+ dagId: string;
+ runId: string;
+}) => queryClient.prefetchQuery({ queryKey:
Common.UseGanttServiceGetGanttDataKeyFn({ dagId, runId }), queryFn: () =>
GanttService.getGanttData({ dagId, runId }) });
+/**
* Get Calendar
* Get calendar data for a DAG including historical and planned DAG runs.
* @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index 5b56fc439c9..f9aa1e60324 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -1,7 +1,7 @@
// generated with @7nohe/[email protected]
import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from
"@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagParsingService, DagRunService, DagService,
DagSourceService, DagStatsService, DagVersionService, DagWarningService,
DashboardService, DependenciesService, EventLogService, ExperimentalService,
ExtraLinksService, GridService, ImportErrorService, JobService, LoginService,
MonitorService, PluginService, PoolService, ProviderService, StructureService,
TaskInstanceService, TaskServi [...]
+import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagParsingService, DagRunService, DagService,
DagSourceService, DagStatsService, DagVersionService, DagWarningService,
DashboardService, DependenciesService, EventLogService, ExperimentalService,
ExtraLinksService, GanttService, GridService, ImportErrorService, JobService,
LoginService, MonitorService, PluginService, PoolService, ProviderService,
StructureService, TaskInstanceServ [...]
import { BackfillPostBody, BulkBody_BulkTaskInstanceBody_,
BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_,
ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody,
DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState,
DagWarningType, PatchTaskInstanceBody, PoolBody, PoolPatchBody,
TaskInstancesBatchBody, TriggerDAGRunPostBody, UpdateHITLDetailPayload,
VariableBody, XComCreateBody, XComUpdateBody } from "../requests/types.gen";
import * as Common from "./common";
/**
@@ -1645,6 +1645,19 @@ export const useGridServiceGetGridTiSummaries = <TData =
Common.GridServiceGetGr
runId: string;
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId }, queryKey),
queryFn: () => GridService.getGridTiSummaries({ dagId, runId }) as TData,
...options });
/**
+* Get Gantt Data
+* Get all task instance tries for Gantt chart.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.runId
+* @returns GanttResponse Successful Response
+* @throws ApiError
+*/
+export const useGanttServiceGetGanttData = <TData =
Common.GanttServiceGetGanttDataDefaultResponse, TError = unknown, TQueryKey
extends Array<unknown> = unknown[]>({ dagId, runId }: {
+ dagId: string;
+ runId: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UseGanttServiceGetGanttDataKeyFn({ dagId, runId }, queryKey), queryFn:
() => GanttService.getGanttData({ dagId, runId }) as TData, ...options });
+/**
* Get Calendar
* Get calendar data for a DAG including historical and planned DAG runs.
* @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
index d77bb789863..3255c6f4d21 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
@@ -1,7 +1,7 @@
// generated with @7nohe/[email protected]
import { UseQueryOptions, useSuspenseQuery } from "@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService,
DagStatsService, DagVersionService, DagWarningService, DashboardService,
DependenciesService, EventLogService, ExperimentalService, ExtraLinksService,
GridService, ImportErrorService, JobService, LoginService, MonitorService,
PluginService, PoolService, ProviderService, StructureService,
TaskInstanceService, TaskService, TeamsService, V [...]
+import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService,
DagStatsService, DagVersionService, DagWarningService, DashboardService,
DependenciesService, EventLogService, ExperimentalService, ExtraLinksService,
GanttService, GridService, ImportErrorService, JobService, LoginService,
MonitorService, PluginService, PoolService, ProviderService, StructureService,
TaskInstanceService, TaskService, T [...]
import { DagRunState, DagWarningType } from "../requests/types.gen";
import * as Common from "./common";
/**
@@ -1645,6 +1645,19 @@ export const useGridServiceGetGridTiSummariesSuspense =
<TData = Common.GridServ
runId: string;
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId }, queryKey),
queryFn: () => GridService.getGridTiSummaries({ dagId, runId }) as TData,
...options });
/**
+* Get Gantt Data
+* Get all task instance tries for Gantt chart.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.runId
+* @returns GanttResponse Successful Response
+* @throws ApiError
+*/
+export const useGanttServiceGetGanttDataSuspense = <TData =
Common.GanttServiceGetGanttDataDefaultResponse, TError = unknown, TQueryKey
extends Array<unknown> = unknown[]>({ dagId, runId }: {
+ dagId: string;
+ runId: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UseGanttServiceGetGanttDataKeyFn({ dagId, runId }, queryKey), queryFn:
() => GanttService.getGanttData({ dagId, runId }) as TData, ...options });
+/**
* Get Calendar
* Get calendar data for a DAG including historical and planned DAG runs.
* @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 6012fae8c8c..172a89744e6 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -7807,6 +7807,91 @@ export const $ExtraMenuItem = {
title: 'ExtraMenuItem'
} as const;
+export const $GanttResponse = {
+ properties: {
+ dag_id: {
+ type: 'string',
+ title: 'Dag Id'
+ },
+ run_id: {
+ type: 'string',
+ title: 'Run Id'
+ },
+ task_instances: {
+ items: {
+ '$ref': '#/components/schemas/GanttTaskInstance'
+ },
+ type: 'array',
+ title: 'Task Instances'
+ }
+ },
+ type: 'object',
+ required: ['dag_id', 'run_id', 'task_instances'],
+ title: 'GanttResponse',
+ description: 'Response for Gantt chart endpoint.'
+} as const;
+
+export const $GanttTaskInstance = {
+ properties: {
+ task_id: {
+ type: 'string',
+ title: 'Task Id'
+ },
+ try_number: {
+ type: 'integer',
+ title: 'Try Number'
+ },
+ state: {
+ anyOf: [
+ {
+ '$ref': '#/components/schemas/TaskInstanceState'
+ },
+ {
+ type: 'null'
+ }
+ ]
+ },
+ start_date: {
+ anyOf: [
+ {
+ type: 'string',
+ format: 'date-time'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Start Date'
+ },
+ end_date: {
+ anyOf: [
+ {
+ type: 'string',
+ format: 'date-time'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'End Date'
+ },
+ is_group: {
+ type: 'boolean',
+ title: 'Is Group',
+ default: false
+ },
+ is_mapped: {
+ type: 'boolean',
+ title: 'Is Mapped',
+ default: false
+ }
+ },
+ type: 'object',
+ required: ['task_id', 'try_number', 'state', 'start_date', 'end_date'],
+ title: 'GanttTaskInstance',
+ description: 'Task instance data for Gantt chart.'
+} as const;
+
export const $GridNodeResponse = {
properties: {
id: {
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index 5754bcebd16..81ac234c3d5 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -3,7 +3,7 @@
import type { CancelablePromise } from './core/CancelablePromise';
import { OpenAPI } from './core/OpenAPI';
import { request as __request } from './core/request';
-import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData,
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse,
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData,
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse,
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse,
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData,
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse,
Dele [...]
+import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData,
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse,
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData,
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse,
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse,
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData,
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse,
Dele [...]
export class AssetService {
/**
@@ -4085,6 +4085,33 @@ export class GridService {
}
+export class GanttService {
+ /**
+ * Get Gantt Data
+ * Get all task instance tries for Gantt chart.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.runId
+ * @returns GanttResponse Successful Response
+ * @throws ApiError
+ */
+ public static getGanttData(data: GetGanttDataData):
CancelablePromise<GetGanttDataResponse> {
+ return __request(OpenAPI, {
+ method: 'GET',
+ url: '/ui/gantt/{dag_id}/{run_id}',
+ path: {
+ dag_id: data.dagId,
+ run_id: data.runId
+ },
+ errors: {
+ 404: 'Not Found',
+ 422: 'Validation Error'
+ }
+ });
+ }
+
+}
+
export class CalendarService {
/**
* Get Calendar
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 90e75ac0e76..b27d2e1bb1e 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1924,6 +1924,28 @@ export type ExtraMenuItem = {
href: string;
};
+/**
+ * Response for Gantt chart endpoint.
+ */
+export type GanttResponse = {
+ dag_id: string;
+ run_id: string;
+ task_instances: Array<GanttTaskInstance>;
+};
+
+/**
+ * Task instance data for Gantt chart.
+ */
+export type GanttTaskInstance = {
+ task_id: string;
+ try_number: number;
+ state: TaskInstanceState | null;
+ start_date: string | null;
+ end_date: string | null;
+ is_group?: boolean;
+ is_mapped?: boolean;
+};
+
/**
* Base Node serializer for responses.
*/
@@ -3484,6 +3506,13 @@ export type GetGridTiSummariesData = {
export type GetGridTiSummariesResponse = GridTISummaries;
+export type GetGanttDataData = {
+ dagId: string;
+ runId: string;
+};
+
+export type GetGanttDataResponse = GanttResponse;
+
export type GetCalendarData = {
dagId: string;
granularity?: 'hourly' | 'daily';
@@ -6704,6 +6733,25 @@ export type $OpenApiTs = {
};
};
};
+ '/ui/gantt/{dag_id}/{run_id}': {
+ get: {
+ req: GetGanttDataData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: GanttResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
'/ui/calendar/{dag_id}': {
get: {
req: GetCalendarData;
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx b/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx
index 582353c12e1..6c546c58699 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx
@@ -33,13 +33,12 @@ import {
import "chart.js/auto";
import "chartjs-adapter-dayjs-4/dist/chartjs-adapter-dayjs-4.esm";
import annotationPlugin from "chartjs-plugin-annotation";
-import dayjs from "dayjs";
import { useDeferredValue } from "react";
import { Bar } from "react-chartjs-2";
import { useTranslation } from "react-i18next";
import { useParams, useNavigate, useLocation, useSearchParams } from
"react-router-dom";
-import { useTaskInstanceServiceGetTaskInstances } from "openapi/queries";
+import { useGanttServiceGetGanttData } from "openapi/queries";
import type { DagRunState, DagRunType } from "openapi/requests/types.gen";
import { useColorMode } from "src/context/colorMode";
import { useHover } from "src/context/hover";
@@ -53,7 +52,7 @@ import { useGridTiSummaries } from
"src/queries/useGridTISummaries";
import { getComputedCSSVariableValue } from "src/theme";
import { isStatePending, useAutoRefresh } from "src/utils";
-import { createHandleBarClick, createHandleBarHover, createChartOptions } from
"./utils";
+import { createHandleBarClick, createHandleBarHover, createChartOptions,
transformGanttData } from "./utils";
ChartJS.register(
CategoryScale,
@@ -131,7 +130,7 @@ export const Gantt = ({ dagRunState, limit, runType,
triggeringUser }: Props) =>
const selectedRun = gridRuns?.find((run) => run.run_id === runId);
const refetchInterval = useAutoRefresh({ dagId });
- // Get grid summaries for groups (which have min/max times)
+ // Get grid summaries for groups and mapped tasks (which have min/max times)
const { data: gridTiSummaries, isLoading: summariesLoading } =
useGridTiSummaries({
dagId,
enabled: Boolean(selectedRun),
@@ -139,13 +138,9 @@ export const Gantt = ({ dagRunState, limit, runType,
triggeringUser }: Props) =>
state: selectedRun?.state,
});
- // Get non mapped task instances for tasks (which have start/end times)
- const { data: taskInstancesData, isLoading: tiLoading } =
useTaskInstanceServiceGetTaskInstances(
- {
- dagId,
- dagRunId: runId,
- mapIndex: [-1],
- },
+ // Single fetch for all Gantt data (individual task tries)
+ const { data: ganttData, isLoading: ganttLoading } =
useGanttServiceGetGanttData(
+ { dagId, runId },
undefined,
{
enabled: Boolean(dagId) && Boolean(runId) && Boolean(selectedRun),
@@ -156,53 +151,14 @@ export const Gantt = ({ dagRunState, limit, runType,
triggeringUser }: Props) =>
const { flatNodes } = flattenNodes(dagStructure, deferredOpenGroupIds);
- const isLoading = runsLoading || structureLoading || summariesLoading ||
tiLoading;
+ const isLoading = runsLoading || structureLoading || summariesLoading ||
ganttLoading;
+ const allTries = ganttData?.task_instances ?? [];
const gridSummaries = gridTiSummaries?.task_instances ?? [];
- const taskInstances = taskInstancesData?.task_instances ?? [];
-
- const data =
- isLoading || runId === ""
- ? []
- : flatNodes
- .map((node) => {
- const gridSummary = gridSummaries.find((ti) => ti.task_id ===
node.id);
-
- if ((node.isGroup ?? node.is_mapped) && gridSummary) {
- // Use min/max times from grid summary; ISO so time scale and bar positions render consistently across browsers
- return {
- isGroup: node.isGroup,
- isMapped: node.is_mapped,
- state: gridSummary.state,
- taskId: gridSummary.task_id,
- x: [
- dayjs(gridSummary.min_start_date).toISOString(),
- dayjs(gridSummary.max_end_date).toISOString(),
- ],
- y: gridSummary.task_id,
- };
- } else if (!node.isGroup) {
- // Individual task - use individual task instance data
- const taskInstance = taskInstances.find((ti) => ti.task_id ===
node.id);
-
- if (taskInstance) {
- const hasTaskRunning = isStatePending(taskInstance.state);
- const endTime = hasTaskRunning ? dayjs().toISOString() :
taskInstance.end_date;
- return {
- isGroup: node.isGroup,
- isMapped: node.is_mapped,
- state: taskInstance.state,
- taskId: taskInstance.task_id,
- x: [dayjs(taskInstance.start_date).toISOString(),
dayjs(endTime).toISOString()],
- y: taskInstance.task_id,
- };
- }
- }
+ const data = isLoading || runId === "" ? [] : transformGanttData({ allTries,
flatNodes, gridSummaries });
- return undefined;
- })
- .filter((item) => item !== undefined);
+ const labels = flatNodes.map((node) => node.id);
// Get all unique states and their colors
const states = [...new Set(data.map((item) => item.state ?? "none"))];
@@ -226,7 +182,7 @@ export const Gantt = ({ dagRunState, limit, runType,
triggeringUser }: Props) =>
minBarLength: MIN_BAR_WIDTH,
},
],
- labels: flatNodes.map((node) => node.id),
+ labels,
};
const fixedHeight = flatNodes.length * CHART_ROW_HEIGHT + CHART_PADDING;
@@ -243,6 +199,7 @@ export const Gantt = ({ dagRunState, limit, runType,
triggeringUser }: Props) =>
handleBarHover,
hoveredId: hoveredTaskId,
hoveredItemColor,
+ labels,
selectedId,
selectedItemColor,
selectedRun,
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/utils.ts
b/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/utils.ts
index 9741654e8d0..4645f61732a 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/utils.ts
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/utils.ts
@@ -1,3 +1,5 @@
+/* eslint-disable max-lines */
+
/*!
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -21,7 +23,14 @@ import dayjs from "dayjs";
import type { TFunction } from "i18next";
import type { NavigateFunction, Location } from "react-router-dom";
-import type { GridRunsResponse, TaskInstanceState } from "openapi/requests";
+import type {
+ GanttTaskInstance,
+ GridRunsResponse,
+ LightGridTaskInstanceSummary,
+ TaskInstanceState,
+} from "openapi/requests";
+import { SearchParamsKeys } from "src/constants/searchParams";
+import type { GridTask } from "src/layouts/Details/Grid/utils";
import { getDuration, isStatePending } from "src/utils";
import { formatDate } from "src/utils/datetimeUtils";
import { buildTaskInstanceUrl } from "src/utils/links";
@@ -31,6 +40,7 @@ export type GanttDataItem = {
isMapped?: boolean | null;
state?: TaskInstanceState | null;
taskId: string;
+ tryNumber?: number;
x: Array<string>;
y: string;
};
@@ -50,6 +60,7 @@ type ChartOptionsParams = {
handleBarHover: (event: ChartEvent, elements: Array<ActiveElement>) => void;
hoveredId?: string | null;
hoveredItemColor?: string;
+ labels: Array<string>;
selectedId?: string;
selectedItemColor?: string;
selectedRun?: GridRunsResponse;
@@ -57,35 +68,121 @@ type ChartOptionsParams = {
translate: TFunction;
};
+type TransformGanttDataParams = {
+ allTries: Array<GanttTaskInstance>;
+ flatNodes: Array<GridTask>;
+ gridSummaries: Array<LightGridTaskInstanceSummary>;
+};
+
+export const transformGanttData = ({
+ allTries,
+ flatNodes,
+ gridSummaries,
+}: TransformGanttDataParams): Array<GanttDataItem> => {
+ // Group tries by task_id
+ const triesByTask = new Map<string, Array<GanttTaskInstance>>();
+
+ for (const ti of allTries) {
+ const existing = triesByTask.get(ti.task_id) ?? [];
+
+ existing.push(ti);
+ triesByTask.set(ti.task_id, existing);
+ }
+
+ return flatNodes
+ .flatMap((node): Array<GanttDataItem> | undefined => {
+ const gridSummary = gridSummaries.find((ti) => ti.task_id === node.id);
+
+ // Handle groups and mapped tasks using grid summary (aggregated min/max times)
+ // Use ISO so time scale and bar positions render consistently across browsers
+ if ((node.isGroup ?? node.is_mapped) && gridSummary) {
+ return [
+ {
+ isGroup: node.isGroup,
+ isMapped: node.is_mapped,
+ state: gridSummary.state,
+ taskId: gridSummary.task_id,
+ x: [
+ dayjs(gridSummary.min_start_date).toISOString(),
+ dayjs(gridSummary.max_end_date).toISOString(),
+ ],
+ y: gridSummary.task_id,
+ },
+ ];
+ }
+
+ // Handle individual tasks with all their tries
+ if (!node.isGroup) {
+ const tries = triesByTask.get(node.id);
+
+ if (tries && tries.length > 0) {
+ return tries.map((tryInstance) => {
+ const hasTaskRunning = isStatePending(tryInstance.state);
+ const endTime = hasTaskRunning ? dayjs().toISOString() :
tryInstance.end_date;
+
+ return {
+ isGroup: false,
+ isMapped: tryInstance.is_mapped,
+ state: tryInstance.state,
+ taskId: tryInstance.task_id,
+ tryNumber: tryInstance.try_number,
+ x: [dayjs(tryInstance.start_date).toISOString(),
dayjs(endTime).toISOString()],
+ y: tryInstance.task_id,
+ };
+ });
+ }
+ }
+
+ return undefined;
+ })
+ .filter((item): item is GanttDataItem => item !== undefined);
+};
+
export const createHandleBarClick =
({ dagId, data, location, navigate, runId }: HandleBarClickOptions) =>
(_: ChartEvent, elements: Array<ActiveElement>) => {
- if (elements.length > 0 && elements[0] && Boolean(runId)) {
- const clickedData = data[elements[0].index];
-
- if (clickedData) {
- const { isGroup, isMapped, taskId } = clickedData;
-
- const taskUrl = buildTaskInstanceUrl({
- currentPathname: location.pathname,
- dagId,
- isGroup: Boolean(isGroup),
- isMapped: Boolean(isMapped),
- runId,
- taskId,
- });
+ if (elements.length === 0 || !elements[0] || !runId) {
+ return;
+ }
- void Promise.resolve(
- navigate(
- {
- pathname: taskUrl,
- search: location.search,
- },
- { replace: true },
- ),
- );
- }
+ const clickedData = data[elements[0].index];
+
+ if (!clickedData) {
+ return;
+ }
+
+ const { isGroup, isMapped, taskId, tryNumber } = clickedData;
+
+ const taskUrl = buildTaskInstanceUrl({
+ currentPathname: location.pathname,
+ dagId,
+ isGroup: Boolean(isGroup),
+ isMapped: Boolean(isMapped),
+ runId,
+ taskId,
+ });
+
+ const searchParams = new URLSearchParams(location.search);
+ const isOlderTry =
+ tryNumber !== undefined &&
+ tryNumber <
+ Math.max(...data.filter((item) => item.taskId === taskId).map((item)
=> item.tryNumber ?? 1));
+
+ if (isOlderTry) {
+ searchParams.set(SearchParamsKeys.TRY_NUMBER, tryNumber.toString());
+ } else {
+ searchParams.delete(SearchParamsKeys.TRY_NUMBER);
}
+
+ void Promise.resolve(
+ navigate(
+ {
+ pathname: taskUrl,
+ search: searchParams.toString(),
+ },
+ { replace: true },
+ ),
+ );
};
export const createHandleBarHover = (
@@ -136,6 +233,7 @@ export const createChartOptions = ({
handleBarHover,
hoveredId,
hoveredItemColor,
+ labels,
selectedId,
selectedItemColor,
selectedRun,
@@ -178,8 +276,8 @@ export const createChartOptions = ({
type: "box" as const,
xMax: "max" as const,
xMin: "min" as const,
- yMax: data.findIndex((dataItem) => dataItem.y ===
selectedId) + 0.5,
- yMin: data.findIndex((dataItem) => dataItem.y ===
selectedId) - 0.5,
+ yMax: labels.indexOf(selectedId) + 0.5,
+ yMin: labels.indexOf(selectedId) - 0.5,
},
]),
// Hovered task annotation
@@ -193,8 +291,8 @@ export const createChartOptions = ({
type: "box" as const,
xMax: "max" as const,
xMin: "min" as const,
- yMax: data.findIndex((dataItem) => dataItem.y === hoveredId)
+ 0.5,
- yMin: data.findIndex((dataItem) => dataItem.y === hoveredId)
- 0.5,
+ yMax: labels.indexOf(hoveredId) + 0.5,
+ yMin: labels.indexOf(hoveredId) - 0.5,
},
]),
],
@@ -206,19 +304,23 @@ export const createChartOptions = ({
tooltip: {
callbacks: {
afterBody(tooltipItems: Array<TooltipItem<"bar">>) {
- const taskInstance = data.find((dataItem) => dataItem.y ===
tooltipItems[0]?.label);
+ const taskInstance = data[tooltipItems[0]?.dataIndex ?? 0];
const startDate = formatDate(taskInstance?.x[0], selectedTimezone);
const endDate = formatDate(taskInstance?.x[1], selectedTimezone);
-
- return [
+ const lines = [
`${translate("startDate")}: ${startDate}`,
`${translate("endDate")}: ${endDate}`,
`${translate("duration")}: ${getDuration(taskInstance?.x[0],
taskInstance?.x[1])}`,
];
+
+ if (taskInstance?.tryNumber !== undefined) {
+ lines.unshift(`${translate("tryNumber")}: ${taskInstance.tryNumber}`);
+ }
+
+ return lines;
},
label(tooltipItem: TooltipItem<"bar">) {
- const { label } = tooltipItem;
- const taskInstance = data.find((dataItem) => dataItem.y === label);
+ const taskInstance = data[tooltipItem.dataIndex];
return `${translate("state")}: ${translate(`states.${taskInstance?.state}`)}`;
},
diff --git a/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
b/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
index 9f5d4c267d7..b631eb48cf4 100644
--- a/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
@@ -24,6 +24,7 @@ import {
UseDagRunServiceGetDagRunKeyFn,
useDagRunServiceGetDagRunsKey,
UseDagServiceGetDagDetailsKeyFn,
+ UseGanttServiceGetGanttDataKeyFn,
useTaskInstanceServiceGetTaskInstancesKey,
UseGridServiceGetGridRunsKeyFn,
UseGridServiceGetGridTiSummariesKeyFn,
@@ -60,6 +61,7 @@ export const useClearDagRun = ({
[useDagRunServiceGetDagRunsKey],
[useClearDagRunDryRunKey, dagId],
UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]),
+ UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }, [{
dagId, runId: dagRunId }]),
];
diff --git a/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
b/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
index ce9822a63ef..3130041a152 100644
--- a/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
@@ -22,6 +22,7 @@ import { useTranslation } from "react-i18next";
import {
UseDagRunServiceGetDagRunKeyFn,
useDagRunServiceGetDagRunsKey,
+ UseGanttServiceGetGanttDataKeyFn,
UseTaskInstanceServiceGetMappedTaskInstanceKeyFn,
useTaskInstanceServicePostClearTaskInstances,
UseGridServiceGetGridRunsKeyFn,
@@ -113,6 +114,7 @@ export const useClearTaskInstances = ({
[useClearTaskInstancesDryRunKey, dagId],
[usePatchTaskInstanceDryRunKey, dagId, dagRunId],
UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]),
+ UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
affectsMultipleRuns
? [useGridServiceGetGridTiSummariesKey, { dagId }]
: UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }),
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_gantt.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_gantt.py
new file mode 100644
index 00000000000..6442e89e48e
--- /dev/null
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_gantt.py
@@ -0,0 +1,314 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from operator import attrgetter
+
+import pendulum
+import pytest
+
+from airflow._shared.timezones import timezone
+from airflow.models.dagbag import DBDagBag
+from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.utils.session import provide_session
+from airflow.utils.state import DagRunState, TaskInstanceState
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
+
+from tests_common.test_utils.asserts import assert_queries_count
+from tests_common.test_utils.db import clear_db_assets, clear_db_dags,
clear_db_runs, clear_db_serialized_dags
+from tests_common.test_utils.mock_operators import MockOperator
+
+pytestmark = pytest.mark.db_test
+
+DAG_ID = "test_gantt_dag"
+DAG_ID_2 = "test_gantt_dag_2"
+DAG_ID_3 = "test_gantt_dag_3"
+TASK_ID = "task"
+TASK_ID_2 = "task2"
+TASK_ID_3 = "task3"
+MAPPED_TASK_ID = "mapped_task"
+
+GANTT_TASK_1 = {
+ "task_id": "task",
+ "try_number": 1,
+ "state": "success",
+ "start_date": "2024-11-30T10:00:00Z",
+ "end_date": "2024-11-30T10:05:00Z",
+ "is_group": False,
+ "is_mapped": False,
+}
+
+GANTT_TASK_2 = {
+ "task_id": "task2",
+ "try_number": 1,
+ "state": "failed",
+ "start_date": "2024-11-30T10:05:00Z",
+ "end_date": "2024-11-30T10:10:00Z",
+ "is_group": False,
+ "is_mapped": False,
+}
+
+GANTT_TASK_3 = {
+ "task_id": "task3",
+ "try_number": 1,
+ "state": "running",
+ "start_date": "2024-11-30T10:10:00Z",
+ "end_date": None,
+ "is_group": False,
+ "is_mapped": False,
+}
+
+
+@pytest.fixture(autouse=True, scope="module")
+def examples_dag_bag():
+ return DBDagBag()
+
+
+@pytest.fixture(autouse=True)
+@provide_session
+def setup(dag_maker, session=None):
+ clear_db_runs()
+ clear_db_dags()
+ clear_db_serialized_dags()
+
+ triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST}
+
+ # DAG 1: Multiple tasks with different states (success, failed, running)
+ with dag_maker(dag_id=DAG_ID, serialized=True, session=session) as dag:
+ EmptyOperator(task_id=TASK_ID)
+ EmptyOperator(task_id=TASK_ID_2)
+ EmptyOperator(task_id=TASK_ID_3)
+
+ logical_date = timezone.datetime(2024, 11, 30)
+ data_interval =
dag.timetable.infer_manual_data_interval(run_after=logical_date)
+
+ run_1 = dag_maker.create_dagrun(
+ run_id="run_1",
+ state=DagRunState.RUNNING,
+ run_type=DagRunType.MANUAL,
+ logical_date=logical_date,
+ data_interval=data_interval,
+ **triggered_by_kwargs,
+ )
+
+ for ti in sorted(run_1.task_instances, key=attrgetter("task_id")):
+ if ti.task_id == TASK_ID:
+ ti.state = TaskInstanceState.SUCCESS
+ ti.try_number = 1
+ ti.start_date = pendulum.DateTime(2024, 11, 30, 10, 0, 0,
tzinfo=pendulum.UTC)
+ ti.end_date = pendulum.DateTime(2024, 11, 30, 10, 5, 0,
tzinfo=pendulum.UTC)
+ elif ti.task_id == TASK_ID_2:
+ ti.state = TaskInstanceState.FAILED
+ ti.try_number = 1
+ ti.start_date = pendulum.DateTime(2024, 11, 30, 10, 5, 0,
tzinfo=pendulum.UTC)
+ ti.end_date = pendulum.DateTime(2024, 11, 30, 10, 10, 0,
tzinfo=pendulum.UTC)
+ elif ti.task_id == TASK_ID_3:
+ ti.state = TaskInstanceState.RUNNING
+ ti.try_number = 1
+ ti.start_date = pendulum.DateTime(2024, 11, 30, 10, 10, 0,
tzinfo=pendulum.UTC)
+ ti.end_date = None
+
+ # DAG 2: With mapped tasks (only non-mapped should be returned)
+ with dag_maker(dag_id=DAG_ID_2, serialized=True, session=session) as dag_2:
+ EmptyOperator(task_id=TASK_ID)
+ MockOperator.partial(task_id=MAPPED_TASK_ID).expand(arg1=["a", "b",
"c"])
+
+ logical_date_2 = timezone.datetime(2024, 12, 1)
+ data_interval_2 =
dag_2.timetable.infer_manual_data_interval(run_after=logical_date_2)
+
+ run_2 = dag_maker.create_dagrun(
+ run_id="run_2",
+ state=DagRunState.SUCCESS,
+ run_type=DagRunType.MANUAL,
+ logical_date=logical_date_2,
+ data_interval=data_interval_2,
+ **triggered_by_kwargs,
+ )
+
+ for ti in run_2.task_instances:
+ ti.state = TaskInstanceState.SUCCESS
+ ti.try_number = 1
+ ti.start_date = pendulum.DateTime(2024, 12, 1, 10, 0, 0,
tzinfo=pendulum.UTC)
+ ti.end_date = pendulum.DateTime(2024, 12, 1, 10, 5, 0,
tzinfo=pendulum.UTC)
+
+ # DAG 3: With UP_FOR_RETRY state (should be excluded from results)
+ with dag_maker(dag_id=DAG_ID_3, serialized=True, session=session) as dag_3:
+ EmptyOperator(task_id=TASK_ID)
+ EmptyOperator(task_id=TASK_ID_2)
+
+ logical_date_3 = timezone.datetime(2024, 12, 2)
+ data_interval_3 =
dag_3.timetable.infer_manual_data_interval(run_after=logical_date_3)
+
+ run_3 = dag_maker.create_dagrun(
+ run_id="run_3",
+ state=DagRunState.RUNNING,
+ run_type=DagRunType.MANUAL,
+ logical_date=logical_date_3,
+ data_interval=data_interval_3,
+ **triggered_by_kwargs,
+ )
+
+ for ti in sorted(run_3.task_instances, key=attrgetter("task_id")):
+ if ti.task_id == TASK_ID:
+ ti.state = TaskInstanceState.SUCCESS
+ ti.try_number = 1
+ ti.start_date = pendulum.DateTime(2024, 12, 2, 10, 0, 0,
tzinfo=pendulum.UTC)
+ ti.end_date = pendulum.DateTime(2024, 12, 2, 10, 5, 0,
tzinfo=pendulum.UTC)
+ elif ti.task_id == TASK_ID_2:
+ # UP_FOR_RETRY should be excluded (historical tries are in TaskInstanceHistory)
+ ti.state = TaskInstanceState.UP_FOR_RETRY
+ ti.try_number = 2
+ ti.start_date = pendulum.DateTime(2024, 12, 2, 10, 5, 0,
tzinfo=pendulum.UTC)
+ ti.end_date = pendulum.DateTime(2024, 12, 2, 10, 10, 0,
tzinfo=pendulum.UTC)
+
+ session.commit()
+
+
+@pytest.fixture(autouse=True)
+def _clean():
+ clear_db_runs()
+ clear_db_assets()
+ yield
+ clear_db_runs()
+ clear_db_assets()
+
+
+@pytest.mark.usefixtures("setup")
+class TestGetGanttDataEndpoint:
+ def test_should_response_200(self, test_client):
+ with assert_queries_count(3):
+ response = test_client.get(f"/gantt/{DAG_ID}/run_1")
+ assert response.status_code == 200
+ data = response.json()
+ assert data["dag_id"] == DAG_ID
+ assert data["run_id"] == "run_1"
+ actual = sorted(data["task_instances"], key=lambda x: x["task_id"])
+ assert actual == [GANTT_TASK_1, GANTT_TASK_2, GANTT_TASK_3]
+
+ @pytest.mark.parametrize(
+ ("dag_id", "run_id", "expected_task_ids", "expected_states"),
+ [
+ pytest.param(
+ DAG_ID,
+ "run_1",
+ ["task", "task2", "task3"],
+ {"success", "failed", "running"},
+ id="dag1_multiple_states",
+ ),
+ pytest.param(
+ DAG_ID_2,
+ "run_2",
+ ["task"],
+ {"success"},
+ id="dag2_filters_mapped_tasks",
+ ),
+ pytest.param(
+ DAG_ID_3,
+ "run_3",
+ ["task"],
+ {"success"},
+ id="dag3_excludes_up_for_retry",
+ ),
+ ],
+ )
+ def test_task_filtering_and_states(self, test_client, dag_id, run_id,
expected_task_ids, expected_states):
+ with assert_queries_count(3):
+ response = test_client.get(f"/gantt/{dag_id}/{run_id}")
+ assert response.status_code == 200
+ data = response.json()
+
+ actual_task_ids = sorted([ti["task_id"] for ti in
data["task_instances"]])
+ assert actual_task_ids == expected_task_ids
+
+ actual_states = {ti["state"] for ti in data["task_instances"]}
+ assert actual_states == expected_states
+
+ @pytest.mark.parametrize(
+ ("dag_id", "run_id", "task_id", "expected_start", "expected_end",
"expected_state"),
+ [
+ pytest.param(
+ DAG_ID,
+ "run_1",
+ "task",
+ "2024-11-30T10:00:00Z",
+ "2024-11-30T10:05:00Z",
+ "success",
+ id="success_task_has_dates",
+ ),
+ pytest.param(
+ DAG_ID,
+ "run_1",
+ "task2",
+ "2024-11-30T10:05:00Z",
+ "2024-11-30T10:10:00Z",
+ "failed",
+ id="failed_task_has_dates",
+ ),
+ pytest.param(
+ DAG_ID,
+ "run_1",
+ "task3",
+ "2024-11-30T10:10:00Z",
+ None,
+ "running",
+ id="running_task_null_end_date",
+ ),
+ ],
+ )
+ def test_task_dates_and_states(
+ self, test_client, dag_id, run_id, task_id, expected_start,
expected_end, expected_state
+ ):
+ with assert_queries_count(3):
+ response = test_client.get(f"/gantt/{dag_id}/{run_id}")
+ assert response.status_code == 200
+ data = response.json()
+ ti = next((t for t in data["task_instances"] if t["task_id"] ==
task_id), None)
+ assert ti is not None
+ assert ti["start_date"] == expected_start
+ assert ti["end_date"] == expected_end
+ assert ti["state"] == expected_state
+
+ def test_sorted_by_task_id_and_try_number(self, test_client):
+ with assert_queries_count(3):
+ response = test_client.get(f"/gantt/{DAG_ID}/run_1")
+ assert response.status_code == 200
+ data = response.json()
+ task_instances = data["task_instances"]
+ sorted_tis = sorted(task_instances, key=lambda x: (x["task_id"],
x["try_number"]))
+ assert task_instances == sorted_tis
+
+ def test_should_response_401(self, unauthenticated_test_client):
+ response = unauthenticated_test_client.get(f"/gantt/{DAG_ID}/run_1")
+ assert response.status_code == 401
+
+ def test_should_response_403(self, unauthorized_test_client):
+ response = unauthorized_test_client.get(f"/gantt/{DAG_ID}/run_1")
+ assert response.status_code == 403
+
+ @pytest.mark.parametrize(
+ ("dag_id", "run_id"),
+ [
+ pytest.param("invalid_dag", "run_1", id="invalid_dag"),
+ pytest.param(DAG_ID, "invalid_run", id="invalid_run"),
+ pytest.param("invalid_dag", "invalid_run", id="both_invalid"),
+ ],
+ )
+ def test_should_response_404(self, test_client, dag_id, run_id):
+ with assert_queries_count(3):
+ response = test_client.get(f"/gantt/{dag_id}/{run_id}")
+ assert response.status_code == 404