pierrejeambrun commented on code in PR #53216:
URL: https://github.com/apache/airflow/pull/53216#discussion_r2248281167
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##########
@@ -291,13 +336,8 @@ def get_grid_ti_summaries(
"""
tis_of_dag_runs, _ = paginated_select(
statement=(
- select(
- TaskInstance.task_id,
- TaskInstance.state,
- TaskInstance.dag_version_id,
- TaskInstance.start_date,
- TaskInstance.end_date,
- )
+ select(TaskInstance)
Review Comment:
Please keep the explicit list of the fields that we want. Grid endpoints are
very prone to performance issues, and this ensures that we do not load more
than expected (for instance, if a field is added to the TaskInstance model).
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##########
@@ -216,35 +217,79 @@ def get_grid_runs(
run_after: Annotated[RangeFilter,
Depends(datetime_range_filter_factory("run_after", DagRun))],
) -> list[GridRunsResponse]:
"""Get info about a run for the grid."""
- # Retrieve, sort the previous DAG Runs
- base_query = select(
- DagRun.dag_id,
- DagRun.run_id,
- DagRun.queued_at,
- DagRun.start_date,
- DagRun.end_date,
- DagRun.run_after,
- DagRun.state,
- DagRun.run_type,
- ).where(DagRun.dag_id == dag_id)
+ try:
+ # Base query to get DagRun information with version details
+ # Only load what's absolutely necessary - created_dag_version for
fallback
+ base_query = (
+
select(DagRun).options(joinedload(DagRun.created_dag_version)).where(DagRun.dag_id
== dag_id)
+ )
- # This comparison is to fall back to DAG timetable when no order_by is
provided
- if order_by.value == [order_by.get_primary_key_string()]:
- latest_serdag = _get_latest_serdag(dag_id, session)
- latest_dag = latest_serdag.dag
- ordering = list(latest_dag.timetable.run_ordering)
- order_by = SortParam(
- allowed_attrs=ordering,
- model=DagRun,
- ).set_value(ordering)
- dag_runs_select_filter, _ = paginated_select(
- statement=base_query,
- order_by=order_by,
- offset=offset,
- filters=[run_after],
- limit=limit,
- )
- return session.execute(dag_runs_select_filter)
+ # This comparison is to fall back to DAG timetable when no order_by is
provided
+ if order_by.value == [order_by.get_primary_key_string()]:
+ latest_serdag = _get_latest_serdag(dag_id, session)
+ latest_dag = latest_serdag.dag
+ ordering = list(latest_dag.timetable.run_ordering)
+ order_by = SortParam(
+ allowed_attrs=ordering,
+ model=DagRun,
+ ).set_value(ordering)
+
+ dag_runs_select_filter, _ = paginated_select(
+ statement=base_query,
+ order_by=order_by,
+ offset=offset,
+ filters=[run_after],
+ limit=limit,
+ )
+
+ dag_runs = list(session.scalars(dag_runs_select_filter))
+
+ if not dag_runs:
+ return []
+
+ response = []
+ for dag_run in dag_runs:
+ # Simple version number extraction - let client handle comparison
logic
+ dag_version_number = None
+ if dag_run.dag_versions:
+ # Get the latest version number from dag_versions
+ dag_version_number = max(dv.version_number for dv in
dag_run.dag_versions)
+ elif dag_run.created_dag_version:
+ # Fallback to created_dag_version
+ dag_version_number = dag_run.created_dag_version.version_number
+
+ grid_run = GridRunsResponse(
+ dag_id=dag_run.dag_id,
+ run_id=dag_run.run_id,
+ queued_at=dag_run.queued_at,
+ start_date=dag_run.start_date,
+ end_date=dag_run.end_date,
+ run_after=dag_run.run_after,
+ state=dag_run.state,
+ run_type=dag_run.run_type,
+ dag_version_number=dag_version_number,
+ )
+ response.append(grid_run)
+
+ return response
+
+ except HTTPException:
+ # Re-raise HTTPException (like 404 from _get_latest_serdag) without
modification
+ raise
+ except ValueError as e:
+ log.warning("Invalid data format while retrieving grid runs",
dag_id=dag_id, error=str(e))
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST,
+ detail={"reason": "invalid_data", "message": f"Invalid data
format: {str(e)}"},
+ )
+ except Exception as e:
+ log.error(
+ "Unexpected error retrieving grid runs", dag_id=dag_id,
error=str(e), error_type=type(e).__name__
+ )
+ raise HTTPException(
+ status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail={"reason": "internal_error", "message": "An unexpected
error occurred"},
Review Comment:
Can you remove that unnecessary try/except block? I don't think it's
necessary, and even if it were, it's not related to the feature we're trying to
implement and should be a separate PR. Try to keep the change set limited to
the scope of the feature you are trying to implement.
Keeping them separate also matters because the feature will be in 3.1.0, while
bug fixes can go into the next 3.0.x patch release; separate PRs are much
easier to cherry-pick.
##########
airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py:
##########
@@ -508,20 +487,26 @@ def test_grid_ti_summaries_group(self, session,
test_client):
{
"state": "success",
"task_id": "t1",
+ "dag_version_id": ANY,
+ "dag_version_number": ANY,
Review Comment:
Can you add a test where dag_version_number and dag_version_id are not None,
and actually assert the values there instead of matching them with `ANY`?
##########
airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py:
##########
@@ -68,11 +68,17 @@ def _get_aggs_for_node(detail):
max_end_date = max(x["end_date"] for x in detail if x["end_date"])
except ValueError:
max_end_date = None
+
+ dag_version_id = detail[0].get("dag_version_id") if detail else None
+ dag_version_number = detail[0].get("dag_version_number") if detail else
None
Review Comment:
How can detail be None?
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##########
@@ -216,35 +217,79 @@ def get_grid_runs(
run_after: Annotated[RangeFilter,
Depends(datetime_range_filter_factory("run_after", DagRun))],
) -> list[GridRunsResponse]:
"""Get info about a run for the grid."""
- # Retrieve, sort the previous DAG Runs
- base_query = select(
- DagRun.dag_id,
- DagRun.run_id,
- DagRun.queued_at,
- DagRun.start_date,
- DagRun.end_date,
- DagRun.run_after,
- DagRun.state,
- DagRun.run_type,
- ).where(DagRun.dag_id == dag_id)
+ try:
+ # Base query to get DagRun information with version details
+ # Only load what's absolutely necessary - created_dag_version for
fallback
+ base_query = (
+
select(DagRun).options(joinedload(DagRun.created_dag_version)).where(DagRun.dag_id
== dag_id)
+ )
- # This comparison is to fall back to DAG timetable when no order_by is
provided
- if order_by.value == [order_by.get_primary_key_string()]:
- latest_serdag = _get_latest_serdag(dag_id, session)
- latest_dag = latest_serdag.dag
- ordering = list(latest_dag.timetable.run_ordering)
- order_by = SortParam(
- allowed_attrs=ordering,
- model=DagRun,
- ).set_value(ordering)
- dag_runs_select_filter, _ = paginated_select(
- statement=base_query,
- order_by=order_by,
- offset=offset,
- filters=[run_after],
- limit=limit,
- )
- return session.execute(dag_runs_select_filter)
+ # This comparison is to fall back to DAG timetable when no order_by is
provided
+ if order_by.value == [order_by.get_primary_key_string()]:
+ latest_serdag = _get_latest_serdag(dag_id, session)
+ latest_dag = latest_serdag.dag
+ ordering = list(latest_dag.timetable.run_ordering)
+ order_by = SortParam(
+ allowed_attrs=ordering,
+ model=DagRun,
+ ).set_value(ordering)
+
+ dag_runs_select_filter, _ = paginated_select(
+ statement=base_query,
+ order_by=order_by,
+ offset=offset,
+ filters=[run_after],
+ limit=limit,
+ )
+
+ dag_runs = list(session.scalars(dag_runs_select_filter))
+
+ if not dag_runs:
+ return []
+
+ response = []
+ for dag_run in dag_runs:
+ # Simple version number extraction - let client handle comparison
logic
+ dag_version_number = None
+ if dag_run.dag_versions:
+ # Get the latest version number from dag_versions
+ dag_version_number = max(dv.version_number for dv in
dag_run.dag_versions)
+ elif dag_run.created_dag_version:
+ # Fallback to created_dag_version
+ dag_version_number = dag_run.created_dag_version.version_number
Review Comment:
Why only take the latest one? Suppose two dag runs were running at the same
time: DR1 could have [v1,v2,v3], while DR2 could have [v2,v3,v4,v5]. Just
showing 'v5' is not really accurate; the change in version is both v4 and v5,
and we should see all of them.
##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py:
##########
@@ -79,6 +79,7 @@ class GridRunsResponse(BaseModel):
run_after: datetime
state: TaskInstanceState | None
run_type: DagRunType
+ dag_version_number: int | None = None
Review Comment:
I don't understand that; I would expect this to be a list, as defined in
other DagRun models.
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##########
@@ -308,18 +348,23 @@ def get_grid_ti_summaries(
limit=None,
return_total_entries=False,
)
- task_instances = list(session.execute(tis_of_dag_runs))
+ task_instances = list(session.scalars(tis_of_dag_runs))
if not task_instances:
raise HTTPException(
status.HTTP_404_NOT_FOUND, f"No task instances for dag_id={dag_id}
run_id={run_id}"
)
ti_details = collections.defaultdict(list)
for ti in task_instances:
+ dag_version_id = str(ti.dag_version_id) if ti.dag_version_id else None
Review Comment:
Do we really need `dag_version_id`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]