This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-0-test by this push:
new 9cf9c7c207a [v3-0-test] Grid view optimization (#51805) (#52718)
9cf9c7c207a is described below
commit 9cf9c7c207a211e1291366a985d9987ca42b7630
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Jul 2 11:02:44 2025 -0700
[v3-0-test] Grid view optimization (#51805) (#52718)
The headline here is, with 6k tasks in a dag, loading time for 10 runs
drops from 1.5m to < 10s in a quick local test.
I split it into smaller more purpose-specific requests that each do less.
So we have one request for just the structure, and another one for TI states
(per dag run). I also find ways to stop refreshing when there's no active dag
run (or the particular dag run is not active and its tis don't need refreshing.
I also changed the "latest dag run" query (which checks for a new run triggered
externally to be simpler dedicated endpoint. It runs every couple seconds even
when there is nothing [...]
(cherry picked from commit eaa8ca07a42a813444a67cd262a0a41f24a5d4f7)
---------
Co-authored-by: Jed Cunningham
<[email protected]>
---
.../api_fastapi/core_api/datamodels/ui/common.py | 43 ++
.../api_fastapi/core_api/datamodels/ui/grid.py | 17 +-
.../api_fastapi/core_api/openapi/_private_ui.yaml | 438 +++++++++++++++++++-
.../airflow/api_fastapi/core_api/routes/ui/grid.py | 345 +++++++++++++++-
.../api_fastapi/core_api/services/ui/grid.py | 60 +++
.../src/airflow/ui/openapi-gen/queries/common.ts | 88 ++++
.../ui/openapi-gen/queries/ensureQueryData.ts | 140 +++++++
.../src/airflow/ui/openapi-gen/queries/prefetch.ts | 140 +++++++
.../src/airflow/ui/openapi-gen/queries/queries.ts | 158 ++++++++
.../src/airflow/ui/openapi-gen/queries/suspense.ts | 158 ++++++++
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 225 ++++++++++-
.../ui/openapi-gen/requests/services.gen.ts | 141 +++++++
.../airflow/ui/openapi-gen/requests/types.gen.ts | 181 ++++++++-
.../airflow/ui/src/components/DurationChart.tsx | 17 +-
.../ui/src/layouts/Details/DagRunSelect.tsx | 23 +-
.../airflow/ui/src/layouts/Details/Grid/Bar.tsx | 19 +-
.../airflow/ui/src/layouts/Details/Grid/Grid.tsx | 69 ++--
.../layouts/Details/Grid/TaskInstancesColumn.tsx | 5 +-
.../airflow/ui/src/layouts/Details/Grid/utils.ts | 46 ++-
.../ui/src/layouts/Details/ToggleGroups.tsx | 15 +-
.../airflow/ui/src/pages/Dag/Overview/Overview.tsx | 26 +-
airflow-core/src/airflow/ui/src/pages/DagRuns.tsx | 4 +-
airflow-core/src/airflow/ui/src/queries/useGrid.ts | 72 ----
.../src/airflow/ui/src/queries/useGridRuns.ts | 44 ++
.../src/airflow/ui/src/queries/useGridStructure.ts | 49 +++
.../airflow/ui/src/queries/useGridTISummaries.ts | 46 +++
.../ui/src/queries/useRefreshOnNewDagRuns.ts | 12 +-
airflow-core/src/airflow/utils/task_group.py | 38 ++
.../api_fastapi/core_api/routes/ui/test_grid.py | 446 +++++++--------------
29 files changed, 2568 insertions(+), 497 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py
index cc4d7913b22..0f315326194 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py
@@ -17,9 +17,14 @@
from __future__ import annotations
+from datetime import datetime
from typing import Generic, Literal, TypeVar
+from pydantic import computed_field
+
from airflow.api_fastapi.core_api.base import BaseModel
+from airflow.utils.state import TaskInstanceState
+from airflow.utils.types import DagRunType
class BaseEdgeResponse(BaseModel):
@@ -52,8 +57,46 @@ E = TypeVar("E", bound=BaseEdgeResponse)
N = TypeVar("N", bound=BaseNodeResponse)
+class GridNodeResponse(BaseModel):
+ """Base Node serializer for responses."""
+
+ id: str
+ label: str
+ children: list[GridNodeResponse] | None = None
+ is_mapped: bool | None
+ setup_teardown_type: Literal["setup", "teardown"] | None = None
+
+
+class GridRunsResponse(BaseModel):
+ """Base Node serializer for responses."""
+
+ dag_id: str
+ run_id: str
+ queued_at: datetime | None
+ start_date: datetime | None
+ end_date: datetime | None
+ run_after: datetime
+ state: TaskInstanceState | None
+ run_type: DagRunType
+
+ @computed_field
+ def duration(self) -> int | None:
+ if self.start_date and self.end_date:
+ return (self.end_date - self.start_date).seconds
+ return None
+
+
class BaseGraphResponse(BaseModel, Generic[E, N]):
"""Base Graph serializer for responses."""
edges: list[E]
nodes: list[N]
+
+
+class LatestRunResponse(BaseModel):
+ """Base Node serializer for responses."""
+
+ id: int
+ dag_id: str
+ run_id: str
+ run_after: datetime
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/grid.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/grid.py
index 822eb6f3e1a..48ea0ece794 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/grid.py
@@ -21,7 +21,6 @@ from datetime import datetime
from pydantic import BaseModel, Field
-from airflow.api_fastapi.core_api.datamodels.ui.structure import
StructureDataResponse
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType
@@ -40,6 +39,13 @@ class GridTaskInstanceSummary(BaseModel):
note: str | None
+class LightGridTaskInstanceSummary(BaseModel):
+ """Task Instance Summary model for the Grid UI."""
+
+ task_id: str
+ state: TaskInstanceState | None
+
+
class GridDAGRunwithTIs(BaseModel):
"""DAG Run model for the Grid UI."""
@@ -57,8 +63,15 @@ class GridDAGRunwithTIs(BaseModel):
task_instances: list[GridTaskInstanceSummary]
+class GridTISummaries(BaseModel):
+ """DAG Run model for the Grid UI."""
+
+ run_id: str
+ dag_id: str
+ task_instances: list[LightGridTaskInstanceSummary]
+
+
class GridResponse(BaseModel):
"""Response model for the Grid UI."""
dag_runs: list[GridDAGRunwithTIs]
- structure: StructureDataResponse
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index e18ef75ed02..5814f7e4de8 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -589,6 +589,293 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ /ui/grid/structure/{dag_id}:
+ get:
+ tags:
+ - Grid
+ summary: Get Dag Structure
+ description: Return dag structure for grid view.
+ operationId: get_dag_structure
+ security:
+ - OAuth2PasswordBearer: []
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: offset
+ in: query
+ required: false
+ schema:
+ type: integer
+ minimum: 0
+ default: 0
+ title: Offset
+ - name: limit
+ in: query
+ required: false
+ schema:
+ type: integer
+ minimum: 0
+ default: 50
+ title: Limit
+ - name: order_by
+ in: query
+ required: false
+ schema:
+ type: string
+ default: id
+ title: Order By
+ - name: run_after_gte
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Run After Gte
+ - name: run_after_lte
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Run After Lte
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ type: array
+ items:
+ $ref: '#/components/schemas/GridNodeResponse'
+ title: Response Get Dag Structure
+ '400':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Bad Request
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
+ /ui/grid/runs/{dag_id}:
+ get:
+ tags:
+ - Grid
+ summary: Get Grid Runs
+ description: Get info about a run for the grid.
+ operationId: get_grid_runs
+ security:
+ - OAuth2PasswordBearer: []
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: offset
+ in: query
+ required: false
+ schema:
+ type: integer
+ minimum: 0
+ default: 0
+ title: Offset
+ - name: limit
+ in: query
+ required: false
+ schema:
+ type: integer
+ minimum: 0
+ default: 50
+ title: Limit
+ - name: order_by
+ in: query
+ required: false
+ schema:
+ type: string
+ default: id
+ title: Order By
+ - name: run_after_gte
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Run After Gte
+ - name: run_after_lte
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Run After Lte
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ type: array
+ items:
+ $ref: '#/components/schemas/GridRunsResponse'
+ title: Response Get Grid Runs
+ '400':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Bad Request
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
+ /ui/grid/ti_summaries/{dag_id}/{run_id}:
+ get:
+ tags:
+ - Grid
+ summary: Get Grid Ti Summaries
+ description: 'Get states for TIs / "groups" of TIs.
+
+
+ Essentially this is to know what color to put in the squares in the
grid.
+
+
+ The tricky part here is that we aggregate the state for groups and
mapped
+ tasks.
+
+
+ We don''t add all the TIs for mapped TIs -- we only add one entry for
the
+ mapped task and
+
+ its state is an aggregate of its TI states.
+
+
+ And for task groups, we add a "task" for that which is not really a
task but
+ is just
+
+ an entry that represents the group (so that we can show a filled in
box when
+ the group
+
+ is not expanded) and its state is an agg of those within it.'
+ operationId: get_grid_ti_summaries
+ security:
+ - OAuth2PasswordBearer: []
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: run_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Run Id
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/GridTISummaries'
+ '400':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Bad Request
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
+ /ui/grid/latest_run/{dag_id}:
+ get:
+ tags:
+ - Grid
+ summary: Get Latest Run
+ description: 'Get information about the latest dag run by run_after.
+
+
+ This is used by the UI to figure out if it needs to rerun queries and
resume
+ auto refresh.'
+ operationId: get_latest_run
+ security:
+ - OAuth2PasswordBearer: []
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ anyOf:
+ - $ref: '#/components/schemas/LatestRunResponse'
+ - type: 'null'
+ title: Response Get Latest Run
+ '400':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Bad Request
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
components:
schemas:
BackfillCollectionResponse:
@@ -1410,6 +1697,41 @@ components:
- task_instances
title: GridDAGRunwithTIs
description: DAG Run model for the Grid UI.
+ GridNodeResponse:
+ properties:
+ id:
+ type: string
+ title: Id
+ label:
+ type: string
+ title: Label
+ children:
+ anyOf:
+ - items:
+ $ref: '#/components/schemas/GridNodeResponse'
+ type: array
+ - type: 'null'
+ title: Children
+ is_mapped:
+ anyOf:
+ - type: boolean
+ - type: 'null'
+ title: Is Mapped
+ setup_teardown_type:
+ anyOf:
+ - type: string
+ enum:
+ - setup
+ - teardown
+ - type: 'null'
+ title: Setup Teardown Type
+ type: object
+ required:
+ - id
+ - label
+ - is_mapped
+ title: GridNodeResponse
+ description: Base Node serializer for responses.
GridResponse:
properties:
dag_runs:
@@ -1417,14 +1739,86 @@ components:
$ref: '#/components/schemas/GridDAGRunwithTIs'
type: array
title: Dag Runs
- structure:
- $ref: '#/components/schemas/StructureDataResponse'
type: object
required:
- dag_runs
- - structure
title: GridResponse
description: Response model for the Grid UI.
+ GridRunsResponse:
+ properties:
+ dag_id:
+ type: string
+ title: Dag Id
+ run_id:
+ type: string
+ title: Run Id
+ queued_at:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Queued At
+ start_date:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Start Date
+ end_date:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: End Date
+ run_after:
+ type: string
+ format: date-time
+ title: Run After
+ state:
+ anyOf:
+ - $ref: '#/components/schemas/TaskInstanceState'
+ - type: 'null'
+ run_type:
+ $ref: '#/components/schemas/DagRunType'
+ duration:
+ anyOf:
+ - type: integer
+ - type: 'null'
+ title: Duration
+ readOnly: true
+ type: object
+ required:
+ - dag_id
+ - run_id
+ - queued_at
+ - start_date
+ - end_date
+ - run_after
+ - state
+ - run_type
+ - duration
+ title: GridRunsResponse
+ description: Base Node serializer for responses.
+ GridTISummaries:
+ properties:
+ run_id:
+ type: string
+ title: Run Id
+ dag_id:
+ type: string
+ title: Dag Id
+ task_instances:
+ items:
+ $ref: '#/components/schemas/LightGridTaskInstanceSummary'
+ type: array
+ title: Task Instances
+ type: object
+ required:
+ - run_id
+ - dag_id
+ - task_instances
+ title: GridTISummaries
+ description: DAG Run model for the Grid UI.
GridTaskInstanceSummary:
properties:
task_id:
@@ -1520,6 +1914,44 @@ components:
- task_instance_states
title: HistoricalMetricDataResponse
description: Historical Metric Data serializer for responses.
+ LatestRunResponse:
+ properties:
+ id:
+ type: integer
+ title: Id
+ dag_id:
+ type: string
+ title: Dag Id
+ run_id:
+ type: string
+ title: Run Id
+ run_after:
+ type: string
+ format: date-time
+ title: Run After
+ type: object
+ required:
+ - id
+ - dag_id
+ - run_id
+ - run_after
+ title: LatestRunResponse
+ description: Base Node serializer for responses.
+ LightGridTaskInstanceSummary:
+ properties:
+ task_id:
+ type: string
+ title: Task Id
+ state:
+ anyOf:
+ - $ref: '#/components/schemas/TaskInstanceState'
+ - type: 'null'
+ type: object
+ required:
+ - task_id
+ - state
+ title: LightGridTaskInstanceSummary
+ description: Task Instance Summary model for the Grid UI.
MenuItem:
type: string
enum:
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
index 09409335c2c..ce1a582a511 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -42,23 +42,35 @@ from airflow.api_fastapi.common.parameters import (
datetime_range_filter_factory,
)
from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.common import (
+ GridNodeResponse,
+ GridRunsResponse,
+ LatestRunResponse,
+)
from airflow.api_fastapi.core_api.datamodels.ui.grid import (
GridDAGRunwithTIs,
GridResponse,
+ GridTISummaries,
)
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import requires_access_dag
from airflow.api_fastapi.core_api.services.ui.grid import (
+ _find_aggregates,
+ _merge_node_dicts,
fill_task_instance_summaries,
get_child_task_map,
- get_combined_structure,
- get_structure_from_dag,
get_task_group_map,
)
-from airflow.models import DagRun, TaskInstance
from airflow.models.dag_version import DagVersion
+from airflow.models.dagrun import DagRun
+from airflow.models.serialized_dag import SerializedDagModel
+from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.utils.state import TaskInstanceState
+from airflow.utils.task_group import (
+ get_task_group_children_getter,
+ task_group_to_dict_grid,
+)
log = structlog.get_logger(logger_name=__name__)
grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
@@ -71,6 +83,7 @@ grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
Depends(requires_access_dag(method="GET",
access_entity=DagAccessEntity.TASK_INSTANCE)),
Depends(requires_access_dag(method="GET",
access_entity=DagAccessEntity.RUN)),
],
+ response_model_exclude_none=True,
)
def grid_data(
dag_id: str,
@@ -124,11 +137,9 @@ def grid_data(
)
dag_runs = list(session.scalars(dag_runs_select_filter).unique())
-
# Check if there are any DAG Runs with given criteria to eliminate
unnecessary queries/errors
if not dag_runs:
- structure = get_structure_from_dag(dag=dag)
- return GridResponse(dag_runs=[], structure=structure)
+ return GridResponse(dag_runs=[])
# Retrieve, sort and encode the Task Instances
tis_of_dag_runs, _ = paginated_select(
@@ -257,8 +268,324 @@ def grid_data(
)
for dag_run in dag_runs
]
+ return GridResponse(dag_runs=grid_dag_runs)
+
+
+def _get_latest_serdag(dag_id, session):
+ serdag = session.scalar(
+ select(SerializedDagModel)
+ .where(
+ SerializedDagModel.dag_id == dag_id,
+ )
+ .order_by(SerializedDagModel.id.desc())
+ .limit(1)
+ )
+ if not serdag:
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND,
+ f"Dag with id {dag_id} was not found",
+ )
+ return serdag
+
+
+def _get_serdag(dag_id, dag_version_id, session) -> SerializedDagModel | None:
+ # this is a simplification - we account for structure based on the first
task
+ version = session.scalar(select(DagVersion).where(DagVersion.id ==
dag_version_id))
+ if not version:
+ version = session.scalar(
+ select(DagVersion)
+ .where(
+ DagVersion.dag_id == dag_id,
+ )
+ .order_by(DagVersion.id) # ascending cus this is mostly for
pre-3.0 upgrade
+ .limit(1)
+ )
+ if not (serdag := version.serialized_dag):
+ log.error(
+ "No serialized dag found",
+ dag_id=dag_id,
+ version_id=version.id,
+ version_number=version.version_number,
+ )
+ return serdag
+
+
+@grid_router.get(
+ "/structure/{dag_id}",
+ responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST,
status.HTTP_404_NOT_FOUND]),
+ dependencies=[
+ Depends(requires_access_dag(method="GET",
access_entity=DagAccessEntity.TASK_INSTANCE)),
+ Depends(requires_access_dag(method="GET",
access_entity=DagAccessEntity.RUN)),
+ ],
+ response_model_exclude_none=True,
+)
+def get_dag_structure(
+ dag_id: str,
+ session: SessionDep,
+ offset: QueryOffset,
+ limit: QueryLimit,
+ order_by: Annotated[
+ SortParam,
+ Depends(SortParam(["run_after", "logical_date", "start_date",
"end_date"], DagRun).dynamic_depends()),
+ ],
+ run_after: Annotated[RangeFilter,
Depends(datetime_range_filter_factory("run_after", DagRun))],
+) -> list[GridNodeResponse]:
+ """Return dag structure for grid view."""
+ latest_serdag = _get_latest_serdag(dag_id, session)
+ latest_dag = latest_serdag.dag
- flat_tis = itertools.chain.from_iterable(tis_by_run_id.values())
- structure = get_combined_structure(task_instances=flat_tis,
session=session)
+ # Retrieve, sort the previous DAG Runs
+ base_query = select(DagRun.id).where(DagRun.dag_id == dag_id)
+ # This comparison is to fall back to DAG timetable when no order_by is
provided
+ if order_by.value == order_by.get_primary_key_string():
+ ordering = list(latest_dag.timetable.run_ordering)
+ order_by = SortParam(
+ allowed_attrs=ordering,
+ model=DagRun,
+ ).set_value(ordering[0])
+ dag_runs_select_filter, _ = paginated_select(
+ statement=base_query,
+ order_by=order_by,
+ offset=offset,
+ filters=[run_after],
+ limit=limit,
+ )
+ run_ids = list(session.scalars(dag_runs_select_filter))
+
+ task_group_sort = get_task_group_children_getter()
+ if not run_ids:
+ nodes = [task_group_to_dict_grid(x) for x in
task_group_sort(latest_dag.task_group)]
+ return nodes
- return GridResponse(dag_runs=grid_dag_runs, structure=structure)
+ serdags = session.scalars(
+ select(SerializedDagModel).where(
+ SerializedDagModel.dag_version_id.in_(
+ select(TaskInstance.dag_version_id)
+ .join(TaskInstance.dag_run)
+ .where(
+ DagRun.id.in_(run_ids),
+ SerializedDagModel.id != latest_serdag.id,
+ )
+ )
+ )
+ )
+ merged_nodes: list[GridNodeResponse] = []
+ dags = [latest_dag]
+ for serdag in serdags:
+ if serdag:
+ dags.append(serdag.dag)
+ for dag in dags:
+ nodes = [task_group_to_dict_grid(x) for x in
task_group_sort(dag.task_group)]
+ _merge_node_dicts(merged_nodes, nodes)
+
+ return merged_nodes
+
+
+@grid_router.get(
+ "/runs/{dag_id}",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_400_BAD_REQUEST,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+ dependencies=[
+ Depends(
+ requires_access_dag(
+ method="GET",
+ access_entity=DagAccessEntity.TASK_INSTANCE,
+ )
+ ),
+ Depends(
+ requires_access_dag(
+ method="GET",
+ access_entity=DagAccessEntity.RUN,
+ )
+ ),
+ ],
+ response_model_exclude_none=True,
+)
+def get_grid_runs(
+ dag_id: str,
+ session: SessionDep,
+ offset: QueryOffset,
+ limit: QueryLimit,
+ order_by: Annotated[
+ SortParam,
+ Depends(
+ SortParam(
+ [
+ "run_after",
+ "logical_date",
+ "start_date",
+ "end_date",
+ ],
+ DagRun,
+ ).dynamic_depends()
+ ),
+ ],
+ run_after: Annotated[RangeFilter,
Depends(datetime_range_filter_factory("run_after", DagRun))],
+) -> list[GridRunsResponse]:
+ """Get info about a run for the grid."""
+ # Retrieve, sort the previous DAG Runs
+ base_query = select(
+ DagRun.dag_id,
+ DagRun.run_id,
+ DagRun.queued_at,
+ DagRun.start_date,
+ DagRun.end_date,
+ DagRun.run_after,
+ DagRun.state,
+ DagRun.run_type,
+ ).where(DagRun.dag_id == dag_id)
+
+ # This comparison is to fall back to DAG timetable when no order_by is
provided
+ if order_by.value == order_by.get_primary_key_string():
+ latest_serdag = _get_latest_serdag(dag_id, session)
+ latest_dag = latest_serdag.dag
+ ordering = list(latest_dag.timetable.run_ordering)
+ order_by = SortParam(
+ allowed_attrs=ordering,
+ model=DagRun,
+ ).set_value(ordering[0])
+ dag_runs_select_filter, _ = paginated_select(
+ statement=base_query,
+ order_by=order_by,
+ offset=offset,
+ filters=[run_after],
+ limit=limit,
+ )
+ return session.execute(dag_runs_select_filter)
+
+
+@grid_router.get(
+ "/ti_summaries/{dag_id}/{run_id}",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_400_BAD_REQUEST,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+ dependencies=[
+ Depends(
+ requires_access_dag(
+ method="GET",
+ access_entity=DagAccessEntity.TASK_INSTANCE,
+ )
+ ),
+ Depends(
+ requires_access_dag(
+ method="GET",
+ access_entity=DagAccessEntity.RUN,
+ )
+ ),
+ ],
+ response_model_exclude_none=True,
+)
+def get_grid_ti_summaries(
+ dag_id: str,
+ run_id: str,
+ session: SessionDep,
+) -> GridTISummaries:
+ """
+ Get states for TIs / "groups" of TIs.
+
+ Essentially this is to know what color to put in the squares in the grid.
+
+ The tricky part here is that we aggregate the state for groups and mapped
tasks.
+
+ We don't add all the TIs for mapped TIs -- we only add one entry for the
mapped task and
+ its state is an aggregate of its TI states.
+
+ And for task groups, we add a "task" for that which is not really a task
but is just
+ an entry that represents the group (so that we can show a filled in box
when the group
+ is not expanded) and its state is an agg of those within it.
+ """
+ tis_of_dag_runs, _ = paginated_select(
+ statement=(
+ select(
+ TaskInstance.task_id,
+ TaskInstance.state,
+ TaskInstance.dag_version_id,
+ )
+ .where(TaskInstance.dag_id == dag_id)
+ .where(
+ TaskInstance.run_id == run_id,
+ )
+ ),
+ filters=[],
+ order_by=SortParam(allowed_attrs=["task_id", "run_id"],
model=TaskInstance).set_value("task_id"),
+ limit=None,
+ return_total_entries=False,
+ )
+ task_instances = list(session.execute(tis_of_dag_runs))
+ task_id_states = collections.defaultdict(list)
+ for ti in task_instances:
+ task_id_states[ti.task_id].append(ti.state)
+
+ serdag = _get_serdag(
+ dag_id=dag_id,
+ dag_version_id=task_instances[0].dag_version_id,
+ session=session,
+ )
+ if not serdag:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id}
was not found")
+ tis = list(
+ _find_aggregates(
+ node=serdag.dag.task_group,
+ parent_node=None,
+ ti_states=task_id_states,
+ )
+ )
+
+ return { # type: ignore[return-value]
+ "run_id": run_id,
+ "dag_id": dag_id,
+ "task_instances": list(tis),
+ }
+
+
+@grid_router.get(
+ "/latest_run/{dag_id}",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_400_BAD_REQUEST,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+ dependencies=[
+ Depends(
+ requires_access_dag(
+ method="GET",
+ access_entity=DagAccessEntity.TASK_INSTANCE,
+ )
+ ),
+ Depends(
+ requires_access_dag(
+ method="GET",
+ access_entity=DagAccessEntity.RUN,
+ )
+ ),
+ ],
+ response_model_exclude_none=True,
+)
+def get_latest_run(
+ dag_id: str,
+ session: SessionDep,
+) -> LatestRunResponse | None:
+ """
+ Get information about the latest dag run by run_after.
+
+ This is used by the UI to figure out if it needs to rerun queries and
resume auto refresh.
+ """
+ return session.execute(
+ select(
+ DagRun.id,
+ DagRun.dag_id,
+ DagRun.run_id,
+ DagRun.run_after,
+ )
+ .where(DagRun.dag_id == dag_id)
+ .order_by(DagRun.run_after.desc())
+ .limit(1)
+ ).one_or_none()
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
index 346676e14cd..a69cafb7bbb 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
@@ -18,6 +18,8 @@
from __future__ import annotations
import contextlib
+from collections import Counter
+from collections.abc import Iterable
from uuid import UUID
import structlog
@@ -309,3 +311,61 @@ def _get_node_by_id(nodes, node_id):
if node["id"] == node_id:
return node
return {}
+
+
+def _is_task_node_mapped_task_group(task_node: BaseOperator | MappedTaskGroup
| TaskMap | None) -> bool:
+ """Check if the Task Node is a Mapped Task Group."""
+ return type(task_node) is MappedTaskGroup
+
+
+def agg_state(states):
+ states = Counter(states)
+ for state in state_priority:
+ if state in states:
+ return state
+ return "no_status"
+
+
+def _find_aggregates(
+ node: TaskGroup | BaseOperator | MappedTaskGroup | TaskMap,
+ parent_node: TaskGroup | BaseOperator | MappedTaskGroup | TaskMap | None,
+ ti_states: dict[str, list[str]],
+) -> Iterable[dict]:
+ """Recursively fill the Task Group Map."""
+ node_id = node.node_id
+ parent_id = parent_node.node_id if parent_node else None
+
+ if node is None:
+ return
+
+ if isinstance(node, MappedOperator):
+ yield {
+ "task_id": node_id,
+ "type": "mapped_task",
+ "parent_id": parent_id,
+ "state": agg_state(ti_states[node_id]),
+ }
+
+ return
+ if isinstance(node, TaskGroup):
+ states = []
+ for child in get_task_group_children_getter()(node):
+ for child_node in _find_aggregates(node=child, parent_node=node,
ti_states=ti_states):
+ states.append(child_node["state"])
+ yield child_node
+ if node_id:
+ yield {
+ "task_id": node_id,
+ "type": "group",
+ "parent_id": parent_id,
+ "state": agg_state(states),
+ }
+ return
+ if isinstance(node, BaseOperator):
+ yield {
+ "task_id": node_id,
+ "type": "task",
+ "parent_id": parent_id,
+ "state": agg_state(ti_states[node_id]),
+ }
+ return
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
index a7efa4868c8..fb8e969aad5 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
@@ -1806,6 +1806,94 @@ export const UseGridServiceGridDataKeyFn = (
},
]),
];
+export type GridServiceGetDagStructureDefaultResponse = Awaited<
+ ReturnType<typeof GridService.getDagStructure>
+>;
+export type GridServiceGetDagStructureQueryResult<
+ TData = GridServiceGetDagStructureDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useGridServiceGetDagStructureKey = "GridServiceGetDagStructure";
+export const UseGridServiceGetDagStructureKeyFn = (
+ {
+ dagId,
+ limit,
+ offset,
+ orderBy,
+ runAfterGte,
+ runAfterLte,
+ }: {
+ dagId: string;
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+ runAfterGte?: string;
+ runAfterLte?: string;
+ },
+ queryKey?: Array<unknown>,
+) => [
+ useGridServiceGetDagStructureKey,
+ ...(queryKey ?? [{ dagId, limit, offset, orderBy, runAfterGte, runAfterLte
}]),
+];
+export type GridServiceGetGridRunsDefaultResponse = Awaited<ReturnType<typeof
GridService.getGridRuns>>;
+export type GridServiceGetGridRunsQueryResult<
+ TData = GridServiceGetGridRunsDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useGridServiceGetGridRunsKey = "GridServiceGetGridRuns";
+export const UseGridServiceGetGridRunsKeyFn = (
+ {
+ dagId,
+ limit,
+ offset,
+ orderBy,
+ runAfterGte,
+ runAfterLte,
+ }: {
+ dagId: string;
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+ runAfterGte?: string;
+ runAfterLte?: string;
+ },
+ queryKey?: Array<unknown>,
+) => [
+ useGridServiceGetGridRunsKey,
+ ...(queryKey ?? [{ dagId, limit, offset, orderBy, runAfterGte, runAfterLte
}]),
+];
+export type GridServiceGetGridTiSummariesDefaultResponse = Awaited<
+ ReturnType<typeof GridService.getGridTiSummaries>
+>;
+export type GridServiceGetGridTiSummariesQueryResult<
+ TData = GridServiceGetGridTiSummariesDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useGridServiceGetGridTiSummariesKey =
"GridServiceGetGridTiSummaries";
+export const UseGridServiceGetGridTiSummariesKeyFn = (
+ {
+ dagId,
+ runId,
+ }: {
+ dagId: string;
+ runId: string;
+ },
+ queryKey?: Array<unknown>,
+) => [useGridServiceGetGridTiSummariesKey, ...(queryKey ?? [{ dagId, runId
}])];
+export type GridServiceGetLatestRunDefaultResponse = Awaited<ReturnType<typeof
GridService.getLatestRun>>;
+export type GridServiceGetLatestRunQueryResult<
+ TData = GridServiceGetLatestRunDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useGridServiceGetLatestRunKey = "GridServiceGetLatestRun";
+export const UseGridServiceGetLatestRunKeyFn = (
+ {
+ dagId,
+ }: {
+ dagId: string;
+ },
+ queryKey?: Array<unknown>,
+) => [useGridServiceGetLatestRunKey, ...(queryKey ?? [{ dagId }])];
export type AssetServiceCreateAssetEventMutationResult = Awaited<
ReturnType<typeof AssetService.createAssetEvent>
>;
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
index f02690c160b..ac664436994 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
@@ -2527,3 +2527,143 @@ export const ensureUseGridServiceGridDataData = (
state,
}),
});
+/**
+ * Get Dag Structure
+ * Return dag structure for grid view.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.offset
+ * @param data.limit
+ * @param data.orderBy
+ * @param data.runAfterGte
+ * @param data.runAfterLte
+ * @returns GridNodeResponse Successful Response
+ * @throws ApiError
+ */
+export const ensureUseGridServiceGetDagStructureData = (
+ queryClient: QueryClient,
+ {
+ dagId,
+ limit,
+ offset,
+ orderBy,
+ runAfterGte,
+ runAfterLte,
+ }: {
+ dagId: string;
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+ runAfterGte?: string;
+ runAfterLte?: string;
+ },
+) =>
+ queryClient.ensureQueryData({
+ queryKey: Common.UseGridServiceGetDagStructureKeyFn({
+ dagId,
+ limit,
+ offset,
+ orderBy,
+ runAfterGte,
+ runAfterLte,
+ }),
+ queryFn: () => GridService.getDagStructure({ dagId, limit, offset,
orderBy, runAfterGte, runAfterLte }),
+ });
+/**
+ * Get Grid Runs
+ * Get info about a run for the grid.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.offset
+ * @param data.limit
+ * @param data.orderBy
+ * @param data.runAfterGte
+ * @param data.runAfterLte
+ * @returns GridRunsResponse Successful Response
+ * @throws ApiError
+ */
+export const ensureUseGridServiceGetGridRunsData = (
+ queryClient: QueryClient,
+ {
+ dagId,
+ limit,
+ offset,
+ orderBy,
+ runAfterGte,
+ runAfterLte,
+ }: {
+ dagId: string;
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+ runAfterGte?: string;
+ runAfterLte?: string;
+ },
+) =>
+ queryClient.ensureQueryData({
+ queryKey: Common.UseGridServiceGetGridRunsKeyFn({
+ dagId,
+ limit,
+ offset,
+ orderBy,
+ runAfterGte,
+ runAfterLte,
+ }),
+ queryFn: () => GridService.getGridRuns({ dagId, limit, offset, orderBy,
runAfterGte, runAfterLte }),
+ });
+/**
+ * Get Grid Ti Summaries
+ * Get states for TIs / "groups" of TIs.
+ *
+ * Essentially this is to know what color to put in the squares in the grid.
+ *
+ * The tricky part here is that we aggregate the state for groups and mapped
tasks.
+ *
+ * We don't add all the TIs for mapped TIs -- we only add one entry for the
mapped task and
+ * its state is an aggregate of its TI states.
+ *
+ * And for task groups, we add a "task" for that which is not really a task
but is just
+ * an entry that represents the group (so that we can show a filled in box
when the group
+ * is not expanded) and its state is an agg of those within it.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.runId
+ * @returns GridTISummaries Successful Response
+ * @throws ApiError
+ */
+export const ensureUseGridServiceGetGridTiSummariesData = (
+ queryClient: QueryClient,
+ {
+ dagId,
+ runId,
+ }: {
+ dagId: string;
+ runId: string;
+ },
+) =>
+ queryClient.ensureQueryData({
+ queryKey: Common.UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId }),
+ queryFn: () => GridService.getGridTiSummaries({ dagId, runId }),
+ });
+/**
+ * Get Latest Run
+ * Get information about the latest dag run by run_after.
+ *
+ * This is used by the UI to figure out if it needs to rerun queries and
resume auto refresh.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @returns unknown Successful Response
+ * @throws ApiError
+ */
+export const ensureUseGridServiceGetLatestRunData = (
+ queryClient: QueryClient,
+ {
+ dagId,
+ }: {
+ dagId: string;
+ },
+) =>
+ queryClient.ensureQueryData({
+ queryKey: Common.UseGridServiceGetLatestRunKeyFn({ dagId }),
+ queryFn: () => GridService.getLatestRun({ dagId }),
+ });
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
index b9039a10f37..dfb13f3e0a1 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -2527,3 +2527,143 @@ export const prefetchUseGridServiceGridData = (
state,
}),
});
+/**
+ * Get Dag Structure
+ * Return dag structure for grid view.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.offset
+ * @param data.limit
+ * @param data.orderBy
+ * @param data.runAfterGte
+ * @param data.runAfterLte
+ * @returns GridNodeResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseGridServiceGetDagStructure = (
+ queryClient: QueryClient,
+ {
+ dagId,
+ limit,
+ offset,
+ orderBy,
+ runAfterGte,
+ runAfterLte,
+ }: {
+ dagId: string;
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+ runAfterGte?: string;
+ runAfterLte?: string;
+ },
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UseGridServiceGetDagStructureKeyFn({
+ dagId,
+ limit,
+ offset,
+ orderBy,
+ runAfterGte,
+ runAfterLte,
+ }),
+ queryFn: () => GridService.getDagStructure({ dagId, limit, offset,
orderBy, runAfterGte, runAfterLte }),
+ });
+/**
+ * Get Grid Runs
+ * Get info about a run for the grid.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.offset
+ * @param data.limit
+ * @param data.orderBy
+ * @param data.runAfterGte
+ * @param data.runAfterLte
+ * @returns GridRunsResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseGridServiceGetGridRuns = (
+ queryClient: QueryClient,
+ {
+ dagId,
+ limit,
+ offset,
+ orderBy,
+ runAfterGte,
+ runAfterLte,
+ }: {
+ dagId: string;
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+ runAfterGte?: string;
+ runAfterLte?: string;
+ },
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UseGridServiceGetGridRunsKeyFn({
+ dagId,
+ limit,
+ offset,
+ orderBy,
+ runAfterGte,
+ runAfterLte,
+ }),
+ queryFn: () => GridService.getGridRuns({ dagId, limit, offset, orderBy,
runAfterGte, runAfterLte }),
+ });
+/**
+ * Get Grid Ti Summaries
+ * Get states for TIs / "groups" of TIs.
+ *
+ * Essentially this is to know what color to put in the squares in the grid.
+ *
+ * The tricky part here is that we aggregate the state for groups and mapped
tasks.
+ *
+ * We don't add all the TIs for mapped TIs -- we only add one entry for the
mapped task and
+ * its state is an aggregate of its TI states.
+ *
+ * And for task groups, we add a "task" for that which is not really a task
but is just
+ * an entry that represents the group (so that we can show a filled in box
when the group
+ * is not expanded) and its state is an agg of those within it.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.runId
+ * @returns GridTISummaries Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseGridServiceGetGridTiSummaries = (
+ queryClient: QueryClient,
+ {
+ dagId,
+ runId,
+ }: {
+ dagId: string;
+ runId: string;
+ },
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId }),
+ queryFn: () => GridService.getGridTiSummaries({ dagId, runId }),
+ });
+/**
+ * Get Latest Run
+ * Get information about the latest dag run by run_after.
+ *
+ * This is used by the UI to figure out if it needs to rerun queries and
resume auto refresh.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @returns unknown Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseGridServiceGetLatestRun = (
+ queryClient: QueryClient,
+ {
+ dagId,
+ }: {
+ dagId: string;
+ },
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UseGridServiceGetLatestRunKeyFn({ dagId }),
+ queryFn: () => GridService.getLatestRun({ dagId }),
+ });
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index 30b49d52aa3..43f56ec550d 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -3018,6 +3018,164 @@ export const useGridServiceGridData = <
}) as TData,
...options,
});
+/**
+ * Get Dag Structure
+ * Return dag structure for grid view.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.offset
+ * @param data.limit
+ * @param data.orderBy
+ * @param data.runAfterGte
+ * @param data.runAfterLte
+ * @returns GridNodeResponse Successful Response
+ * @throws ApiError
+ */
+export const useGridServiceGetDagStructure = <
+ TData = Common.GridServiceGetDagStructureDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ limit,
+ offset,
+ orderBy,
+ runAfterGte,
+ runAfterLte,
+ }: {
+ dagId: string;
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+ runAfterGte?: string;
+ runAfterLte?: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UseGridServiceGetDagStructureKeyFn(
+ { dagId, limit, offset, orderBy, runAfterGte, runAfterLte },
+ queryKey,
+ ),
+ queryFn: () =>
+ GridService.getDagStructure({ dagId, limit, offset, orderBy,
runAfterGte, runAfterLte }) as TData,
+ ...options,
+ });
+/**
+ * Get Grid Runs
+ * Get info about a run for the grid.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.offset
+ * @param data.limit
+ * @param data.orderBy
+ * @param data.runAfterGte
+ * @param data.runAfterLte
+ * @returns GridRunsResponse Successful Response
+ * @throws ApiError
+ */
+export const useGridServiceGetGridRuns = <
+ TData = Common.GridServiceGetGridRunsDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ limit,
+ offset,
+ orderBy,
+ runAfterGte,
+ runAfterLte,
+ }: {
+ dagId: string;
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+ runAfterGte?: string;
+ runAfterLte?: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UseGridServiceGetGridRunsKeyFn(
+ { dagId, limit, offset, orderBy, runAfterGte, runAfterLte },
+ queryKey,
+ ),
+ queryFn: () =>
+ GridService.getGridRuns({ dagId, limit, offset, orderBy, runAfterGte,
runAfterLte }) as TData,
+ ...options,
+ });
+/**
+ * Get Grid Ti Summaries
+ * Get states for TIs / "groups" of TIs.
+ *
+ * Essentially this is to know what color to put in the squares in the grid.
+ *
+ * The tricky part here is that we aggregate the state for groups and mapped
tasks.
+ *
+ * We don't add all the TIs for mapped TIs -- we only add one entry for the
mapped task and
+ * its state is an aggregate of its TI states.
+ *
+ * And for task groups, we add a "task" for that which is not really a task
but is just
+ * an entry that represents the group (so that we can show a filled in box
when the group
+ * is not expanded) and its state is an agg of those within it.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.runId
+ * @returns GridTISummaries Successful Response
+ * @throws ApiError
+ */
+export const useGridServiceGetGridTiSummaries = <
+ TData = Common.GridServiceGetGridTiSummariesDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ runId,
+ }: {
+ dagId: string;
+ runId: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId },
queryKey),
+ queryFn: () => GridService.getGridTiSummaries({ dagId, runId }) as TData,
+ ...options,
+ });
+/**
+ * Get Latest Run
+ * Get information about the latest dag run by run_after.
+ *
+ * This is used by the UI to figure out if it needs to rerun queries and
resume auto refresh.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @returns unknown Successful Response
+ * @throws ApiError
+ */
+export const useGridServiceGetLatestRun = <
+ TData = Common.GridServiceGetLatestRunDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ }: {
+ dagId: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UseGridServiceGetLatestRunKeyFn({ dagId }, queryKey),
+ queryFn: () => GridService.getLatestRun({ dagId }) as TData,
+ ...options,
+ });
/**
* Create Asset Event
* Create asset events.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
index d525b0a662c..3dfcb2c1755 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
@@ -2995,3 +2995,161 @@ export const useGridServiceGridDataSuspense = <
}) as TData,
...options,
});
+/**
+ * Get Dag Structure
+ * Return dag structure for grid view.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.offset
+ * @param data.limit
+ * @param data.orderBy
+ * @param data.runAfterGte
+ * @param data.runAfterLte
+ * @returns GridNodeResponse Successful Response
+ * @throws ApiError
+ */
+export const useGridServiceGetDagStructureSuspense = <
+ TData = Common.GridServiceGetDagStructureDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ limit,
+ offset,
+ orderBy,
+ runAfterGte,
+ runAfterLte,
+ }: {
+ dagId: string;
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+ runAfterGte?: string;
+ runAfterLte?: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UseGridServiceGetDagStructureKeyFn(
+ { dagId, limit, offset, orderBy, runAfterGte, runAfterLte },
+ queryKey,
+ ),
+ queryFn: () =>
+ GridService.getDagStructure({ dagId, limit, offset, orderBy,
runAfterGte, runAfterLte }) as TData,
+ ...options,
+ });
+/**
+ * Get Grid Runs
+ * Get info about a run for the grid.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.offset
+ * @param data.limit
+ * @param data.orderBy
+ * @param data.runAfterGte
+ * @param data.runAfterLte
+ * @returns GridRunsResponse Successful Response
+ * @throws ApiError
+ */
+export const useGridServiceGetGridRunsSuspense = <
+ TData = Common.GridServiceGetGridRunsDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ limit,
+ offset,
+ orderBy,
+ runAfterGte,
+ runAfterLte,
+ }: {
+ dagId: string;
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+ runAfterGte?: string;
+ runAfterLte?: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UseGridServiceGetGridRunsKeyFn(
+ { dagId, limit, offset, orderBy, runAfterGte, runAfterLte },
+ queryKey,
+ ),
+ queryFn: () =>
+ GridService.getGridRuns({ dagId, limit, offset, orderBy, runAfterGte,
runAfterLte }) as TData,
+ ...options,
+ });
+/**
+ * Get Grid Ti Summaries
+ * Get states for TIs / "groups" of TIs.
+ *
+ * Essentially this is to know what color to put in the squares in the grid.
+ *
+ * The tricky part here is that we aggregate the state for groups and mapped
tasks.
+ *
+ * We don't add all the TIs for mapped TIs -- we only add one entry for the
mapped task and
+ * its state is an aggregate of its TI states.
+ *
+ * And for task groups, we add a "task" for that which is not really a task
but is just
+ * an entry that represents the group (so that we can show a filled in box
when the group
+ * is not expanded) and its state is an agg of those within it.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.runId
+ * @returns GridTISummaries Successful Response
+ * @throws ApiError
+ */
+export const useGridServiceGetGridTiSummariesSuspense = <
+ TData = Common.GridServiceGetGridTiSummariesDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ runId,
+ }: {
+ dagId: string;
+ runId: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId },
queryKey),
+ queryFn: () => GridService.getGridTiSummaries({ dagId, runId }) as TData,
+ ...options,
+ });
+/**
+ * Get Latest Run
+ * Get information about the latest dag run by run_after.
+ *
+ * This is used by the UI to figure out if it needs to rerun queries and
resume auto refresh.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @returns unknown Successful Response
+ * @throws ApiError
+ */
+export const useGridServiceGetLatestRunSuspense = <
+ TData = Common.GridServiceGetLatestRunDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ }: {
+ dagId: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UseGridServiceGetLatestRunKeyFn({ dagId }, queryKey),
+ queryFn: () => GridService.getLatestRun({ dagId }) as TData,
+ ...options,
+ });
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 9562def60ea..ec6c47e14f9 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -6546,6 +6546,60 @@ export const $GridDAGRunwithTIs = {
description: "DAG Run model for the Grid UI.",
} as const;
+export const $GridNodeResponse = {
+ properties: {
+ id: {
+ type: "string",
+ title: "Id",
+ },
+ label: {
+ type: "string",
+ title: "Label",
+ },
+ children: {
+ anyOf: [
+ {
+ items: {
+ $ref: "#/components/schemas/GridNodeResponse",
+ },
+ type: "array",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Children",
+ },
+ is_mapped: {
+ anyOf: [
+ {
+ type: "boolean",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Is Mapped",
+ },
+ setup_teardown_type: {
+ anyOf: [
+ {
+ type: "string",
+ enum: ["setup", "teardown"],
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Setup Teardown Type",
+ },
+ },
+ type: "object",
+ required: ["id", "label", "is_mapped"],
+ title: "GridNodeResponse",
+ description: "Base Node serializer for responses.",
+} as const;
+
export const $GridResponse = {
properties: {
dag_runs: {
@@ -6555,16 +6609,130 @@ export const $GridResponse = {
type: "array",
title: "Dag Runs",
},
- structure: {
- $ref: "#/components/schemas/StructureDataResponse",
- },
},
type: "object",
- required: ["dag_runs", "structure"],
+ required: ["dag_runs"],
title: "GridResponse",
description: "Response model for the Grid UI.",
} as const;
+export const $GridRunsResponse = {
+ properties: {
+ dag_id: {
+ type: "string",
+ title: "Dag Id",
+ },
+ run_id: {
+ type: "string",
+ title: "Run Id",
+ },
+ queued_at: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Queued At",
+ },
+ start_date: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Start Date",
+ },
+ end_date: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "End Date",
+ },
+ run_after: {
+ type: "string",
+ format: "date-time",
+ title: "Run After",
+ },
+ state: {
+ anyOf: [
+ {
+ $ref: "#/components/schemas/TaskInstanceState",
+ },
+ {
+ type: "null",
+ },
+ ],
+ },
+ run_type: {
+ $ref: "#/components/schemas/DagRunType",
+ },
+ duration: {
+ anyOf: [
+ {
+ type: "integer",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Duration",
+ readOnly: true,
+ },
+ },
+ type: "object",
+ required: [
+ "dag_id",
+ "run_id",
+ "queued_at",
+ "start_date",
+ "end_date",
+ "run_after",
+ "state",
+ "run_type",
+ "duration",
+ ],
+ title: "GridRunsResponse",
+ description: "Base Node serializer for responses.",
+} as const;
+
+export const $GridTISummaries = {
+ properties: {
+ run_id: {
+ type: "string",
+ title: "Run Id",
+ },
+ dag_id: {
+ type: "string",
+ title: "Dag Id",
+ },
+ task_instances: {
+ items: {
+ $ref: "#/components/schemas/LightGridTaskInstanceSummary",
+ },
+ type: "array",
+ title: "Task Instances",
+ },
+ },
+ type: "object",
+ required: ["run_id", "dag_id", "task_instances"],
+ title: "GridTISummaries",
+ description: "DAG Run model for the Grid UI.",
+} as const;
+
export const $GridTaskInstanceSummary = {
properties: {
task_id: {
@@ -6685,6 +6853,55 @@ export const $HistoricalMetricDataResponse = {
description: "Historical Metric Data serializer for responses.",
} as const;
+export const $LatestRunResponse = {
+ properties: {
+ id: {
+ type: "integer",
+ title: "Id",
+ },
+ dag_id: {
+ type: "string",
+ title: "Dag Id",
+ },
+ run_id: {
+ type: "string",
+ title: "Run Id",
+ },
+ run_after: {
+ type: "string",
+ format: "date-time",
+ title: "Run After",
+ },
+ },
+ type: "object",
+ required: ["id", "dag_id", "run_id", "run_after"],
+ title: "LatestRunResponse",
+ description: "Base Node serializer for responses.",
+} as const;
+
+export const $LightGridTaskInstanceSummary = {
+ properties: {
+ task_id: {
+ type: "string",
+ title: "Task Id",
+ },
+ state: {
+ anyOf: [
+ {
+ $ref: "#/components/schemas/TaskInstanceState",
+ },
+ {
+ type: "null",
+ },
+ ],
+ },
+ },
+ type: "object",
+ required: ["task_id", "state"],
+ title: "LightGridTaskInstanceSummary",
+ description: "Task Instance Summary model for the Grid UI.",
+} as const;
+
export const $MenuItem = {
type: "string",
enum: [
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index cb7cf86d49c..77cd149a68c 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -216,6 +216,14 @@ import type {
StructureDataResponse2,
GridDataData,
GridDataResponse,
+ GetDagStructureData,
+ GetDagStructureResponse,
+ GetGridRunsData,
+ GetGridRunsResponse,
+ GetGridTiSummariesData,
+ GetGridTiSummariesResponse,
+ GetLatestRunData,
+ GetLatestRunResponse,
} from "./types.gen";
export class AssetService {
@@ -3594,4 +3602,137 @@ export class GridService {
},
});
}
+
+ /**
+ * Get Dag Structure
+ * Return dag structure for grid view.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.offset
+ * @param data.limit
+ * @param data.orderBy
+ * @param data.runAfterGte
+ * @param data.runAfterLte
+ * @returns GridNodeResponse Successful Response
+ * @throws ApiError
+ */
+ public static getDagStructure(data: GetDagStructureData):
CancelablePromise<GetDagStructureResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url: "/ui/grid/structure/{dag_id}",
+ path: {
+ dag_id: data.dagId,
+ },
+ query: {
+ offset: data.offset,
+ limit: data.limit,
+ order_by: data.orderBy,
+ run_after_gte: data.runAfterGte,
+ run_after_lte: data.runAfterLte,
+ },
+ errors: {
+ 400: "Bad Request",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
+
+ /**
+ * Get Grid Runs
+ * Get info about a run for the grid.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.offset
+ * @param data.limit
+ * @param data.orderBy
+ * @param data.runAfterGte
+ * @param data.runAfterLte
+ * @returns GridRunsResponse Successful Response
+ * @throws ApiError
+ */
+ public static getGridRuns(data: GetGridRunsData):
CancelablePromise<GetGridRunsResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url: "/ui/grid/runs/{dag_id}",
+ path: {
+ dag_id: data.dagId,
+ },
+ query: {
+ offset: data.offset,
+ limit: data.limit,
+ order_by: data.orderBy,
+ run_after_gte: data.runAfterGte,
+ run_after_lte: data.runAfterLte,
+ },
+ errors: {
+ 400: "Bad Request",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
+
+ /**
+ * Get Grid Ti Summaries
+ * Get states for TIs / "groups" of TIs.
+ *
+ * Essentially this is to know what color to put in the squares in the grid.
+ *
+ * The tricky part here is that we aggregate the state for groups and mapped
tasks.
+ *
+ * We don't add all the TIs for mapped TIs -- we only add one entry for the
mapped task and
+ * its state is an aggregate of its TI states.
+ *
+ * And for task groups, we add a "task" for that which is not really a task
but is just
+ * an entry that represents the group (so that we can show a filled in box
when the group
+ * is not expanded) and its state is an agg of those within it.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.runId
+ * @returns GridTISummaries Successful Response
+ * @throws ApiError
+ */
+ public static getGridTiSummaries(
+ data: GetGridTiSummariesData,
+ ): CancelablePromise<GetGridTiSummariesResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url: "/ui/grid/ti_summaries/{dag_id}/{run_id}",
+ path: {
+ dag_id: data.dagId,
+ run_id: data.runId,
+ },
+ errors: {
+ 400: "Bad Request",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
+
+ /**
+ * Get Latest Run
+ * Get information about the latest dag run by run_after.
+ *
+ * This is used by the UI to figure out if it needs to rerun queries and
resume auto refresh.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @returns unknown Successful Response
+ * @throws ApiError
+ */
+ public static getLatestRun(data: GetLatestRunData):
CancelablePromise<GetLatestRunResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url: "/ui/grid/latest_run/{dag_id}",
+ path: {
+ dag_id: data.dagId,
+ },
+ errors: {
+ 400: "Bad Request",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
}
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index b04ce36ef78..0de72f72ce5 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1604,12 +1604,46 @@ export type GridDAGRunwithTIs = {
task_instances: Array<GridTaskInstanceSummary>;
};
+/**
+ * Base Node serializer for responses.
+ */
+export type GridNodeResponse = {
+ id: string;
+ label: string;
+ children?: Array<GridNodeResponse> | null;
+ is_mapped: boolean | null;
+ setup_teardown_type?: "setup" | "teardown" | null;
+};
+
/**
* Response model for the Grid UI.
*/
export type GridResponse = {
dag_runs: Array<GridDAGRunwithTIs>;
- structure: StructureDataResponse;
+};
+
+/**
+ * Base Node serializer for responses.
+ */
+export type GridRunsResponse = {
+ dag_id: string;
+ run_id: string;
+ queued_at: string | null;
+ start_date: string | null;
+ end_date: string | null;
+ run_after: string;
+ state: TaskInstanceState | null;
+ run_type: DagRunType;
+ readonly duration: number | null;
+};
+
+/**
+ * DAG Run model for the Grid UI.
+ */
+export type GridTISummaries = {
+ run_id: string;
+ dag_id: string;
+ task_instances: Array<LightGridTaskInstanceSummary>;
};
/**
@@ -1638,6 +1672,24 @@ export type HistoricalMetricDataResponse = {
task_instance_states: TaskInstanceStateCount;
};
+/**
+ * Base Node serializer for responses.
+ */
+export type LatestRunResponse = {
+ id: number;
+ dag_id: string;
+ run_id: string;
+ run_after: string;
+};
+
+/**
+ * Task Instance Summary model for the Grid UI.
+ */
+export type LightGridTaskInstanceSummary = {
+ task_id: string;
+ state: TaskInstanceState | null;
+};
+
/**
* Define all menu items defined in the menu.
*/
@@ -2669,6 +2721,41 @@ export type GridDataData = {
export type GridDataResponse = GridResponse;
+export type GetDagStructureData = {
+ dagId: string;
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+ runAfterGte?: string | null;
+ runAfterLte?: string | null;
+};
+
+export type GetDagStructureResponse = Array<GridNodeResponse>;
+
+export type GetGridRunsData = {
+ dagId: string;
+ limit?: number;
+ offset?: number;
+ orderBy?: string;
+ runAfterGte?: string | null;
+ runAfterLte?: string | null;
+};
+
+export type GetGridRunsResponse = Array<GridRunsResponse>;
+
+export type GetGridTiSummariesData = {
+ dagId: string;
+ runId: string;
+};
+
+export type GetGridTiSummariesResponse = GridTISummaries;
+
+export type GetLatestRunData = {
+ dagId: string;
+};
+
+export type GetLatestRunResponse = LatestRunResponse | null;
+
export type $OpenApiTs = {
"/api/v2/assets": {
get: {
@@ -5505,4 +5592,96 @@ export type $OpenApiTs = {
};
};
};
+ "/ui/grid/structure/{dag_id}": {
+ get: {
+ req: GetDagStructureData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: Array<GridNodeResponse>;
+ /**
+ * Bad Request
+ */
+ 400: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
+ "/ui/grid/runs/{dag_id}": {
+ get: {
+ req: GetGridRunsData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: Array<GridRunsResponse>;
+ /**
+ * Bad Request
+ */
+ 400: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
+ "/ui/grid/ti_summaries/{dag_id}/{run_id}": {
+ get: {
+ req: GetGridTiSummariesData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: GridTISummaries;
+ /**
+ * Bad Request
+ */
+ 400: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
+ "/ui/grid/latest_run/{dag_id}": {
+ get: {
+ req: GetLatestRunData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: LatestRunResponse | null;
+ /**
+ * Bad Request
+ */
+ 400: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
};
diff --git a/airflow-core/src/airflow/ui/src/components/DurationChart.tsx
b/airflow-core/src/airflow/ui/src/components/DurationChart.tsx
index a10af96f417..3e8008eaf4b 100644
--- a/airflow-core/src/airflow/ui/src/components/DurationChart.tsx
+++ b/airflow-core/src/airflow/ui/src/components/DurationChart.tsx
@@ -33,7 +33,7 @@ import dayjs from "dayjs";
import { Bar } from "react-chartjs-2";
import { useNavigate } from "react-router-dom";
-import type { TaskInstanceResponse, DAGRunResponse } from
"openapi/requests/types.gen";
+import type { TaskInstanceResponse, GridRunsResponse } from
"openapi/requests/types.gen";
import { system } from "src/theme";
import { pluralize } from "src/utils";
@@ -54,7 +54,7 @@ const average = (ctx: PartialEventContext, index: number) => {
return values === undefined ? 0 : values.reduce((initial, next) => initial +
next, 0) / values.length;
};
-type RunResponse = DAGRunResponse | TaskInstanceResponse;
+type RunResponse = GridRunsResponse | TaskInstanceResponse;
const getDuration = (start: string, end: string | null) =>
dayjs.duration(dayjs(end).diff(start)).asSeconds();
@@ -108,7 +108,7 @@ export const DurationChart = ({
data: entries.map((entry: RunResponse) => {
switch (kind) {
case "Dag Run": {
- const run = entry as DAGRunResponse;
+ const run = entry as GridRunsResponse;
return run.queued_at !== null && run.start_date !== null
&& run.queued_at < run.start_date
? Number(getDuration(run.queued_at, run.start_date))
@@ -151,18 +151,19 @@ export const DurationChart = ({
return;
}
- const entry = entries[element.index];
- const baseUrl = `/dags/${entry?.dag_id}/runs/${entry?.dag_run_id}`;
-
switch (kind) {
case "Dag Run": {
+ const entry = entries[element.index] as GridRunsResponse |
undefined;
+ const baseUrl = `/dags/${entry?.dag_id}/runs/${entry?.run_id}`;
+
navigate(baseUrl);
break;
}
case "Task Instance": {
- const taskInstance = entry as TaskInstanceResponse;
+ const entry = entries[element.index] as TaskInstanceResponse |
undefined;
+ const baseUrl =
`/dags/${entry?.dag_id}/runs/${entry?.dag_run_id}`;
- navigate(`${baseUrl}/tasks/${taskInstance.task_id}`);
+ navigate(`${baseUrl}/tasks/${entry?.task_id}`);
break;
}
default:
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/DagRunSelect.tsx
b/airflow-core/src/airflow/ui/src/layouts/Details/DagRunSelect.tsx
index 431c94b108b..a11d1005aef 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/DagRunSelect.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/DagRunSelect.tsx
@@ -20,13 +20,13 @@ import { createListCollection, type
SelectValueChangeDetails, Select } from "@ch
import { forwardRef, useMemo } from "react";
import { useNavigate, useParams } from "react-router-dom";
-import type { GridDAGRunwithTIs } from "openapi/requests/types.gen";
+import type { GridRunsResponse } from "openapi/requests/types.gen";
import { StateBadge } from "src/components/StateBadge";
import Time from "src/components/Time";
-import { useGrid } from "src/queries/useGrid";
+import { useGridRuns } from "src/queries/useGridRuns.ts";
type DagRunSelected = {
- run: GridDAGRunwithTIs;
+ run: GridRunsResponse;
value: string;
};
@@ -39,34 +39,33 @@ export const DagRunSelect = forwardRef<HTMLDivElement,
DagRunSelectProps>(({ lim
const navigate = useNavigate();
- const { data, isLoading } = useGrid(limit);
-
+ const { data: gridRuns, isLoading } = useGridRuns({ limit });
const runOptions = useMemo(
() =>
createListCollection({
- items: (data?.dag_runs ?? []).map((dr: GridDAGRunwithTIs) => ({
+ items: (gridRuns ?? []).map((dr: GridRunsResponse) => ({
run: dr,
- value: dr.dag_run_id,
+ value: dr.run_id,
})),
}),
- [data],
+ [gridRuns],
);
const selectDagRun = ({ items }: SelectValueChangeDetails<DagRunSelected>)
=> {
- const run = items.length > 0 ? `/runs/${items[0]?.run.dag_run_id}` : "";
+ const runPartialPath = items.length > 0 ? `/runs/${items[0]?.run.run_id}`
: "";
navigate({
- pathname: `/dags/${dagId}${run}/${taskId === undefined ? "" :
`tasks/${taskId}`}`,
+ pathname: `/dags/${dagId}${runPartialPath}/${taskId === undefined ? "" :
`tasks/${taskId}`}`,
});
};
- const selectedRun = (data?.dag_runs ?? []).find((dr) => dr.dag_run_id ===
runId);
+ const selectedRun = (gridRuns ?? []).find((dr) => dr.run_id === runId);
return (
<Select.Root
collection={runOptions}
data-testid="dag-run-select"
- disabled={isLoading || !data?.dag_runs}
+ disabled={isLoading || !gridRuns}
onValueChange={selectDagRun}
ref={ref}
size="sm"
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Bar.tsx
b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Bar.tsx
index 90480f8f792..dbcc965c61f 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Bar.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Bar.tsx
@@ -19,27 +19,30 @@
import { Flex, Box } from "@chakra-ui/react";
import { useParams, useSearchParams } from "react-router-dom";
+import type { GridRunsResponse } from "openapi/requests";
import { RunTypeIcon } from "src/components/RunTypeIcon";
+import { useGridTiSummaries } from "src/queries/useGridTISummaries.ts";
import { GridButton } from "./GridButton";
import { TaskInstancesColumn } from "./TaskInstancesColumn";
-import type { GridTask, RunWithDuration } from "./utils";
+import type { GridTask } from "./utils";
const BAR_HEIGHT = 100;
type Props = {
readonly max: number;
readonly nodes: Array<GridTask>;
- readonly run: RunWithDuration;
+ readonly run: GridRunsResponse;
};
export const Bar = ({ max, nodes, run }: Props) => {
const { dagId = "", runId } = useParams();
const [searchParams] = useSearchParams();
- const isSelected = runId === run.dag_run_id;
+ const isSelected = runId === run.run_id;
const search = searchParams.toString();
+ const { data: gridTISummaries } = useGridTiSummaries(run);
return (
<Box
@@ -62,11 +65,11 @@ export const Bar = ({ max, nodes, run }: Props) => {
color="white"
dagId={dagId}
flexDir="column"
- height={`${(run.duration / max) * BAR_HEIGHT}px`}
+ height={`${((run.duration ?? 0) / max) * BAR_HEIGHT}px`}
justifyContent="flex-end"
label={run.run_after}
minHeight="14px"
- runId={run.dag_run_id}
+ runId={run.run_id}
searchParams={search}
state={run.state}
zIndex={1}
@@ -74,7 +77,11 @@ export const Bar = ({ max, nodes, run }: Props) => {
{run.run_type !== "scheduled" && <RunTypeIcon runType={run.run_type}
size="10px" />}
</GridButton>
</Flex>
- <TaskInstancesColumn nodes={nodes} runId={run.dag_run_id}
taskInstances={run.task_instances} />
+ <TaskInstancesColumn
+ nodes={nodes}
+ runId={run.run_id}
+ taskInstances={gridTISummaries?.task_instances ?? []}
+ />
</Box>
);
};
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx
b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx
index 75fe7209dc8..d4e69ba1a70 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx
@@ -19,18 +19,21 @@
import { Box, Flex, IconButton } from "@chakra-ui/react";
import dayjs from "dayjs";
import dayjsDuration from "dayjs/plugin/duration";
-import { useMemo } from "react";
+import { useEffect, useMemo, useState } from "react";
import { FiChevronsRight } from "react-icons/fi";
import { Link, useParams } from "react-router-dom";
+import type { GridRunsResponse } from "openapi/requests";
import { useOpenGroups } from "src/context/openGroups";
-import { useGrid } from "src/queries/useGrid";
+import { useGridRuns } from "src/queries/useGridRuns.ts";
+import { useGridStructure } from "src/queries/useGridStructure.ts";
+import { isStatePending } from "src/utils";
import { Bar } from "./Bar";
import { DurationAxis } from "./DurationAxis";
import { DurationTick } from "./DurationTick";
import { TaskNames } from "./TaskNames";
-import { flattenNodes, type RunWithDuration } from "./utils";
+import { flattenNodes } from "./utils";
dayjs.extend(dayjsDuration);
@@ -39,36 +42,46 @@ type Props = {
};
export const Grid = ({ limit }: Props) => {
+ const [selectedIsVisible, setSelectedIsVisible] = useState<boolean |
undefined>();
+ const [hasActiveRun, setHasActiveRun] = useState<boolean | undefined>();
const { openGroupIds } = useOpenGroups();
- const { dagId = "" } = useParams();
+ const { dagId = "", runId = "" } = useParams();
- const { data: gridData, isLoading, runAfter } = useGrid(limit);
+ const { data: gridRuns, isLoading } = useGridRuns({ limit });
- const runs: Array<RunWithDuration> = useMemo(
- () =>
- (gridData?.dag_runs ?? []).map((run) => {
- const duration = dayjs
- .duration(dayjs(run.end_date ?? undefined).diff(run.start_date ??
undefined))
- .asSeconds();
+ // Check if the selected dag run is inside of the grid response, if not,
we'll update the grid filters
+ // Eventually we should redo the api endpoint to make this work better
+ useEffect(() => {
+ if (gridRuns && runId) {
+ const run = gridRuns.find((dr: GridRunsResponse) => dr.run_id === runId);
- return {
- ...run,
- duration,
- };
- }),
- [gridData?.dag_runs],
- );
+ if (!run) {
+ setSelectedIsVisible(false);
+ }
+ }
+ }, [runId, gridRuns, selectedIsVisible, setSelectedIsVisible]);
+
+ useEffect(() => {
+ if (gridRuns) {
+ const run = gridRuns.some((dr: GridRunsResponse) =>
isStatePending(dr.state));
+
+ if (!run) {
+ setHasActiveRun(false);
+ }
+ }
+ }, [gridRuns, setHasActiveRun]);
+ const { data: dagStructure } = useGridStructure({ hasActiveRun, limit });
// calculate dag run bar heights relative to max
const max = Math.max.apply(
undefined,
- runs.map((dr) => dr.duration),
- );
-
- const { flatNodes } = useMemo(
- () => flattenNodes(gridData === undefined ? [] : gridData.structure.nodes,
openGroupIds),
- [gridData, openGroupIds],
+ gridRuns === undefined
+ ? []
+ : gridRuns
+ .map((dr: GridRunsResponse) => dr.duration)
+ .filter((duration: number | null): duration is number => duration
!== null),
);
+ const { flatNodes } = useMemo(() => flattenNodes(dagStructure,
openGroupIds), [dagStructure, openGroupIds]);
return (
<Flex justifyContent="flex-start" position="relative" pt={50} width="100%">
@@ -81,7 +94,7 @@ export const Grid = ({ limit }: Props) => {
<DurationAxis top="50px" />
<DurationAxis top="4px" />
<Flex flexDirection="column-reverse" height="100px"
position="relative" width="100%">
- {Boolean(runs.length) && (
+ {Boolean(gridRuns?.length) && (
<>
<DurationTick bottom="92px">{Math.floor(max)}s</DurationTick>
<DurationTick bottom="46px">{Math.floor(max /
2)}s</DurationTick>
@@ -90,11 +103,11 @@ export const Grid = ({ limit }: Props) => {
)}
</Flex>
<Flex flexDirection="row-reverse">
- {runs.map((dr) => (
- <Bar key={dr.dag_run_id} max={max} nodes={flatNodes} run={dr} />
+ {gridRuns?.map((dr: GridRunsResponse) => (
+ <Bar key={dr.run_id} max={max} nodes={flatNodes} run={dr} />
))}
</Flex>
- {runAfter === undefined ? undefined : (
+ {selectedIsVisible === undefined || !selectedIsVisible ? undefined :
(
<Link to={`/dags/${dagId}`}>
<IconButton
aria-label="Reset to latest"
diff --git
a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/TaskInstancesColumn.tsx
b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/TaskInstancesColumn.tsx
index 72691188269..ad864fe1f2a 100644
---
a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/TaskInstancesColumn.tsx
+++
b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/TaskInstancesColumn.tsx
@@ -19,7 +19,7 @@
import { Box } from "@chakra-ui/react";
import { useParams, useSearchParams } from "react-router-dom";
-import type { GridTaskInstanceSummary } from "openapi/requests/types.gen";
+import type { LightGridTaskInstanceSummary } from "openapi/requests/types.gen";
import { GridTI } from "./GridTI";
import type { GridTask } from "./utils";
@@ -28,7 +28,7 @@ type Props = {
readonly depth?: number;
readonly nodes: Array<GridTask>;
readonly runId: string;
- readonly taskInstances: Array<GridTaskInstanceSummary>;
+ readonly taskInstances: Array<LightGridTaskInstanceSummary>;
};
export const TaskInstancesColumn = ({ nodes, runId, taskInstances }: Props) =>
{
@@ -37,6 +37,7 @@ export const TaskInstancesColumn = ({ nodes, runId,
taskInstances }: Props) => {
const search = searchParams.toString();
return nodes.map((node) => {
+ // todo: how does this work with mapped? same task id for multiple tis
const taskInstance = taskInstances.find((ti) => ti.task_id === node.id);
if (!taskInstance) {
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/utils.ts
b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/utils.ts
index 4b17098faff..d278597267a 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/utils.ts
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/utils.ts
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-import type { GridDAGRunwithTIs, NodeResponse } from
"openapi/requests/types.gen";
+import type { GridDAGRunwithTIs, GridNodeResponse } from
"openapi/requests/types.gen";
export type RunWithDuration = {
duration: number;
@@ -26,31 +26,33 @@ export type GridTask = {
depth: number;
isGroup?: boolean;
isOpen?: boolean;
-} & NodeResponse;
+} & GridNodeResponse;
-export const flattenNodes = (nodes: Array<NodeResponse>, openGroupIds:
Array<string>, depth: number = 0) => {
+export const flattenNodes = (
+ nodes: Array<GridNodeResponse> | undefined,
+ openGroupIds: Array<string>,
+ depth: number = 0,
+) => {
let flatNodes: Array<GridTask> = [];
let allGroupIds: Array<string> = [];
- nodes.forEach((node) => {
- if (node.type === "task") {
- if (node.children) {
- const { children, ...rest } = node;
-
- flatNodes.push({ ...rest, depth, isGroup: true, isOpen:
openGroupIds.includes(node.id) });
- allGroupIds.push(node.id);
-
- const { allGroupIds: childGroupIds, flatNodes: childNodes } =
flattenNodes(
- children,
- openGroupIds,
- depth + 1,
- );
-
- flatNodes = [...flatNodes, ...(openGroupIds.includes(node.id) ?
childNodes : [])];
- allGroupIds = [...allGroupIds, ...childGroupIds];
- } else {
- flatNodes.push({ ...node, depth });
- }
+ nodes?.forEach((node) => {
+ if (node.children) {
+ const { children, ...rest } = node;
+
+ flatNodes.push({ ...rest, depth, isGroup: true, isOpen:
openGroupIds.includes(node.id) });
+ allGroupIds.push(node.id);
+
+ const { allGroupIds: childGroupIds, flatNodes: childNodes } =
flattenNodes(
+ children,
+ openGroupIds,
+ depth + 1,
+ );
+
+ flatNodes = [...flatNodes, ...(openGroupIds.includes(node.id) ?
childNodes : [])];
+ allGroupIds = [...allGroupIds, ...childGroupIds];
+ } else {
+ flatNodes.push({ ...node, depth });
}
});
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/ToggleGroups.tsx
b/airflow-core/src/airflow/ui/src/layouts/Details/ToggleGroups.tsx
index 8b2ebcefe2d..cb740cc9e98 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/ToggleGroups.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/ToggleGroups.tsx
@@ -20,22 +20,21 @@ import { type ButtonGroupProps, IconButton, ButtonGroup }
from "@chakra-ui/react
import { useMemo } from "react";
import { MdExpand, MdCompress } from "react-icons/md";
import { useParams } from "react-router-dom";
+import { useLocalStorage } from "usehooks-ts";
-import { useStructureServiceStructureData } from "openapi/queries";
import { useOpenGroups } from "src/context/openGroups";
+import { useGridStructure } from "src/queries/useGridStructure.ts";
import { flattenNodes } from "./Grid/utils";
export const ToggleGroups = (props: ButtonGroupProps) => {
- const { dagId = "" } = useParams();
- const { data: structure } = useStructureServiceStructureData({
- dagId,
- });
const { openGroupIds, setOpenGroupIds } = useOpenGroups();
-
+ const { dagId = "" } = useParams();
+ const [limit] = useLocalStorage<number>(`dag_runs_limit-${dagId}`, 10);
+ const { data: dagStructure } = useGridStructure({ limit });
const { allGroupIds } = useMemo(
- () => flattenNodes(structure?.nodes ?? [], openGroupIds),
- [structure?.nodes, openGroupIds],
+ () => flattenNodes(dagStructure, openGroupIds),
+ [dagStructure, openGroupIds],
);
// Don't show button if the DAG has no task groups
diff --git a/airflow-core/src/airflow/ui/src/pages/Dag/Overview/Overview.tsx
b/airflow-core/src/airflow/ui/src/pages/Dag/Overview/Overview.tsx
index b28ca34e109..406e023e1ed 100644
--- a/airflow-core/src/airflow/ui/src/pages/Dag/Overview/Overview.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/Dag/Overview/Overview.tsx
@@ -20,6 +20,7 @@ import { Box, HStack, Skeleton } from "@chakra-ui/react";
import dayjs from "dayjs";
import { lazy, useState, Suspense } from "react";
import { useParams } from "react-router-dom";
+import { useLocalStorage } from "usehooks-ts";
import {
useAssetServiceGetAssetEvents,
@@ -30,7 +31,7 @@ import { AssetEvents } from
"src/components/Assets/AssetEvents";
import { DurationChart } from "src/components/DurationChart";
import TimeRangeSelector from "src/components/TimeRangeSelector";
import { TrendCountButton } from "src/components/TrendCountButton";
-import { isStatePending, useAutoRefresh } from "src/utils";
+import { useGridRuns } from "src/queries/useGridRuns.ts";
const FailedLogs = lazy(() => import("./FailedLogs"));
@@ -44,8 +45,6 @@ export const Overview = () => {
const [endDate, setEndDate] = useState(now.toISOString());
const [assetSortBy, setAssetSortBy] = useState("-timestamp");
- const refetchInterval = useAutoRefresh({});
-
const { data: failedTasks, isLoading } =
useTaskInstanceServiceGetTaskInstances({
dagId: dagId ?? "",
dagRunId: "~",
@@ -55,28 +54,17 @@ export const Overview = () => {
state: ["failed"],
});
+ const [limit] = useLocalStorage<number>(`dag_runs_limit-${dagId}`, 10);
const { data: failedRuns, isLoading: isLoadingFailedRuns } =
useDagRunServiceGetDagRuns({
dagId: dagId ?? "",
+ limit,
runAfterGte: startDate,
runAfterLte: endDate,
state: ["failed"],
});
-
- const { data: runs, isLoading: isLoadingRuns } = useDagRunServiceGetDagRuns(
- {
- dagId: dagId ?? "",
- limit: 14,
- orderBy: "-run_after",
- },
- undefined,
- {
- refetchInterval: (query) =>
- query.state.data?.dag_runs.some((run) => isStatePending(run.state)) ?
refetchInterval : false,
- },
- );
-
+ const { data: gridRuns, isLoading: isLoadingRuns } = useGridRuns({ limit });
const { data: assetEventsData, isLoading: isLoadingAssetEvents } =
useAssetServiceGetAssetEvents({
- limit: 6,
+ limit,
orderBy: assetSortBy,
sourceDagId: dagId,
timestampGte: startDate,
@@ -131,7 +119,7 @@ export const Overview = () => {
{isLoadingRuns ? (
<Skeleton height="200px" w="full" />
) : (
- <DurationChart entries={runs?.dag_runs.slice().reverse()}
kind="Dag Run" />
+ <DurationChart entries={gridRuns?.slice().reverse()} kind="Dag
Run" />
)}
</Box>
{assetEventsData && assetEventsData.total_entries > 0 ? (
diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx
b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx
index 831727105ba..b3faf701959 100644
--- a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx
@@ -22,6 +22,7 @@ import { Flex, HStack, Link, type SelectValueChangeDetails,
Text } from "@chakra
import type { ColumnDef } from "@tanstack/react-table";
import { useCallback } from "react";
import { Link as RouterLink, useParams, useSearchParams } from
"react-router-dom";
+import { useLocalStorage } from "usehooks-ts";
import { useDagRunServiceGetDagRuns } from "openapi/queries";
import type { DAGRunResponse, DagRunState, DagRunType } from
"openapi/requests/types.gen";
@@ -147,12 +148,13 @@ export const DagRuns = () => {
const endDate = searchParams.get(END_DATE_PARAM);
const refetchInterval = useAutoRefresh({});
+ const [limit] = useLocalStorage<number>(`dag_runs_limit-${dagId}`, 10);
const { data, error, isLoading } = useDagRunServiceGetDagRuns(
{
dagId: dagId ?? "~",
endDateLte: endDate ?? undefined,
- limit: pagination.pageSize,
+ limit,
offset: pagination.pageIndex * pagination.pageSize,
orderBy,
runType: filteredType === null ? undefined : [filteredType],
diff --git a/airflow-core/src/airflow/ui/src/queries/useGrid.ts
b/airflow-core/src/airflow/ui/src/queries/useGrid.ts
deleted file mode 100644
index 9ebd1a02d6e..00000000000
--- a/airflow-core/src/airflow/ui/src/queries/useGrid.ts
+++ /dev/null
@@ -1,72 +0,0 @@
-/*!
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import { keepPreviousData } from "@tanstack/react-query";
-import { useEffect, useState } from "react";
-import { useParams } from "react-router-dom";
-
-import { useDagRunServiceGetDagRun, useGridServiceGridData } from
"openapi/queries";
-import type { GridResponse } from "openapi/requests/types.gen";
-import { isStatePending, useAutoRefresh } from "src/utils";
-
-export const useGrid = (limit: number) => {
- const { dagId = "", runId = "" } = useParams();
- const [runAfter, setRunAfter] = useState<string | undefined>();
-
- const { data: dagRun } = useDagRunServiceGetDagRun(
- {
- dagId,
- dagRunId: runId,
- },
- undefined,
- { enabled: runId !== "" },
- );
-
- const refetchInterval = useAutoRefresh({ dagId });
-
- // This is necessary for keepPreviousData
- // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-arguments
- const { data: gridData, ...rest } = useGridServiceGridData<GridResponse>(
- {
- dagId,
- limit,
- orderBy: "-run_after",
- runAfterLte: runAfter,
- },
- undefined,
- {
- placeholderData: keepPreviousData,
- refetchInterval: (query) =>
- query.state.data?.dag_runs.some((dr) => isStatePending(dr.state)) &&
refetchInterval,
- },
- );
-
- // Check if the selected dag run is inside of the grid response, if not,
we'll update the grid filters
- // Eventually we should redo the api endpoint to make this work better
- useEffect(() => {
- if (gridData?.dag_runs && dagRun) {
- const hasRun = gridData.dag_runs.find((dr) => dr.dag_run_id ===
dagRun.dag_run_id);
-
- if (!hasRun) {
- setRunAfter(dagRun.run_after);
- }
- }
- }, [dagRun, gridData?.dag_runs, runAfter]);
-
- return { data: gridData, runAfter, ...rest };
-};
diff --git a/airflow-core/src/airflow/ui/src/queries/useGridRuns.ts
b/airflow-core/src/airflow/ui/src/queries/useGridRuns.ts
new file mode 100644
index 00000000000..096630852d4
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/queries/useGridRuns.ts
@@ -0,0 +1,44 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useParams } from "react-router-dom";
+
+import { useGridServiceGetGridRuns } from "openapi/queries";
+import { isStatePending, useAutoRefresh } from "src/utils";
+
+export const useGridRuns = ({ limit }: { limit: number }) => {
+ const { dagId = "" } = useParams();
+
+ const defaultRefetchInterval = useAutoRefresh({ dagId });
+
+ const { data: GridRuns, ...rest } = useGridServiceGetGridRuns(
+ {
+ dagId,
+ limit,
+ orderBy: "-run_after",
+ },
+ undefined,
+ {
+ placeholderData: (prev) => prev,
+ refetchInterval: (query) =>
+ query.state.data?.some((run) => isStatePending(run.state)) &&
defaultRefetchInterval,
+ },
+ );
+
+ return { data: GridRuns, ...rest };
+};
diff --git a/airflow-core/src/airflow/ui/src/queries/useGridStructure.ts
b/airflow-core/src/airflow/ui/src/queries/useGridStructure.ts
new file mode 100644
index 00000000000..eaeaede6050
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/queries/useGridStructure.ts
@@ -0,0 +1,49 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useParams } from "react-router-dom";
+
+import { useGridServiceGetDagStructure } from "openapi/queries";
+import { useAutoRefresh } from "src/utils";
+
+export const useGridStructure = ({
+ hasActiveRun = undefined,
+ limit,
+}: {
+ hasActiveRun?: boolean;
+ limit?: number;
+}) => {
+ const { dagId = "" } = useParams();
+ const refetchInterval = useAutoRefresh({ dagId });
+
+ // This is necessary for keepPreviousData
+ const { data: dagStructure, ...rest } = useGridServiceGetDagStructure(
+ {
+ dagId,
+ limit,
+ orderBy: "-run_after",
+ },
+ undefined,
+ {
+ placeholderData: (prev) => prev,
+ refetchInterval: hasActiveRun ? refetchInterval : false,
+ },
+ );
+
+ return { data: dagStructure, ...rest };
+};
diff --git a/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
b/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
new file mode 100644
index 00000000000..5bd3d1b85f0
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
@@ -0,0 +1,46 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useParams } from "react-router-dom";
+
+import { useGridServiceGetGridTiSummaries } from "openapi/queries";
+import type { GridRunsResponse } from "openapi/requests";
+import { isStatePending, useAutoRefresh } from "src/utils";
+
+export const useGridTiSummaries = (run: GridRunsResponse) => {
+ const { dagId = "" } = useParams();
+
+ const refetchInterval = useAutoRefresh({ dagId });
+
+ const { data: gridTiSummaries, ...rest } = useGridServiceGetGridTiSummaries(
+ {
+ dagId,
+ runId: run.run_id,
+ },
+ undefined,
+ {
+ placeholderData: (prev) => prev,
+ refetchInterval: (query) =>
+ (isStatePending(run.state) ||
+ query.state.data?.task_instances.some((ti) =>
isStatePending(ti.state))) &&
+ refetchInterval,
+ },
+ );
+
+ return { data: gridTiSummaries, ...rest };
+};
diff --git a/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts
b/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts
index ec7736824c1..3aeeb3f4f3e 100644
--- a/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts
@@ -20,13 +20,15 @@ import { useQueryClient } from "@tanstack/react-query";
import { useEffect, useRef } from "react";
import {
- useDagRunServiceGetDagRuns,
useDagServiceGetDagDetailsKey,
UseDagRunServiceGetDagRunsKeyFn,
UseDagServiceGetDagDetailsKeyFn,
useDagServiceRecentDagRunsKey,
UseGridServiceGridDataKeyFn,
UseTaskInstanceServiceGetTaskInstancesKeyFn,
+ useGridServiceGetLatestRun,
+ UseGridServiceGetDagStructureKeyFn,
+ UseGridServiceGetGridRunsKeyFn,
} from "openapi/queries";
import { useConfig } from "./useConfig";
@@ -36,15 +38,15 @@ export const useRefreshOnNewDagRuns = (dagId: string,
hasPendingRuns: boolean |
const previousDagRunIdRef = useRef<string>();
const autoRefreshInterval = useConfig("auto_refresh_interval") as number;
- const { data } = useDagRunServiceGetDagRuns({ dagId, limit: 1, orderBy:
"-run_after" }, undefined, {
+ const { data } = useGridServiceGetLatestRun({ dagId }, undefined, {
enabled: Boolean(dagId) && !hasPendingRuns,
refetchInterval: Boolean(autoRefreshInterval) ? autoRefreshInterval * 1000
: 5000,
});
useEffect(() => {
- const latestDagRun = data?.dag_runs[0];
+ const latestDagRun = data;
- const latestDagRunId = latestDagRun?.dag_run_id;
+ const latestDagRunId = latestDagRun?.run_id;
if ((latestDagRunId ?? "") && previousDagRunIdRef.current !==
latestDagRunId) {
previousDagRunIdRef.current = latestDagRunId;
@@ -56,6 +58,8 @@ export const useRefreshOnNewDagRuns = (dagId: string,
hasPendingRuns: boolean |
UseDagRunServiceGetDagRunsKeyFn({ dagId }, [{ dagId }]),
UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId: "~" },
[{ dagId, dagRunId: "~" }]),
UseGridServiceGridDataKeyFn({ dagId }, [{ dagId }]),
+ UseGridServiceGetDagStructureKeyFn({ dagId }, [{ dagId }]),
+ UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]),
];
queryKeys.forEach((key) => {
diff --git a/airflow-core/src/airflow/utils/task_group.py
b/airflow-core/src/airflow/utils/task_group.py
index 034eb6d1bb8..b66a2cfda8d 100644
--- a/airflow-core/src/airflow/utils/task_group.py
+++ b/airflow-core/src/airflow/utils/task_group.py
@@ -94,3 +94,41 @@ def task_group_to_dict(task_item_or_group,
parent_group_is_mapped=False):
"children": children,
"type": "task",
}
+
+
+def task_group_to_dict_grid(task_item_or_group, parent_group_is_mapped=False):
+ """Create a nested dict representation of this TaskGroup and its children
used to construct the Graph."""
+ from airflow.sdk.definitions._internal.abstractoperator import
AbstractOperator
+ from airflow.sdk.definitions.mappedoperator import MappedOperator
+
+ if isinstance(task := task_item_or_group, AbstractOperator):
+ is_mapped = None
+ if isinstance(task, MappedOperator) or parent_group_is_mapped:
+ is_mapped = True
+ setup_teardown_type = None
+ if task.is_setup is True:
+ setup_teardown_type = "setup"
+ elif task.is_teardown is True:
+ setup_teardown_type = "teardown"
+ return {
+ "id": task.task_id,
+ "label": task.label,
+ "is_mapped": is_mapped,
+ "children": None,
+ "setup_teardown_type": setup_teardown_type,
+ }
+
+ task_group = task_item_or_group
+ task_group_sort = get_task_group_children_getter()
+ is_mapped_group = isinstance(task_group, MappedTaskGroup)
+ children = [
+ task_group_to_dict_grid(x,
parent_group_is_mapped=parent_group_is_mapped or is_mapped_group)
+ for x in task_group_sort(task_group)
+ ]
+
+ return {
+ "id": task_group.group_id,
+ "label": task_group.label,
+ "is_mapped": is_mapped_group or None,
+ "children": children or None,
+ }
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
index 181d64b3a17..908ebb24beb 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
@@ -42,6 +42,7 @@ pytestmark = pytest.mark.db_test
DAG_ID = "test_dag"
DAG_ID_2 = "test_dag_2"
DAG_ID_3 = "test_dag_3"
+DAG_ID_4 = "test_dag_4"
TASK_ID = "task"
TASK_ID_2 = "task2"
TASK_ID_3 = "task3"
@@ -59,11 +60,9 @@ GRID_RUN_1 = {
"data_interval_start": "2024-11-29T00:00:00Z",
"end_date": "2024-12-31T00:00:00Z",
"logical_date": "2024-11-30T00:00:00Z",
- "note": None,
- "queued_at": None,
+ "run_after": "2024-11-30T00:00:00Z",
"run_type": "scheduled",
"start_date": "2016-01-01T00:00:00Z",
- "run_after": "2024-11-30T00:00:00Z",
"state": "success",
"task_instances": [
{
@@ -82,10 +81,6 @@ GRID_RUN_1 = {
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
"state": "success",
"task_count": 3,
"task_id": "mapped_task_group",
@@ -107,10 +102,6 @@ GRID_RUN_1 = {
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
"state": "success",
"task_count": 2,
"task_id": "task_group.inner_task_group",
@@ -132,10 +123,6 @@ GRID_RUN_1 = {
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
"state": "success",
"task_count": 5,
"task_id": "task_group",
@@ -157,10 +144,6 @@ GRID_RUN_1 = {
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
"state": "success",
"task_count": 1,
"task_id": "mapped_task_2",
@@ -182,10 +165,6 @@ GRID_RUN_1 = {
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
"state": "success",
"task_count": 3,
"task_id": "mapped_task_group.subtask",
@@ -207,10 +186,6 @@ GRID_RUN_1 = {
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
"state": "success",
"task_count": 1,
"task_id": "task",
@@ -232,10 +207,6 @@ GRID_RUN_1 = {
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
"state": "success",
"task_count": 2,
"task_id": "task_group.inner_task_group.inner_task_group_sub_task",
@@ -257,10 +228,6 @@ GRID_RUN_1 = {
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
"state": "success",
"task_count": 4,
"task_id": "task_group.mapped_task",
@@ -275,8 +242,6 @@ GRID_RUN_2 = {
"data_interval_start": "2024-11-29T00:00:00Z",
"end_date": "2024-12-31T00:00:00Z",
"logical_date": "2024-12-01T00:00:00Z",
- "note": None,
- "queued_at": None,
"run_after": "2024-11-30T00:00:00Z",
"run_type": "manual",
"start_date": "2016-01-01T00:00:00Z",
@@ -299,8 +264,6 @@ GRID_RUN_2 = {
"upstream_failed": 0,
},
"end_date": "2024-12-30T01:02:03Z",
- "note": None,
- "queued_dttm": None,
"start_date": "2024-12-30T01:00:00Z",
"state": "running",
"task_count": 3,
@@ -323,11 +286,6 @@ GRID_RUN_2 = {
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
- "state": None,
"task_count": 2,
"task_id": "task_group.inner_task_group",
"try_number": 0,
@@ -348,11 +306,6 @@ GRID_RUN_2 = {
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
- "state": None,
"task_count": 5,
"task_id": "task_group",
"try_number": 0,
@@ -373,11 +326,6 @@ GRID_RUN_2 = {
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
- "state": None,
"task_count": 1,
"task_id": "mapped_task_2",
"try_number": 0,
@@ -399,8 +347,6 @@ GRID_RUN_2 = {
"upstream_failed": 0,
},
"end_date": "2024-12-30T01:02:03Z",
- "note": None,
- "queued_dttm": None,
"start_date": "2024-12-30T01:00:00Z",
"state": "running",
"task_count": 3,
@@ -423,10 +369,6 @@ GRID_RUN_2 = {
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
"state": "success",
"task_count": 1,
"task_id": "task",
@@ -448,11 +390,6 @@ GRID_RUN_2 = {
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
- "state": None,
"task_count": 2,
"task_id": "task_group.inner_task_group.inner_task_group_sub_task",
"try_number": 0,
@@ -473,11 +410,6 @@ GRID_RUN_2 = {
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
- "state": None,
"task_count": 4,
"task_id": "task_group.mapped_task",
"try_number": 0,
@@ -485,103 +417,6 @@ GRID_RUN_2 = {
],
}
-STRUCTURE = {
- "edges": [],
- "nodes": [
- {
- "asset_condition_type": None,
- "children": [
- {
- "asset_condition_type": None,
- "children": None,
- "id": "mapped_task_group.subtask",
- "is_mapped": True,
- "label": "subtask",
- "operator": "MockOperator",
- "setup_teardown_type": None,
- "tooltip": None,
- "type": "task",
- },
- ],
- "id": "mapped_task_group",
- "is_mapped": True,
- "label": "mapped_task_group",
- "operator": None,
- "setup_teardown_type": None,
- "tooltip": "",
- "type": "task",
- },
- {
- "asset_condition_type": None,
- "children": None,
- "id": "task",
- "is_mapped": None,
- "label": "task",
- "operator": "EmptyOperator",
- "setup_teardown_type": None,
- "tooltip": None,
- "type": "task",
- },
- {
- "asset_condition_type": None,
- "children": [
- {
- "asset_condition_type": None,
- "children": [
- {
- "asset_condition_type": None,
- "children": None,
- "id":
"task_group.inner_task_group.inner_task_group_sub_task",
- "is_mapped": True,
- "label": "inner_task_group_sub_task",
- "operator": "MockOperator",
- "setup_teardown_type": None,
- "tooltip": None,
- "type": "task",
- },
- ],
- "id": "task_group.inner_task_group",
- "is_mapped": False,
- "label": "inner_task_group",
- "operator": None,
- "setup_teardown_type": None,
- "tooltip": "",
- "type": "task",
- },
- {
- "asset_condition_type": None,
- "children": None,
- "id": "task_group.mapped_task",
- "is_mapped": True,
- "label": "mapped_task",
- "operator": "MockOperator",
- "setup_teardown_type": None,
- "tooltip": None,
- "type": "task",
- },
- ],
- "id": "task_group",
- "is_mapped": False,
- "label": "task_group",
- "operator": None,
- "setup_teardown_type": None,
- "tooltip": "",
- "type": "task",
- },
- {
- "asset_condition_type": None,
- "children": None,
- "id": "mapped_task_2",
- "is_mapped": True,
- "label": "mapped_task_2",
- "operator": "MockOperator",
- "setup_teardown_type": None,
- "tooltip": None,
- "type": "task",
- },
- ],
-}
-
@pytest.fixture(autouse=True, scope="module")
def examples_dag_bag():
@@ -692,6 +527,35 @@ def setup(dag_maker, session=None):
ti.state = TaskInstanceState.SUCCESS
ti.end_date = None
+ # DAG 4 for testing removed task
+ with dag_maker(dag_id=DAG_ID_4, serialized=True, session=session) as dag_4:
+ t1 = EmptyOperator(task_id="t1")
+ t2 = EmptyOperator(task_id="t2")
+ with TaskGroup(group_id=f"{TASK_GROUP_ID}-1") as tg1:
+ with TaskGroup(group_id=f"{TASK_GROUP_ID}-2") as tg2:
+ EmptyOperator(task_id="t3")
+ EmptyOperator(task_id="t4")
+ EmptyOperator(task_id="t5")
+ t6 = EmptyOperator(task_id="t6")
+ tg2 >> t6
+ t7 = EmptyOperator(task_id="t7")
+ t1 >> t2 >> tg1 >> t7
+
+ logical_date = timezone.datetime(2024, 11, 30)
+ data_interval =
dag_4.timetable.infer_manual_data_interval(run_after=logical_date)
+ run_4 = dag_maker.create_dagrun(
+ run_id="run_4-1",
+ state=DagRunState.SUCCESS,
+ run_type=DagRunType.SCHEDULED,
+ start_date=logical_date,
+ logical_date=logical_date,
+ data_interval=data_interval,
+ **triggered_by_kwargs,
+ )
+ for ti in run_4.task_instances:
+ ti.state = "success"
+ session.commit()
+
@pytest.fixture(autouse=True)
def _clean():
@@ -714,7 +578,6 @@ class TestGetGridDataEndpoint:
response = test_client.get(f"/grid/{DAG_ID}")
assert response.status_code == 200
assert response.json() == {
- "structure": STRUCTURE,
"dag_runs": [GRID_RUN_1, GRID_RUN_2],
}
@@ -724,7 +587,6 @@ class TestGetGridDataEndpoint:
(
"logical_date",
{
- "structure": STRUCTURE,
"dag_runs": [
GRID_RUN_1,
GRID_RUN_2,
@@ -734,7 +596,6 @@ class TestGetGridDataEndpoint:
(
"-logical_date",
{
- "structure": STRUCTURE,
"dag_runs": [
GRID_RUN_2,
GRID_RUN_1,
@@ -744,7 +605,6 @@ class TestGetGridDataEndpoint:
(
"run_after",
{
- "structure": STRUCTURE,
"dag_runs": [
GRID_RUN_1,
GRID_RUN_2,
@@ -754,7 +614,6 @@ class TestGetGridDataEndpoint:
(
"-run_after",
{
- "structure": STRUCTURE,
"dag_runs": [
GRID_RUN_2,
GRID_RUN_1,
@@ -775,7 +634,6 @@ class TestGetGridDataEndpoint:
"true",
"false",
{
- "structure": STRUCTURE,
"dag_runs": [
{
**GRID_RUN_1,
@@ -796,10 +654,6 @@ class TestGetGridDataEndpoint:
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
"state": "success",
"task_count": 3,
"task_id": "mapped_task_group",
@@ -821,10 +675,6 @@ class TestGetGridDataEndpoint:
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
"state": "success",
"task_count": 3,
"task_id": "mapped_task_group.subtask",
@@ -852,8 +702,6 @@ class TestGetGridDataEndpoint:
"upstream_failed": 0,
},
"end_date": "2024-12-30T01:02:03Z",
- "note": None,
- "queued_dttm": None,
"start_date": "2024-12-30T01:00:00Z",
"state": "running",
"task_count": 3,
@@ -877,8 +725,6 @@ class TestGetGridDataEndpoint:
"upstream_failed": 0,
},
"end_date": "2024-12-30T01:02:03Z",
- "note": None,
- "queued_dttm": None,
"start_date": "2024-12-30T01:00:00Z",
"state": "running",
"task_count": 3,
@@ -894,7 +740,6 @@ class TestGetGridDataEndpoint:
"false",
"true",
{
- "structure": STRUCTURE,
"dag_runs": [
{
**GRID_RUN_1,
@@ -915,10 +760,6 @@ class TestGetGridDataEndpoint:
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
"state": "success",
"task_count": 3,
"task_id": "mapped_task_group",
@@ -940,10 +781,6 @@ class TestGetGridDataEndpoint:
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
"state": "success",
"task_count": 3,
"task_id": "mapped_task_group.subtask",
@@ -971,8 +808,6 @@ class TestGetGridDataEndpoint:
"upstream_failed": 0,
},
"end_date": "2024-12-30T01:02:03Z",
- "note": None,
- "queued_dttm": None,
"start_date": "2024-12-30T01:00:00Z",
"state": "running",
"task_count": 3,
@@ -996,8 +831,6 @@ class TestGetGridDataEndpoint:
"upstream_failed": 0,
},
"end_date": "2024-12-30T01:02:03Z",
- "note": None,
- "queued_dttm": None,
"start_date": "2024-12-30T01:00:00Z",
"state": "running",
"task_count": 3,
@@ -1031,14 +864,12 @@ class TestGetGridDataEndpoint:
(
1,
{
- "structure": STRUCTURE,
"dag_runs": [GRID_RUN_1],
},
),
(
2,
{
- "structure": STRUCTURE,
"dag_runs": [GRID_RUN_1, GRID_RUN_2],
},
),
@@ -1058,7 +889,6 @@ class TestGetGridDataEndpoint:
"logical_date_lte": timezone.datetime(2024, 11, 30),
},
{
- "structure": STRUCTURE,
"dag_runs": [GRID_RUN_1],
},
),
@@ -1067,7 +897,7 @@ class TestGetGridDataEndpoint:
"logical_date_gte": timezone.datetime(2024, 10, 30),
"logical_date_lte": timezone.datetime(2024, 10, 30),
},
- {"dag_runs": [], "structure": STRUCTURE},
+ {"dag_runs": []},
),
(
{
@@ -1075,7 +905,6 @@ class TestGetGridDataEndpoint:
"run_after_lte": timezone.datetime(2024, 11, 30),
},
{
- "structure": STRUCTURE,
"dag_runs": [GRID_RUN_1, GRID_RUN_2],
},
),
@@ -1084,7 +913,7 @@ class TestGetGridDataEndpoint:
"run_after_gte": timezone.datetime(2024, 10, 30),
"run_after_lte": timezone.datetime(2024, 10, 30),
},
- {"dag_runs": [], "structure": STRUCTURE},
+ {"dag_runs": []},
),
],
)
@@ -1102,14 +931,12 @@ class TestGetGridDataEndpoint:
(
["manual"],
{
- "structure": STRUCTURE,
"dag_runs": [GRID_RUN_2],
},
),
(
["scheduled"],
{
- "structure": STRUCTURE,
"dag_runs": [GRID_RUN_1],
},
),
@@ -1140,20 +967,18 @@ class TestGetGridDataEndpoint:
(
["success"],
{
- "structure": STRUCTURE,
"dag_runs": [GRID_RUN_1],
},
),
(
["failed"],
{
- "structure": STRUCTURE,
"dag_runs": [GRID_RUN_2],
},
),
(
["running"],
- {"dag_runs": [], "structure": STRUCTURE},
+ {"dag_runs": []},
),
],
)
@@ -1194,22 +1019,6 @@ class TestGetGridDataEndpoint:
assert response.status_code == 200
assert response.json() == {
"dag_runs": [],
- "structure": {
- "nodes": [
- {
- "asset_condition_type": None,
- "children": None,
- "id": "task2",
- "is_mapped": None,
- "label": "task2",
- "operator": "EmptyOperator",
- "setup_teardown_type": None,
- "tooltip": None,
- "type": "task",
- },
- ],
- "edges": [],
- },
}
def test_should_response_200_with_deleted_task_and_taskgroup(self,
session, test_client):
@@ -1225,68 +1034,15 @@ class TestGetGridDataEndpoint:
response = test_client.get(f"/grid/{DAG_ID_3}")
assert response.status_code == 200
assert response.json() == {
- "structure": {
- "edges": [],
- "nodes": [
- {
- "asset_condition_type": None,
- "children": None,
- "id": "task3",
- "is_mapped": None,
- "label": "task3",
- "operator": "EmptyOperator",
- "setup_teardown_type": None,
- "tooltip": None,
- "type": "task",
- },
- {
- "asset_condition_type": None,
- "children": None,
- "id": "task4",
- "is_mapped": None,
- "label": "task4",
- "operator": "EmptyOperator",
- "setup_teardown_type": None,
- "tooltip": None,
- "type": "task",
- },
- {
- "asset_condition_type": None,
- "children": [
- {
- "asset_condition_type": None,
- "children": None,
- "id": "task_group.inner_task",
- "is_mapped": None,
- "label": "inner_task",
- "operator": "EmptyOperator",
- "setup_teardown_type": None,
- "tooltip": None,
- "type": "task",
- },
- ],
- "id": "task_group",
- "is_mapped": False,
- "label": "task_group",
- "operator": None,
- "setup_teardown_type": None,
- "tooltip": "",
- "type": "task",
- },
- ],
- },
"dag_runs": [
{
"dag_run_id": "run_3",
"data_interval_end": "2024-11-30T00:00:00Z",
"data_interval_start": "2024-11-29T00:00:00Z",
- "end_date": None,
"logical_date": "2024-11-30T00:00:00Z",
- "note": None,
"queued_at": "2024-12-31T00:00:00Z",
"run_after": "2024-11-30T00:00:00Z",
"run_type": "scheduled",
- "start_date": None,
"state": "queued",
"task_instances": [
{
@@ -1305,10 +1061,6 @@ class TestGetGridDataEndpoint:
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
"state": "success",
"task_count": 1,
"task_id": "task_group",
@@ -1330,10 +1082,6 @@ class TestGetGridDataEndpoint:
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
"state": "success",
"task_count": 1,
"task_id": "task3",
@@ -1355,10 +1103,6 @@ class TestGetGridDataEndpoint:
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
"state": "success",
"task_count": 1,
"task_id": "task_group.inner_task",
@@ -1372,8 +1116,6 @@ class TestGetGridDataEndpoint:
"data_interval_start": "2024-11-29T00:00:00Z",
"end_date": "2024-12-31T00:00:00Z",
"logical_date": "2024-12-01T00:00:00Z",
- "note": None,
- "queued_at": None,
"run_after": "2024-11-30T00:00:00Z",
"run_type": "manual",
"start_date": "2024-11-30T00:00:00Z",
@@ -1395,10 +1137,6 @@ class TestGetGridDataEndpoint:
"up_for_retry": 0,
"upstream_failed": 0,
},
- "end_date": None,
- "note": None,
- "queued_dttm": None,
- "start_date": None,
"state": "success",
"task_count": 1,
"task_id": "task3",
@@ -1408,3 +1146,119 @@ class TestGetGridDataEndpoint:
},
],
}
+
+ def test_get_dag_structure(self, session, test_client):
+ session.commit()
+ response = test_client.get(f"/grid/structure/{DAG_ID}?limit=5")
+ assert response.status_code == 200
+ assert response.json() == [
+ {
+ "children": [{"id": "mapped_task_group.subtask", "is_mapped":
True, "label": "subtask"}],
+ "id": "mapped_task_group",
+ "is_mapped": True,
+ "label": "mapped_task_group",
+ },
+ {"id": "task", "label": "task"},
+ {
+ "children": [
+ {
+ "children": [
+ {
+ "id":
"task_group.inner_task_group.inner_task_group_sub_task",
+ "is_mapped": True,
+ "label": "inner_task_group_sub_task",
+ }
+ ],
+ "id": "task_group.inner_task_group",
+ "label": "inner_task_group",
+ },
+ {"id": "task_group.mapped_task", "is_mapped": True,
"label": "mapped_task"},
+ ],
+ "id": "task_group",
+ "label": "task_group",
+ },
+ {"id": "mapped_task_2", "is_mapped": True, "label":
"mapped_task_2"},
+ ]
+
+ def test_get_grid_runs(self, session, test_client):
+ session.commit()
+ response = test_client.get(f"/grid/runs/{DAG_ID}?limit=5")
+ assert response.status_code == 200
+ assert response.json() == [
+ {
+ "dag_id": "test_dag",
+ "duration": 0,
+ "end_date": "2024-12-31T00:00:00Z",
+ "run_after": "2024-11-30T00:00:00Z",
+ "run_id": "run_1",
+ "run_type": "scheduled",
+ "start_date": "2016-01-01T00:00:00Z",
+ "state": "success",
+ },
+ {
+ "dag_id": "test_dag",
+ "duration": 0,
+ "end_date": "2024-12-31T00:00:00Z",
+ "run_after": "2024-11-30T00:00:00Z",
+ "run_id": "run_2",
+ "run_type": "manual",
+ "start_date": "2016-01-01T00:00:00Z",
+ "state": "failed",
+ },
+ ]
+
+ def test_grid_ti_summaries_group(self, session, test_client):
+ run_id = "run_4-1"
+ session.commit()
+ response = test_client.get(f"/grid/ti_summaries/{DAG_ID_4}/{run_id}")
+ assert response.status_code == 200
+ actual = response.json()
+ expected = {
+ "dag_id": "test_dag_4",
+ "run_id": "run_4-1",
+ "task_instances": [
+ {"state": "success", "task_id": "t1"},
+ {"state": "success", "task_id": "t2"},
+ {"state": "success", "task_id": "t7"},
+ {"state": "success", "task_id": "task_group-1"},
+ {"state": "success", "task_id": "task_group-1.t6"},
+ {"state": "success", "task_id": "task_group-1.task_group-2"},
+ {"state": "success", "task_id":
"task_group-1.task_group-2.t3"},
+ {"state": "success", "task_id":
"task_group-1.task_group-2.t4"},
+ {"state": "success", "task_id":
"task_group-1.task_group-2.t5"},
+ ],
+ }
+ for obj in actual, expected:
+ tis = obj["task_instances"]
+ tis[:] = sorted(tis, key=lambda x: x["task_id"])
+ assert actual == expected
+
+ def test_grid_ti_summaries_mapped(self, session, test_client):
+ run_id = "run_2"
+ session.commit()
+ response = test_client.get(f"/grid/ti_summaries/{DAG_ID}/{run_id}")
+ assert response.status_code == 200
+ data = response.json()
+ actual = data["task_instances"]
+
+ def sort_dict(in_dict):
+ in_dict = sorted(in_dict, key=lambda x: x["task_id"])
+ out = []
+ for d in in_dict:
+ n = {k: d[k] for k in sorted(d, reverse=True)}
+ out.append(n)
+ return out
+
+ expected = [
+ {"task_id": "mapped_task_group", "state": "running"},
+ {"task_id": "task_group.inner_task_group"},
+ {"task_id": "task_group"},
+ {"task_id": "mapped_task_2"},
+ {"task_id": "mapped_task_group.subtask", "state": "running"},
+ {"task_id": "task", "state": "success"},
+ {"task_id":
"task_group.inner_task_group.inner_task_group_sub_task"},
+ {"task_id": "task_group.mapped_task"},
+ ]
+ expected = sort_dict(expected)
+ actual = sort_dict(actual)
+ assert actual == expected