This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7c77a6d2480 Stream grid task-instance summaries as NDJSON to eliminate N+1 requests (#62369)
7c77a6d2480 is described below
commit 7c77a6d2480cb27eea5d29724bbbfdcf2fbd140b
Author: Xavi Vega <[email protected]>
AuthorDate: Wed Mar 18 12:29:34 2026 +0100
Stream grid task-instance summaries as NDJSON to eliminate N+1 requests (#62369)
* Stream task instance summaries for multiple DAG runs over a single NDJSON
connection to eliminate N+1 requests
* Stream task instance summaries for multiple DAG runs over a single NDJSON
connection, replacing individual requests to improve performance and eliminate
N+1 query issues.
* Fix capitalization of "Dag" in documentation and code comments for
consistency
* Refactor GridTISummaries schema and update streaming endpoint to improve
clarity and performance
* Fix formatting and linter issues
* Fix static check
* Fix static check
* Fix static check
---
airflow-core/newsfragments/62369.significant.rst | 30 ++++
.../api_fastapi/core_api/openapi/_private_ui.yaml | 50 +++---
.../airflow/api_fastapi/core_api/routes/ui/grid.py | 187 +++++++++++----------
.../src/airflow/ui/openapi-gen/queries/common.ts | 12 +-
.../ui/openapi-gen/queries/ensureQueryData.ts | 28 ++-
.../src/airflow/ui/openapi-gen/queries/prefetch.ts | 28 ++-
.../src/airflow/ui/openapi-gen/queries/queries.ts | 28 ++-
.../src/airflow/ui/openapi-gen/queries/suspense.ts | 28 ++-
.../ui/openapi-gen/requests/services.gen.ts | 34 ++--
.../airflow/ui/openapi-gen/requests/types.gen.ts | 14 +-
.../airflow/ui/src/layouts/Details/Gantt/Gantt.tsx | 11 +-
.../airflow/ui/src/layouts/Details/Graph/Graph.tsx | 5 +-
.../airflow/ui/src/layouts/Details/Grid/Grid.tsx | 8 +
.../layouts/Details/Grid/TaskInstancesColumn.tsx | 15 +-
.../pages/GroupTaskInstance/GroupTaskInstance.tsx | 5 +-
.../MappedTaskInstance/MappedTaskInstance.tsx | 5 +-
.../ui/src/pages/TaskInstance/TaskInstance.tsx | 5 +-
.../src/airflow/ui/src/queries/useClearRun.ts | 2 -
.../ui/src/queries/useClearTaskInstances.ts | 9 -
.../ui/src/queries/useDeleteTaskInstance.ts | 2 -
.../airflow/ui/src/queries/useGridTISummaries.ts | 134 +++++++++++----
.../src/airflow/ui/src/queries/usePatchDagRun.ts | 2 -
.../airflow/ui/src/queries/usePatchTaskInstance.ts | 19 +--
.../api_fastapi/core_api/routes/ui/test_grid.py | 68 +++++++-
24 files changed, 418 insertions(+), 311 deletions(-)
diff --git a/airflow-core/newsfragments/62369.significant.rst
b/airflow-core/newsfragments/62369.significant.rst
new file mode 100644
index 00000000000..b5d4113ae58
--- /dev/null
+++ b/airflow-core/newsfragments/62369.significant.rst
@@ -0,0 +1,30 @@
+Replace per-run TI summary requests with a single NDJSON stream
+
+The grid, graph, gantt, and task-detail views now fetch task-instance
+summaries through a single streaming HTTP request
+(``GET /ui/grid/ti_summaries/{dag_id}?run_ids=...``) instead of one request
+per run. The server emits one JSON line per run as soon as that run's task
+instances are ready, so columns appear progressively rather than all at once.
+
+**What changed:**
+
+- ``GET /ui/grid/ti_summaries/{dag_id}?run_ids=...`` is now the sole endpoint
+ for TI summaries, returning an ``application/x-ndjson`` stream where each
+ line is a serialized ``GridTISummaries`` object for one run.
+- The old single-run endpoint ``GET /ui/grid/ti_summaries/{dag_id}/{run_id}``
+ has been removed.
+- The serialized Dag structure is loaded once and shared across all runs that
+ share the same ``dag_version_id``, avoiding redundant deserialization.
+- All UI views (grid, graph, gantt, task instance, mapped task instance, group
+ task instance) use the stream endpoint, passing one or more ``run_ids``.
+
+* Types of change
+
+ * [ ] Dag changes
+ * [ ] Config changes
+ * [x] API changes
+ * [ ] CLI changes
+ * [x] Behaviour changes
+ * [ ] Plugin changes
+ * [ ] Dependency changes
+ * [ ] Code interface changes
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index 2b10156b4f5..9e81e8344fb 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -1129,35 +1129,26 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
- /ui/grid/ti_summaries/{dag_id}/{run_id}:
+ /ui/grid/ti_summaries/{dag_id}:
get:
tags:
- Grid
- summary: Get Grid Ti Summaries
- description: 'Get states for TIs / "groups" of TIs.
+ summary: Get Grid Ti Summaries Stream
+ description: 'Stream TI summaries for multiple Dag runs as NDJSON (one
JSON
+ line per run).
- Essentially this is to know what color to put in the squares in the
grid.
+ Each line is a serialized ``GridTISummaries`` object emitted as soon
as that
+ run''s task instances have been processed, so the client can render
columns
- The tricky part here is that we aggregate the state for groups and
mapped
- tasks.
+ progressively without waiting for all runs to complete.
- We don''t add all the TIs for mapped TIs -- we only add one entry for
the
- mapped task and
+ The serialized Dag structure is loaded once and reused for all runs
that
- its state is an aggregate of its TI states.
-
-
- And for task groups, we add a "task" for that which is not really a
task but
- is just
-
- an entry that represents the group (so that we can show a filled in
box when
- the group
-
- is not expanded) and its state is an agg of those within it.'
- operationId: get_grid_ti_summaries
+ share the same ``dag_version_id``, avoiding repeated deserialization.'
+ operationId: get_grid_ti_summaries_stream
security:
- OAuth2PasswordBearer: []
- HTTPBearer: []
@@ -1168,19 +1159,24 @@ paths:
schema:
type: string
title: Dag Id
- - name: run_id
- in: path
- required: true
+ - name: run_ids
+ in: query
+ required: false
schema:
- type: string
- title: Run Id
+ anyOf:
+ - type: array
+ items:
+ type: string
+ - type: 'null'
+ title: Run Ids
responses:
'200':
- description: Successful Response
+ description: "NDJSON stream \u2014 one ``GridTISummaries`` JSON
object per\
+ \ line, one per Dag run"
content:
- application/json:
+ application/x-ndjson:
schema:
- $ref: '#/components/schemas/GridTISummaries'
+ type: string
'400':
content:
application/json:
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
index ca363c3adab..0143ae81e14 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -18,10 +18,12 @@
from __future__ import annotations
import collections
+from collections.abc import Generator, Sequence
from typing import TYPE_CHECKING, Annotated, Any
import structlog
-from fastapi import Depends, HTTPException, status
+from fastapi import Depends, HTTPException, Query, status
+from fastapi.responses import StreamingResponse
from sqlalchemy import exists, select
from sqlalchemy.orm import joinedload, load_only, selectinload
@@ -337,96 +339,30 @@ def get_grid_runs(
return grid_runs
-@grid_router.get(
- "/ti_summaries/{dag_id}/{run_id}",
- responses=create_openapi_http_exception_doc(
- [
- status.HTTP_400_BAD_REQUEST,
- status.HTTP_404_NOT_FOUND,
- ]
- ),
- dependencies=[
- Depends(
- requires_access_dag(
- method="GET",
- access_entity=DagAccessEntity.TASK_INSTANCE,
- )
- ),
- Depends(
- requires_access_dag(
- method="GET",
- access_entity=DagAccessEntity.RUN,
- )
- ),
- ],
-)
-def get_grid_ti_summaries(
- dag_id: str,
- run_id: str,
- session: SessionDep,
-) -> GridTISummaries:
- """
- Get states for TIs / "groups" of TIs.
-
- Essentially this is to know what color to put in the squares in the grid.
-
- The tricky part here is that we aggregate the state for groups and mapped
tasks.
-
- We don't add all the TIs for mapped TIs -- we only add one entry for the
mapped task and
- its state is an aggregate of its TI states.
-
- And for task groups, we add a "task" for that which is not really a task
but is just
- an entry that represents the group (so that we can show a filled in box
when the group
- is not expanded) and its state is an agg of those within it.
- """
- tis_of_dag_runs, _ = paginated_select(
- statement=(
- select(
- TaskInstance.task_id,
- TaskInstance.state,
- TaskInstance.dag_version_id,
- TaskInstance.start_date,
- TaskInstance.end_date,
- DagVersion.version_number,
- )
- .outerjoin(DagVersion, TaskInstance.dag_version_id ==
DagVersion.id)
- .where(TaskInstance.dag_id == dag_id)
- .where(
- TaskInstance.run_id == run_id,
- )
- ),
- filters=[],
- order_by=SortParam(allowed_attrs=["task_id", "run_id"],
model=TaskInstance).set_value(["task_id"]),
- limit=None,
- return_total_entries=False,
- )
- task_instances = list(session.execute(tis_of_dag_runs))
- if not task_instances:
- raise HTTPException(
- status.HTTP_404_NOT_FOUND, f"No task instances for dag_id={dag_id}
run_id={run_id}"
- )
- ti_details = collections.defaultdict(list)
+def _build_ti_summaries(
+ dag_id: str, run_id: str, task_instances: Sequence, session, serdag:
SerializedDagModel | None = None
+) -> dict:
+ ti_details: dict = collections.defaultdict(list)
for ti in task_instances:
ti_details[ti.task_id].append(
{
"state": ti.state,
"start_date": ti.start_date,
"end_date": ti.end_date,
- "dag_version_number": ti.version_number,
+ "dag_version_number": getattr(ti, "version_number", None),
}
)
- serdag = _get_serdag(
- dag_id=dag_id,
- dag_version_id=task_instances[0].dag_version_id,
- session=session,
- )
+ if serdag is None:
+ serdag = _get_serdag(
+ dag_id=dag_id,
+ dag_version_id=task_instances[0].dag_version_id,
+ session=session,
+ )
if TYPE_CHECKING:
assert serdag
- def get_node_sumaries():
+ def get_node_summaries():
yielded_task_ids: set[str] = set()
-
- # Yield all nodes discoverable from the serialized DAG structure
for node in _find_aggregates(
node=serdag.dag.task_group,
parent_node=None,
@@ -437,13 +373,9 @@ def get_grid_ti_summaries(
if node["type"] == "task":
node["child_states"] = None
yield node
-
- # For good history: add synthetic leaf nodes for task_ids that have
TIs in this run
- # but are not present in the current DAG structure (e.g. removed tasks)
missing_task_ids = set(ti_details.keys()) - yielded_task_ids
for task_id in sorted(missing_task_ids):
detail = ti_details[task_id]
- # Create a leaf task node with aggregated state from its TIs
agg = _get_aggs_for_node(detail)
yield {
"task_id": task_id,
@@ -451,17 +383,86 @@ def get_grid_ti_summaries(
"type": "task",
"parent_id": None,
**agg,
- # Leaf tasks have no children
"child_states": None,
}
- task_instances = list(get_node_sumaries())
+ nodes = list(get_node_summaries())
# If a group id and a task id collide, prefer the group record
- group_ids = {n.get("task_id") for n in task_instances if n.get("type") ==
"group"}
- filtered = [n for n in task_instances if not (n.get("type") == "task" and
n.get("task_id") in group_ids)]
-
- return { # type: ignore[return-value]
- "run_id": run_id,
- "dag_id": dag_id,
- "task_instances": filtered,
- }
+ group_ids = {n.get("task_id") for n in nodes if n.get("type") == "group"}
+ filtered = [n for n in nodes if not (n.get("type") == "task" and
n.get("task_id") in group_ids)]
+ return {"run_id": run_id, "dag_id": dag_id, "task_instances": filtered}
+
+
+@grid_router.get(
+ "/ti_summaries/{dag_id}",
+ response_class=StreamingResponse,
+ response_model=GridTISummaries,
+ responses={
+ **create_openapi_http_exception_doc(
+ [
+ status.HTTP_400_BAD_REQUEST,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+ 200: {
+ "content": {"application/x-ndjson": {"schema": {"type":
"string"}}},
+ "description": "NDJSON stream — one ``GridTISummaries`` JSON
object per line, one per Dag run",
+ },
+ },
+ dependencies=[
+ Depends(
+ requires_access_dag(
+ method="GET",
+ access_entity=DagAccessEntity.TASK_INSTANCE,
+ )
+ ),
+ Depends(
+ requires_access_dag(
+ method="GET",
+ access_entity=DagAccessEntity.RUN,
+ )
+ ),
+ ],
+)
+def get_grid_ti_summaries_stream(
+ dag_id: str,
+ session: SessionDep,
+ run_ids: Annotated[list[str] | None, Query()] = None,
+) -> StreamingResponse:
+ """
+ Stream TI summaries for multiple Dag runs as NDJSON (one JSON line per
run).
+
+ Each line is a serialized ``GridTISummaries`` object emitted as soon as
that
+ run's task instances have been processed, so the client can render columns
+ progressively without waiting for all runs to complete.
+
+ The serialized Dag structure is loaded once and reused for all runs that
+ share the same ``dag_version_id``, avoiding repeated deserialization.
+ """
+
+ def _generate() -> Generator[str, None, None]:
+ serdag_cache: dict = {}
+ for run_id in run_ids or []:
+ tis = session.execute(
+ select(
+ TaskInstance.task_id,
+ TaskInstance.state,
+ TaskInstance.dag_version_id,
+ TaskInstance.start_date,
+ TaskInstance.end_date,
+ DagVersion.version_number,
+ )
+ .outerjoin(DagVersion, TaskInstance.dag_version_id ==
DagVersion.id)
+ .where(TaskInstance.dag_id == dag_id)
+ .where(TaskInstance.run_id == run_id)
+ .order_by(TaskInstance.task_id)
+ ).all()
+ if not tis:
+ continue
+ version_id = tis[0].dag_version_id
+ if version_id not in serdag_cache:
+ serdag_cache[version_id] = _get_serdag(dag_id, version_id,
session)
+ summary = _build_ti_summaries(dag_id, run_id, tis, session,
serdag=serdag_cache[version_id])
+ yield GridTISummaries.model_validate(summary).model_dump_json() +
"\n"
+
+ return StreamingResponse(content=_generate(),
media_type="application/x-ndjson")
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
index 5640e4cce55..612a3d56747 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
@@ -880,13 +880,13 @@ export const UseGridServiceGetGridRunsKeyFn = ({ dagId,
limit, offset, orderBy,
state?: string[];
triggeringUser?: string;
}, queryKey?: Array<unknown>) => [useGridServiceGetGridRunsKey, ...(queryKey
?? [{ dagId, limit, offset, orderBy, runAfterGt, runAfterGte, runAfterLt,
runAfterLte, runType, state, triggeringUser }])];
-export type GridServiceGetGridTiSummariesDefaultResponse =
Awaited<ReturnType<typeof GridService.getGridTiSummaries>>;
-export type GridServiceGetGridTiSummariesQueryResult<TData =
GridServiceGetGridTiSummariesDefaultResponse, TError = unknown> =
UseQueryResult<TData, TError>;
-export const useGridServiceGetGridTiSummariesKey =
"GridServiceGetGridTiSummaries";
-export const UseGridServiceGetGridTiSummariesKeyFn = ({ dagId, runId }: {
+export type GridServiceGetGridTiSummariesStreamDefaultResponse =
Awaited<ReturnType<typeof GridService.getGridTiSummariesStream>>;
+export type GridServiceGetGridTiSummariesStreamQueryResult<TData =
GridServiceGetGridTiSummariesStreamDefaultResponse, TError = unknown> =
UseQueryResult<TData, TError>;
+export const useGridServiceGetGridTiSummariesStreamKey =
"GridServiceGetGridTiSummariesStream";
+export const UseGridServiceGetGridTiSummariesStreamKeyFn = ({ dagId, runIds }:
{
dagId: string;
- runId: string;
-}, queryKey?: Array<unknown>) => [useGridServiceGetGridTiSummariesKey,
...(queryKey ?? [{ dagId, runId }])];
+ runIds?: string[];
+}, queryKey?: Array<unknown>) => [useGridServiceGetGridTiSummariesStreamKey,
...(queryKey ?? [{ dagId, runIds }])];
export type GanttServiceGetGanttDataDefaultResponse =
Awaited<ReturnType<typeof GanttService.getGanttData>>;
export type GanttServiceGetGanttDataQueryResult<TData =
GanttServiceGetGanttDataDefaultResponse, TError = unknown> =
UseQueryResult<TData, TError>;
export const useGanttServiceGetGanttDataKey = "GanttServiceGetGanttData";
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
index a6bf227ca40..8bc9e9df8e6 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
@@ -1672,29 +1672,25 @@ export const ensureUseGridServiceGetGridRunsData =
(queryClient: QueryClient, {
triggeringUser?: string;
}) => queryClient.ensureQueryData({ queryKey:
Common.UseGridServiceGetGridRunsKeyFn({ dagId, limit, offset, orderBy,
runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state,
triggeringUser }), queryFn: () => GridService.getGridRuns({ dagId, limit,
offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType,
state, triggeringUser }) });
/**
-* Get Grid Ti Summaries
-* Get states for TIs / "groups" of TIs.
+* Get Grid Ti Summaries Stream
+* Stream TI summaries for multiple Dag runs as NDJSON (one JSON line per run).
*
-* Essentially this is to know what color to put in the squares in the grid.
+* Each line is a serialized ``GridTISummaries`` object emitted as soon as that
+* run's task instances have been processed, so the client can render columns
+* progressively without waiting for all runs to complete.
*
-* The tricky part here is that we aggregate the state for groups and mapped
tasks.
-*
-* We don't add all the TIs for mapped TIs -- we only add one entry for the
mapped task and
-* its state is an aggregate of its TI states.
-*
-* And for task groups, we add a "task" for that which is not really a task but
is just
-* an entry that represents the group (so that we can show a filled in box when
the group
-* is not expanded) and its state is an agg of those within it.
+* The serialized Dag structure is loaded once and reused for all runs that
+* share the same ``dag_version_id``, avoiding repeated deserialization.
* @param data The data for the request.
* @param data.dagId
-* @param data.runId
-* @returns GridTISummaries Successful Response
+* @param data.runIds
+* @returns string NDJSON stream — one ``GridTISummaries`` JSON object per
line, one per Dag run
* @throws ApiError
*/
-export const ensureUseGridServiceGetGridTiSummariesData = (queryClient:
QueryClient, { dagId, runId }: {
+export const ensureUseGridServiceGetGridTiSummariesStreamData = (queryClient:
QueryClient, { dagId, runIds }: {
dagId: string;
- runId: string;
-}) => queryClient.ensureQueryData({ queryKey:
Common.UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId }), queryFn: () =>
GridService.getGridTiSummaries({ dagId, runId }) });
+ runIds?: string[];
+}) => queryClient.ensureQueryData({ queryKey:
Common.UseGridServiceGetGridTiSummariesStreamKeyFn({ dagId, runIds }), queryFn:
() => GridService.getGridTiSummariesStream({ dagId, runIds }) });
/**
* Get Gantt Data
* Get all task instance tries for Gantt chart.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
index c6cc3d62ccf..f4cb6f482bd 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -1672,29 +1672,25 @@ export const prefetchUseGridServiceGetGridRuns =
(queryClient: QueryClient, { da
triggeringUser?: string;
}) => queryClient.prefetchQuery({ queryKey:
Common.UseGridServiceGetGridRunsKeyFn({ dagId, limit, offset, orderBy,
runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state,
triggeringUser }), queryFn: () => GridService.getGridRuns({ dagId, limit,
offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType,
state, triggeringUser }) });
/**
-* Get Grid Ti Summaries
-* Get states for TIs / "groups" of TIs.
+* Get Grid Ti Summaries Stream
+* Stream TI summaries for multiple Dag runs as NDJSON (one JSON line per run).
*
-* Essentially this is to know what color to put in the squares in the grid.
+* Each line is a serialized ``GridTISummaries`` object emitted as soon as that
+* run's task instances have been processed, so the client can render columns
+* progressively without waiting for all runs to complete.
*
-* The tricky part here is that we aggregate the state for groups and mapped
tasks.
-*
-* We don't add all the TIs for mapped TIs -- we only add one entry for the
mapped task and
-* its state is an aggregate of its TI states.
-*
-* And for task groups, we add a "task" for that which is not really a task but
is just
-* an entry that represents the group (so that we can show a filled in box when
the group
-* is not expanded) and its state is an agg of those within it.
+* The serialized Dag structure is loaded once and reused for all runs that
+* share the same ``dag_version_id``, avoiding repeated deserialization.
* @param data The data for the request.
* @param data.dagId
-* @param data.runId
-* @returns GridTISummaries Successful Response
+* @param data.runIds
+* @returns string NDJSON stream — one ``GridTISummaries`` JSON object per
line, one per Dag run
* @throws ApiError
*/
-export const prefetchUseGridServiceGetGridTiSummaries = (queryClient:
QueryClient, { dagId, runId }: {
+export const prefetchUseGridServiceGetGridTiSummariesStream = (queryClient:
QueryClient, { dagId, runIds }: {
dagId: string;
- runId: string;
-}) => queryClient.prefetchQuery({ queryKey:
Common.UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId }), queryFn: () =>
GridService.getGridTiSummaries({ dagId, runId }) });
+ runIds?: string[];
+}) => queryClient.prefetchQuery({ queryKey:
Common.UseGridServiceGetGridTiSummariesStreamKeyFn({ dagId, runIds }), queryFn:
() => GridService.getGridTiSummariesStream({ dagId, runIds }) });
/**
* Get Gantt Data
* Get all task instance tries for Gantt chart.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index 7a0cc07eb20..8e9ef5aa29d 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -1672,29 +1672,25 @@ export const useGridServiceGetGridRuns = <TData =
Common.GridServiceGetGridRunsD
triggeringUser?: string;
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UseGridServiceGetGridRunsKeyFn({ dagId, limit, offset, orderBy,
runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state,
triggeringUser }, queryKey), queryFn: () => GridService.getGridRuns({ dagId,
limit, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte,
runType, state, triggeringUser }) as TData, ...options });
/**
-* Get Grid Ti Summaries
-* Get states for TIs / "groups" of TIs.
+* Get Grid Ti Summaries Stream
+* Stream TI summaries for multiple Dag runs as NDJSON (one JSON line per run).
*
-* Essentially this is to know what color to put in the squares in the grid.
+* Each line is a serialized ``GridTISummaries`` object emitted as soon as that
+* run's task instances have been processed, so the client can render columns
+* progressively without waiting for all runs to complete.
*
-* The tricky part here is that we aggregate the state for groups and mapped
tasks.
-*
-* We don't add all the TIs for mapped TIs -- we only add one entry for the
mapped task and
-* its state is an aggregate of its TI states.
-*
-* And for task groups, we add a "task" for that which is not really a task but
is just
-* an entry that represents the group (so that we can show a filled in box when
the group
-* is not expanded) and its state is an agg of those within it.
+* The serialized Dag structure is loaded once and reused for all runs that
+* share the same ``dag_version_id``, avoiding repeated deserialization.
* @param data The data for the request.
* @param data.dagId
-* @param data.runId
-* @returns GridTISummaries Successful Response
+* @param data.runIds
+* @returns string NDJSON stream — one ``GridTISummaries`` JSON object per
line, one per Dag run
* @throws ApiError
*/
-export const useGridServiceGetGridTiSummaries = <TData =
Common.GridServiceGetGridTiSummariesDefaultResponse, TError = unknown,
TQueryKey extends Array<unknown> = unknown[]>({ dagId, runId }: {
+export const useGridServiceGetGridTiSummariesStream = <TData =
Common.GridServiceGetGridTiSummariesStreamDefaultResponse, TError = unknown,
TQueryKey extends Array<unknown> = unknown[]>({ dagId, runIds }: {
dagId: string;
- runId: string;
-}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId }, queryKey),
queryFn: () => GridService.getGridTiSummaries({ dagId, runId }) as TData,
...options });
+ runIds?: string[];
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UseGridServiceGetGridTiSummariesStreamKeyFn({ dagId, runIds },
queryKey), queryFn: () => GridService.getGridTiSummariesStream({ dagId, runIds
}) as TData, ...options });
/**
* Get Gantt Data
* Get all task instance tries for Gantt chart.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
index 518f42ef01e..c4a41691b1a 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
@@ -1672,29 +1672,25 @@ export const useGridServiceGetGridRunsSuspense = <TData
= Common.GridServiceGetG
triggeringUser?: string;
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UseGridServiceGetGridRunsKeyFn({ dagId, limit, offset, orderBy,
runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state,
triggeringUser }, queryKey), queryFn: () => GridService.getGridRuns({ dagId,
limit, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte,
runType, state, triggeringUser }) as TData, ...options });
/**
-* Get Grid Ti Summaries
-* Get states for TIs / "groups" of TIs.
+* Get Grid Ti Summaries Stream
+* Stream TI summaries for multiple Dag runs as NDJSON (one JSON line per run).
*
-* Essentially this is to know what color to put in the squares in the grid.
+* Each line is a serialized ``GridTISummaries`` object emitted as soon as that
+* run's task instances have been processed, so the client can render columns
+* progressively without waiting for all runs to complete.
*
-* The tricky part here is that we aggregate the state for groups and mapped
tasks.
-*
-* We don't add all the TIs for mapped TIs -- we only add one entry for the
mapped task and
-* its state is an aggregate of its TI states.
-*
-* And for task groups, we add a "task" for that which is not really a task but
is just
-* an entry that represents the group (so that we can show a filled in box when
the group
-* is not expanded) and its state is an agg of those within it.
+* The serialized Dag structure is loaded once and reused for all runs that
+* share the same ``dag_version_id``, avoiding repeated deserialization.
* @param data The data for the request.
* @param data.dagId
-* @param data.runId
-* @returns GridTISummaries Successful Response
+* @param data.runIds
+* @returns string NDJSON stream — one ``GridTISummaries`` JSON object per
line, one per Dag run
* @throws ApiError
*/
-export const useGridServiceGetGridTiSummariesSuspense = <TData =
Common.GridServiceGetGridTiSummariesDefaultResponse, TError = unknown,
TQueryKey extends Array<unknown> = unknown[]>({ dagId, runId }: {
+export const useGridServiceGetGridTiSummariesStreamSuspense = <TData =
Common.GridServiceGetGridTiSummariesStreamDefaultResponse, TError = unknown,
TQueryKey extends Array<unknown> = unknown[]>({ dagId, runIds }: {
dagId: string;
- runId: string;
-}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId }, queryKey),
queryFn: () => GridService.getGridTiSummaries({ dagId, runId }) as TData,
...options });
+ runIds?: string[];
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UseGridServiceGetGridTiSummariesStreamKeyFn({ dagId, runIds },
queryKey), queryFn: () => GridService.getGridTiSummariesStream({ dagId, runIds
}) as TData, ...options });
/**
* Get Gantt Data
* Get all task instance tries for Gantt chart.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index 81facc3769f..6e701bf68dc 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -3,7 +3,7 @@
import type { CancelablePromise } from './core/CancelablePromise';
import { OpenAPI } from './core/OpenAPI';
import { request as __request } from './core/request';
-import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData,
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse,
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData,
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse,
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse,
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData,
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse,
Dele [...]
+import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData,
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse,
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData,
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse,
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse,
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData,
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse,
Dele [...]
export class AssetService {
/**
@@ -4159,32 +4159,30 @@ export class GridService {
}
/**
- * Get Grid Ti Summaries
- * Get states for TIs / "groups" of TIs.
+ * Get Grid Ti Summaries Stream
+ * Stream TI summaries for multiple Dag runs as NDJSON (one JSON line per
run).
*
- * Essentially this is to know what color to put in the squares in the
grid.
+ * Each line is a serialized ``GridTISummaries`` object emitted as soon as
that
+ * run's task instances have been processed, so the client can render
columns
+ * progressively without waiting for all runs to complete.
*
- * The tricky part here is that we aggregate the state for groups and
mapped tasks.
- *
- * We don't add all the TIs for mapped TIs -- we only add one entry for
the mapped task and
- * its state is an aggregate of its TI states.
- *
- * And for task groups, we add a "task" for that which is not really a
task but is just
- * an entry that represents the group (so that we can show a filled in box
when the group
- * is not expanded) and its state is an agg of those within it.
+ * The serialized Dag structure is loaded once and reused for all runs that
+ * share the same ``dag_version_id``, avoiding repeated deserialization.
* @param data The data for the request.
* @param data.dagId
- * @param data.runId
- * @returns GridTISummaries Successful Response
+ * @param data.runIds
+ * @returns string NDJSON stream — one ``GridTISummaries`` JSON object per
line, one per Dag run
* @throws ApiError
*/
- public static getGridTiSummaries(data: GetGridTiSummariesData):
CancelablePromise<GetGridTiSummariesResponse> {
+ public static getGridTiSummariesStream(data:
GetGridTiSummariesStreamData):
CancelablePromise<GetGridTiSummariesStreamResponse> {
return __request(OpenAPI, {
method: 'GET',
- url: '/ui/grid/ti_summaries/{dag_id}/{run_id}',
+ url: '/ui/grid/ti_summaries/{dag_id}',
path: {
- dag_id: data.dagId,
- run_id: data.runId
+ dag_id: data.dagId
+ },
+ query: {
+ run_ids: data.runIds
},
errors: {
400: 'Bad Request',
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index a59532b6c16..7423c08d42c 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -3653,12 +3653,12 @@ export type GetGridRunsData = {
export type GetGridRunsResponse = Array<GridRunsResponse>;
-export type GetGridTiSummariesData = {
+export type GetGridTiSummariesStreamData = {
dagId: string;
- runId: string;
+ runIds?: Array<(string)> | null;
};
-export type GetGridTiSummariesResponse = GridTISummaries;
+export type GetGridTiSummariesStreamResponse = string;
export type GetGanttDataData = {
dagId: string;
@@ -6936,14 +6936,14 @@ export type $OpenApiTs = {
};
};
};
- '/ui/grid/ti_summaries/{dag_id}/{run_id}': {
+ '/ui/grid/ti_summaries/{dag_id}': {
get: {
- req: GetGridTiSummariesData;
+ req: GetGridTiSummariesStreamData;
res: {
/**
- * Successful Response
+ * NDJSON stream — one ``GridTISummaries`` JSON object per line, one per Dag run
*/
- 200: GridTISummaries;
+ 200: string;
/**
* Bad Request
*/
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx
b/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx
index 6c546c58699..71d14ccfbe7 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx
@@ -48,7 +48,7 @@ import { GRID_BODY_OFFSET_PX } from
"src/layouts/Details/Grid/constants";
import { flattenNodes } from "src/layouts/Details/Grid/utils";
import { useGridRuns } from "src/queries/useGridRuns";
import { useGridStructure } from "src/queries/useGridStructure";
-import { useGridTiSummaries } from "src/queries/useGridTISummaries";
+import { useGridTiSummariesStream } from "src/queries/useGridTISummaries";
import { getComputedCSSVariableValue } from "src/theme";
import { isStatePending, useAutoRefresh } from "src/utils";
@@ -131,12 +131,13 @@ export const Gantt = ({ dagRunState, limit, runType,
triggeringUser }: Props) =>
const refetchInterval = useAutoRefresh({ dagId });
// Get grid summaries for groups and mapped tasks (which have min/max times)
- const { data: gridTiSummaries, isLoading: summariesLoading } =
useGridTiSummaries({
+ const { summariesByRunId } = useGridTiSummariesStream({
dagId,
- enabled: Boolean(selectedRun),
- runId,
- state: selectedRun?.state,
+ runIds: runId && selectedRun ? [runId] : [],
+ states: selectedRun ? [selectedRun.state] : [],
});
+ const gridTiSummaries = summariesByRunId.get(runId);
+ const summariesLoading = Boolean(runId && selectedRun &&
!summariesByRunId.has(runId));
// Single fetch for all Gantt data (individual task tries)
const { data: ganttData, isLoading: ganttLoading } =
useGanttServiceGetGanttData(
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx
b/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx
index 0b69efb7d43..6e3fe794021 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx
@@ -34,7 +34,7 @@ import { useOpenGroups } from "src/context/openGroups";
import useSelectedVersion from "src/hooks/useSelectedVersion";
import { flattenGraphNodes } from "src/layouts/Details/Grid/utils.ts";
import { useDependencyGraph } from "src/queries/useDependencyGraph";
-import { useGridTiSummaries } from "src/queries/useGridTISummaries.ts";
+import { useGridTiSummariesStream } from "src/queries/useGridTISummaries.ts";
import { getReactFlowThemeStyle } from "src/theme";
const nodeColor = (
@@ -134,7 +134,8 @@ export const Graph = () => {
versionNumber: selectedVersion,
});
- const { data: gridTISummaries } = useGridTiSummaries({ dagId, runId });
+ const { summariesByRunId } = useGridTiSummariesStream({ dagId, runIds: runId
? [runId] : [] });
+ const gridTISummaries = runId ? summariesByRunId.get(runId) : undefined;
// Add task instances to the node data but without having to recalculate how the graph is laid out
const nodes = data?.nodes.map((node) => {
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx
b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx
index 517f5d9f7d4..07537c2cb2b 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx
@@ -31,6 +31,7 @@ import { useOpenGroups } from "src/context/openGroups";
import { NavigationModes, useNavigation } from "src/hooks/navigation";
import { useGridRuns } from "src/queries/useGridRuns.ts";
import { useGridStructure } from "src/queries/useGridStructure.ts";
+import { useGridTiSummariesStream } from "src/queries/useGridTISummaries.ts";
import { isStatePending } from "src/utils";
import { Bar } from "./Bar";
@@ -95,6 +96,12 @@ export const Grid = ({
}
}, [runId, gridRuns, selectedIsVisible, setSelectedIsVisible]);
+ const { summariesByRunId } = useGridTiSummariesStream({
+ dagId,
+ runIds: gridRuns?.map((dr: GridRunsResponse) => dr.run_id) ?? [],
+ states: gridRuns?.map((dr: GridRunsResponse) => dr.state),
+ });
+
const { data: dagStructure } = useGridStructure({
dagRunState,
depth,
@@ -225,6 +232,7 @@ export const Grid = ({
onCellClick={handleCellClick}
run={dr}
showVersionIndicatorMode={showVersionIndicatorMode}
+ tiSummaries={summariesByRunId.get(dr.run_id)}
virtualItems={virtualItems}
/>
))}
diff --git
a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/TaskInstancesColumn.tsx
b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/TaskInstancesColumn.tsx
index 7727748fba7..05b32145514 100644
---
a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/TaskInstancesColumn.tsx
+++
b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/TaskInstancesColumn.tsx
@@ -20,11 +20,10 @@ import { Box } from "@chakra-ui/react";
import type { VirtualItem } from "@tanstack/react-virtual";
import { useParams } from "react-router-dom";
-import type { GridRunsResponse } from "openapi/requests";
+import type { GridRunsResponse, GridTISummaries } from "openapi/requests";
import type { LightGridTaskInstanceSummary } from "openapi/requests/types.gen";
import { VersionIndicatorOptions } from
"src/constants/showVersionIndicatorOptions";
import { useHover } from "src/context/hover";
-import { useGridTiSummaries } from "src/queries/useGridTISummaries.ts";
import { GridTI } from "./GridTI";
import { DagVersionIndicator } from "./VersionIndicator";
@@ -35,6 +34,7 @@ type Props = {
readonly onCellClick?: () => void;
readonly run: GridRunsResponse;
readonly showVersionIndicatorMode?: VersionIndicatorOptions;
+ readonly tiSummaries?: GridTISummaries;
readonly virtualItems?: Array<VirtualItem>;
};
@@ -45,23 +45,18 @@ export const TaskInstancesColumn = ({
onCellClick,
run,
showVersionIndicatorMode,
+ tiSummaries,
virtualItems,
}: Props) => {
const { dagId = "", runId } = useParams();
const isSelected = runId === run.run_id;
- const { data: gridTISummaries } = useGridTiSummaries({
- dagId,
- isSelected,
- runId: run.run_id,
- state: run.state,
- });
+
const { hoveredRunId, setHoveredRunId } = useHover();
const itemsToRender =
virtualItems ?? nodes.map((_, index) => ({ index, size: ROW_HEIGHT, start:
index * ROW_HEIGHT }));
- const taskInstances = gridTISummaries?.task_instances ?? [];
-
+ const taskInstances = tiSummaries?.task_instances ?? [];
const taskInstanceMap = new Map<string, LightGridTaskInstanceSummary>();
for (const ti of taskInstances) {
diff --git
a/airflow-core/src/airflow/ui/src/pages/GroupTaskInstance/GroupTaskInstance.tsx
b/airflow-core/src/airflow/ui/src/pages/GroupTaskInstance/GroupTaskInstance.tsx
index a4efef77265..abb741e9498 100644
---
a/airflow-core/src/airflow/ui/src/pages/GroupTaskInstance/GroupTaskInstance.tsx
+++
b/airflow-core/src/airflow/ui/src/pages/GroupTaskInstance/GroupTaskInstance.tsx
@@ -22,14 +22,15 @@ import { MdOutlineTask } from "react-icons/md";
import { useParams } from "react-router-dom";
import { DetailsLayout } from "src/layouts/Details/DetailsLayout";
-import { useGridTiSummaries } from "src/queries/useGridTISummaries.ts";
+import { useGridTiSummariesStream } from "src/queries/useGridTISummaries.ts";
import { Header } from "./Header";
export const GroupTaskInstance = () => {
const { dagId = "", groupId = "", runId = "" } = useParams();
const { t: translate } = useTranslation("dag");
- const { data: gridTISummaries } = useGridTiSummaries({ dagId, runId });
+ const { summariesByRunId } = useGridTiSummariesStream({ dagId, runIds: runId
? [runId] : [] });
+ const gridTISummaries = summariesByRunId.get(runId);
const taskInstance = gridTISummaries?.task_instances.find((ti) => ti.task_id
=== groupId);
const tabs = [{ icon: <MdOutlineTask />, label:
translate("tabs.taskInstances"), value: "" }];
diff --git
a/airflow-core/src/airflow/ui/src/pages/MappedTaskInstance/MappedTaskInstance.tsx
b/airflow-core/src/airflow/ui/src/pages/MappedTaskInstance/MappedTaskInstance.tsx
index 6c231c35bcd..6302254eafa 100644
---
a/airflow-core/src/airflow/ui/src/pages/MappedTaskInstance/MappedTaskInstance.tsx
+++
b/airflow-core/src/airflow/ui/src/pages/MappedTaskInstance/MappedTaskInstance.tsx
@@ -22,14 +22,15 @@ import { MdOutlineTask } from "react-icons/md";
import { useParams } from "react-router-dom";
import { DetailsLayout } from "src/layouts/Details/DetailsLayout";
-import { useGridTiSummaries } from "src/queries/useGridTISummaries.ts";
+import { useGridTiSummariesStream } from "src/queries/useGridTISummaries.ts";
import { Header } from "./Header";
export const MappedTaskInstance = () => {
const { dagId = "", runId = "", taskId = "" } = useParams();
const { t: translate } = useTranslation("dag");
- const { data: gridTISummaries } = useGridTiSummaries({ dagId, runId });
+ const { summariesByRunId } = useGridTiSummariesStream({ dagId, runIds: runId
? [runId] : [] });
+ const gridTISummaries = summariesByRunId.get(runId);
const taskInstance = gridTISummaries?.task_instances.find((ti) => ti.task_id
=== taskId);
let taskCount: number = 0;
diff --git
a/airflow-core/src/airflow/ui/src/pages/TaskInstance/TaskInstance.tsx
b/airflow-core/src/airflow/ui/src/pages/TaskInstance/TaskInstance.tsx
index dd956a8d4dd..9f22e299c83 100644
--- a/airflow-core/src/airflow/ui/src/pages/TaskInstance/TaskInstance.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/TaskInstance/TaskInstance.tsx
@@ -28,7 +28,7 @@ import { useTaskInstanceServiceGetMappedTaskInstance } from
"openapi/queries";
import { usePluginTabs } from "src/hooks/usePluginTabs";
import { useRequiredActionTabs } from "src/hooks/useRequiredActionTabs";
import { DetailsLayout } from "src/layouts/Details/DetailsLayout";
-import { useGridTiSummaries } from "src/queries/useGridTISummaries.ts";
+import { useGridTiSummariesStream } from "src/queries/useGridTISummaries.ts";
import { isStatePending, useAutoRefresh } from "src/utils";
import { Header } from "./Header";
@@ -76,7 +76,8 @@ export const TaskInstance = () => {
},
);
- const { data: gridTISummaries } = useGridTiSummaries({ dagId, runId });
+ const { summariesByRunId } = useGridTiSummariesStream({ dagId, runIds: runId
? [runId] : [] });
+ const gridTISummaries = summariesByRunId.get(runId);
const taskInstanceSummary = gridTISummaries?.task_instances.find((ti) =>
ti.task_id === taskId);
const taskCount = Object.entries(taskInstanceSummary?.child_states ?? {})
diff --git a/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
b/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
index b631eb48cf4..af234cc39fc 100644
--- a/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
@@ -27,7 +27,6 @@ import {
UseGanttServiceGetGanttDataKeyFn,
useTaskInstanceServiceGetTaskInstancesKey,
UseGridServiceGetGridRunsKeyFn,
- UseGridServiceGetGridTiSummariesKeyFn,
} from "openapi/queries";
import { toaster } from "src/components/ui";
@@ -62,7 +61,6 @@ export const useClearDagRun = ({
[useClearDagRunDryRunKey, dagId],
UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]),
UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
- UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }, [{
dagId, runId: dagRunId }]),
];
await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({
queryKey: key })));
diff --git a/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
b/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
index 3130041a152..e8d13ad000e 100644
--- a/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
@@ -26,8 +26,6 @@ import {
UseTaskInstanceServiceGetMappedTaskInstanceKeyFn,
useTaskInstanceServicePostClearTaskInstances,
UseGridServiceGetGridRunsKeyFn,
- UseGridServiceGetGridTiSummariesKeyFn,
- useGridServiceGetGridTiSummariesKey,
} from "openapi/queries";
import type { ApiError } from "openapi/requests";
import type { ClearTaskInstancesBody, TaskInstanceCollectionResponse } from
"openapi/requests/types.gen";
@@ -103,10 +101,6 @@ export const useClearTaskInstances = ({
),
];
- // Check if this clear operation affects multiple DAG runs
- const { include_future: includeFuture, include_past: includePast } =
variables.requestBody;
- const affectsMultipleRuns = includeFuture === true || includePast === true;
-
const queryKeys = [
...taskInstanceKeys,
UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
@@ -115,9 +109,6 @@ export const useClearTaskInstances = ({
[usePatchTaskInstanceDryRunKey, dagId, dagRunId],
UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]),
UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
- affectsMultipleRuns
- ? [useGridServiceGetGridTiSummariesKey, { dagId }]
- : UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }),
];
await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({
queryKey: key })));
diff --git a/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts
b/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts
index 545168280e1..0d7e8f266c3 100644
--- a/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts
@@ -25,7 +25,6 @@ import {
useTaskInstanceServiceGetTaskInstancesKey,
useDagRunServiceGetDagRunsKey,
UseDagRunServiceGetDagRunKeyFn,
- UseGridServiceGetGridTiSummariesKeyFn,
useTaskInstanceServiceGetHitlDetailsKey,
} from "openapi/queries";
import { toaster } from "src/components/ui";
@@ -59,7 +58,6 @@ export const useDeleteTaskInstance = ({
const onSuccess = async () => {
const queryKeys = [
UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
- UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }, [{
dagId, runId: dagRunId }]),
[useDagRunServiceGetDagRunsKey],
[useTaskInstanceServiceGetTaskInstancesKey],
[useTaskInstanceServiceGetTaskInstanceKey, { dagId, dagRunId, mapIndex,
taskId }],
diff --git a/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
b/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
index 6b75402ab6c..28de4bc473d 100644
--- a/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
@@ -16,45 +16,111 @@
* specific language governing permissions and limitations
* under the License.
*/
-import { useGridServiceGetGridTiSummaries } from "openapi/queries";
-import type { TaskInstanceState } from "openapi/requests";
+import { useEffect, useState } from "react";
+
+import type { GridTISummaries, TaskInstanceState } from "openapi/requests";
+import { OpenAPI } from "openapi/requests/core/OpenAPI";
import { isStatePending, useAutoRefresh } from "src/utils";
-export const useGridTiSummaries = ({
+/**
+ * Streams TI summaries for all grid runs over a single HTTP connection (NDJSON).
+ *
+ * The server emits one JSON line per Dag run as soon as that run's task
+ * instances have been computed, so the grid renders each column progressively
+ * rather than waiting for the entire payload. This eliminates the N+1 request
+ * pattern without loading all runs into one large query.
+ *
+ * Auto-refreshes while any run is still in a pending state.
+ */
+export const useGridTiSummariesStream = ({
dagId,
- enabled,
- isSelected,
- runId,
- state,
+ runIds,
+ states,
}: {
dagId: string;
- enabled?: boolean;
- isSelected?: boolean;
- runId: string;
- state?: TaskInstanceState | null | undefined;
+ runIds: Array<string>;
+ states?: Array<TaskInstanceState | null | undefined>;
}) => {
+ const [summariesByRunId, setSummariesByRunId] = useState<Map<string,
GridTISummaries>>(new Map());
+ const [refreshTick, setRefreshTick] = useState(0);
+
const baseRefetchInterval = useAutoRefresh({ dagId });
- const slowRefreshMultiplier = 5;
- const refetchInterval =
- typeof baseRefetchInterval === "number"
- ? baseRefetchInterval * (isSelected ? 1 : slowRefreshMultiplier)
- : baseRefetchInterval;
-
- const { data: gridTiSummaries, ...rest } = useGridServiceGetGridTiSummaries(
- {
- dagId,
- runId,
- },
- undefined,
- {
- enabled: Boolean(runId) && Boolean(dagId) && enabled,
- placeholderData: (prev) => prev,
- refetchInterval: (query) =>
- ((state !== undefined && isStatePending(state)) ||
- query.state.data?.task_instances.some((ti) =>
isStatePending(ti.state))) &&
- refetchInterval,
- },
- );
-
- return { data: gridTiSummaries, ...rest };
+ const hasActiveRuns = states?.some((state) => state !== undefined &&
isStatePending(state)) ?? false;
+
+ // Stable key so the effect only re-fires when the run list actually changes.
+ const runIdsKey = runIds.join(",");
+
+ // Stream (or re-stream) whenever the run list or refresh tick changes.
+ useEffect(() => {
+ if (!dagId || runIds.length === 0) {
+ return undefined;
+ }
+
+ const abortController = new AbortController();
+ let reader: ReadableStreamDefaultReader<Uint8Array> | undefined;
+
+ const fetchStream = async () => {
+ // Keep stale data visible while the new stream loads — columns update in
+ // place as fresh lines arrive rather than flashing blank.
+ try {
+ const params = new URLSearchParams(runIds.map((id) => ["run_ids",
id]));
+ const response = await
fetch(`${OpenAPI.BASE}/ui/grid/ti_summaries/${dagId}?${params}`, {
+ signal: abortController.signal,
+ });
+
+ if (!response.ok || !response.body) {
+ return;
+ }
+
+ reader = response.body.getReader();
+ const decoder = new TextDecoder();
+ let buffer = "";
+
+ // eslint-disable-next-line no-await-in-loop -- sequential reads required; each chunk depends on the previous buffer state
+ for (let result = await reader.read(); !result.done; result = await
reader.read()) {
+ const { value } = result;
+
+ buffer += decoder.decode(value, { stream: true });
+
+ const lines = buffer.split("\n");
+
+ buffer = lines.pop() ?? "";
+
+ for (const line of lines.filter((ln) => ln.trim())) {
+ const summary = JSON.parse(line) as GridTISummaries;
+
+ setSummariesByRunId((prev) => new Map(prev).set(summary.run_id,
summary));
+ }
+ }
+ } catch (error) {
+ if ((error as Error).name !== "AbortError") {
+ // eslint-disable-next-line no-console
+ console.error("TI summaries stream error:", error);
+ }
+ }
+ };
+
+ void fetchStream();
+
+ return () => {
+ abortController.abort();
+ void reader?.cancel();
+ };
+ // eslint-disable-next-line react-hooks/exhaustive-deps -- runIdsKey (stable join) intentionally replaces runIds array to avoid spurious re-streams
+ }, [dagId, runIdsKey, refreshTick]);
+
+ // Trigger a re-stream periodically while active runs are in flight.
+ useEffect(() => {
+ if (!hasActiveRuns || typeof baseRefetchInterval !== "number") {
+ return undefined;
+ }
+
+ const timer = setInterval(() => {
+ setRefreshTick((tick) => tick + 1);
+ }, baseRefetchInterval);
+
+ return () => clearInterval(timer);
+ }, [hasActiveRuns, baseRefetchInterval]);
+
+ return { summariesByRunId };
};
diff --git a/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts
b/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts
index 25ea7fad646..a49fcba8592 100644
--- a/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts
+++ b/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts
@@ -25,7 +25,6 @@ import {
useDagRunServicePatchDagRun,
useTaskInstanceServiceGetTaskInstancesKey,
UseGridServiceGetGridRunsKeyFn,
- UseGridServiceGetGridTiSummariesKeyFn,
} from "openapi/queries";
import { toaster } from "src/components/ui";
@@ -60,7 +59,6 @@ export const usePatchDagRun = ({
[useTaskInstanceServiceGetTaskInstancesKey, { dagId, dagRunId }],
[useClearDagRunDryRunKey, dagId],
UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]),
- UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }, [{
dagId, runId: dagRunId }]),
];
await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({
queryKey: key })));
diff --git a/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts
b/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts
index c1ae88cacb1..b94fadf8e13 100644
--- a/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts
+++ b/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts
@@ -25,8 +25,6 @@ import {
useTaskInstanceServiceGetTaskInstancesKey,
useTaskInstanceServicePatchTaskInstance,
UseGridServiceGetGridRunsKeyFn,
- UseGridServiceGetGridTiSummariesKeyFn,
- useGridServiceGetGridTiSummariesKey,
} from "openapi/queries";
import { toaster } from "src/components/ui";
@@ -59,19 +57,7 @@ export const usePatchTaskInstance = ({
});
};
- const onSuccessFn = async (
- _: unknown,
- variables: {
- dagId: string;
- dagRunId: string;
- requestBody: { include_future?: boolean; include_past?: boolean };
- taskId: string;
- },
- ) => {
- // Check if this patch operation affects multiple DAG runs
- const { include_future: includeFuture, include_past: includePast } =
variables.requestBody;
- const affectsMultipleRuns = includeFuture === true || includePast === true;
-
+ const onSuccessFn = async () => {
const queryKeys = [
UseTaskInstanceServiceGetTaskInstanceKeyFn({ dagId, dagRunId, taskId }),
UseTaskInstanceServiceGetMappedTaskInstanceKeyFn({ dagId, dagRunId,
mapIndex, taskId }),
@@ -79,9 +65,6 @@ export const usePatchTaskInstance = ({
[usePatchTaskInstanceDryRunKey, dagId, dagRunId, { mapIndex, taskId }],
[useClearTaskInstancesDryRunKey, dagId],
UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]),
- affectsMultipleRuns
- ? [useGridServiceGetGridTiSummariesKey, { dagId }]
- : UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }),
];
await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({
queryKey: key })));
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
index c9642f7c492..dbcf87149e5 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
@@ -17,6 +17,7 @@
from __future__ import annotations
+import json
from datetime import timedelta
from operator import attrgetter
@@ -566,9 +567,9 @@ class TestGetGridDataEndpoint:
# Also verify that TI summaries include a leaf entry for the removed task
with assert_queries_count(4):
- ti_resp = test_client.get(f"/grid/ti_summaries/{DAG_ID_3}/run_3")
+ ti_resp =
test_client.get(f"/grid/ti_summaries/{DAG_ID_3}?run_ids=run_3")
assert ti_resp.status_code == 200
- ti_payload = ti_resp.json()
+ [ti_payload] = self._parse_ndjson(ti_resp)
assert ti_payload["dag_id"] == DAG_ID_3
assert ti_payload["run_id"] == "run_3"
# Find the removed task summary; it should exist even if not in current serialized DAG structure
@@ -697,9 +698,9 @@ class TestGetGridDataEndpoint:
session.commit()
with assert_queries_count(4):
- response =
test_client.get(f"/grid/ti_summaries/{DAG_ID_4}/{run_id}")
+ response =
test_client.get(f"/grid/ti_summaries/{DAG_ID_4}?run_ids={run_id}")
assert response.status_code == 200
- actual = response.json()
+ [actual] = self._parse_ndjson(response)
expected = {
"dag_id": "test_dag_4",
"run_id": "run_4-1",
@@ -797,9 +798,9 @@ class TestGetGridDataEndpoint:
session.commit()
with assert_queries_count(4):
- response = test_client.get(f"/grid/ti_summaries/{DAG_ID}/{run_id}")
+ response =
test_client.get(f"/grid/ti_summaries/{DAG_ID}?run_ids={run_id}")
assert response.status_code == 200
- data = response.json()
+ [data] = self._parse_ndjson(response)
actual = data["task_instances"]
def sort_dict(in_dict):
@@ -1114,3 +1115,58 @@ class TestGetGridDataEndpoint:
nodes = response.json()
task_ids = sorted([node["id"] for node in nodes])
assert task_ids == expected_task_ids, description
+
+ @staticmethod
+ def _parse_ndjson(response) -> list[dict]:
+ """Parse NDJSON streaming response into a list of dicts."""
+ return [json.loads(line) for line in response.text.splitlines() if
line.strip()]
+
+ def test_grid_ti_summaries_stream_returns_all_runs(self, session,
test_client):
+ """Streaming endpoint returns one NDJSON line per requested run_id."""
+ session.commit()
+
+ run_ids = ["run_1", "run_2"]
+ response = test_client.get(f"/grid/ti_summaries/{DAG_ID}",
params={"run_ids": run_ids})
+ assert response.status_code == 200
+ assert "ndjson" in response.headers.get("content-type", "")
+
+ summaries = self._parse_ndjson(response)
+ assert len(summaries) == len(run_ids)
+ returned_run_ids = {s["run_id"] for s in summaries}
+ assert returned_run_ids == set(run_ids)
+
+ for summary in summaries:
+ assert summary["dag_id"] == DAG_ID
+ assert len(summary["task_instances"]) > 0
+
+ def test_grid_ti_summaries_stream_skips_missing_runs(self, session,
test_client):
+ """Streaming endpoint silently skips run_ids that have no task
instances."""
+ session.commit()
+
+ response = test_client.get(
+ f"/grid/ti_summaries/{DAG_ID}", params={"run_ids": ["run_1",
"nonexistent_run"]}
+ )
+ assert response.status_code == 200
+ summaries = self._parse_ndjson(response)
+ assert len(summaries) == 1
+ assert summaries[0]["run_id"] == "run_1"
+
+ def test_grid_ti_summaries_stream_empty_run_ids(self, session,
test_client):
+ """Streaming endpoint with no run_ids returns an empty body."""
+ session.commit()
+
+ response = test_client.get(f"/grid/ti_summaries/{DAG_ID}")
+ assert response.status_code == 200
+ assert self._parse_ndjson(response) == []
+
+ def test_grid_ti_summaries_stream_deduplicates_serdag_loads(self, session,
test_client):
+ """Serialized Dag is loaded once even when multiple runs share the
same version."""
+ session.commit()
+
+ run_ids = ["run_1", "run_2"]
+ # 2 auth queries + 1 serdag query shared across both runs
+ # + 1 TI query per run = 5 total (not 1 serdag per run which would be 6+).
+ with assert_queries_count(5):
+ response = test_client.get(f"/grid/ti_summaries/{DAG_ID}",
params={"run_ids": run_ids})
+ assert response.status_code == 200
+ assert len(self._parse_ndjson(response)) == len(run_ids)