pierrejeambrun commented on code in PR #53216:
URL: https://github.com/apache/airflow/pull/53216#discussion_r2248281167


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##########
@@ -291,13 +336,8 @@ def get_grid_ti_summaries(
     """
     tis_of_dag_runs, _ = paginated_select(
         statement=(
-            select(
-                TaskInstance.task_id,
-                TaskInstance.state,
-                TaskInstance.dag_version_id,
-                TaskInstance.start_date,
-                TaskInstance.end_date,
-            )
+            select(TaskInstance)

Review Comment:
   Keep the explicit naming of fields that we want please. Grid endpoint are 
very subject to performance issue, and this will ensure that we do not load 
more than expected. (For instance if a field is added to the TaskInstance model)



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##########
@@ -216,35 +217,79 @@ def get_grid_runs(
     run_after: Annotated[RangeFilter, 
Depends(datetime_range_filter_factory("run_after", DagRun))],
 ) -> list[GridRunsResponse]:
     """Get info about a run for the grid."""
-    # Retrieve, sort the previous DAG Runs
-    base_query = select(
-        DagRun.dag_id,
-        DagRun.run_id,
-        DagRun.queued_at,
-        DagRun.start_date,
-        DagRun.end_date,
-        DagRun.run_after,
-        DagRun.state,
-        DagRun.run_type,
-    ).where(DagRun.dag_id == dag_id)
+    try:
+        # Base query to get DagRun information with version details
+        # Only load what's absolutely necessary - created_dag_version for 
fallback
+        base_query = (
+            
select(DagRun).options(joinedload(DagRun.created_dag_version)).where(DagRun.dag_id
 == dag_id)
+        )
 
-    # This comparison is to fall back to DAG timetable when no order_by is 
provided
-    if order_by.value == [order_by.get_primary_key_string()]:
-        latest_serdag = _get_latest_serdag(dag_id, session)
-        latest_dag = latest_serdag.dag
-        ordering = list(latest_dag.timetable.run_ordering)
-        order_by = SortParam(
-            allowed_attrs=ordering,
-            model=DagRun,
-        ).set_value(ordering)
-    dag_runs_select_filter, _ = paginated_select(
-        statement=base_query,
-        order_by=order_by,
-        offset=offset,
-        filters=[run_after],
-        limit=limit,
-    )
-    return session.execute(dag_runs_select_filter)
+        # This comparison is to fall back to DAG timetable when no order_by is 
provided
+        if order_by.value == [order_by.get_primary_key_string()]:
+            latest_serdag = _get_latest_serdag(dag_id, session)
+            latest_dag = latest_serdag.dag
+            ordering = list(latest_dag.timetable.run_ordering)
+            order_by = SortParam(
+                allowed_attrs=ordering,
+                model=DagRun,
+            ).set_value(ordering)
+
+        dag_runs_select_filter, _ = paginated_select(
+            statement=base_query,
+            order_by=order_by,
+            offset=offset,
+            filters=[run_after],
+            limit=limit,
+        )
+
+        dag_runs = list(session.scalars(dag_runs_select_filter))
+
+        if not dag_runs:
+            return []
+
+        response = []
+        for dag_run in dag_runs:
+            # Simple version number extraction - let client handle comparison 
logic
+            dag_version_number = None
+            if dag_run.dag_versions:
+                # Get the latest version number from dag_versions
+                dag_version_number = max(dv.version_number for dv in 
dag_run.dag_versions)
+            elif dag_run.created_dag_version:
+                # Fallback to created_dag_version
+                dag_version_number = dag_run.created_dag_version.version_number
+
+            grid_run = GridRunsResponse(
+                dag_id=dag_run.dag_id,
+                run_id=dag_run.run_id,
+                queued_at=dag_run.queued_at,
+                start_date=dag_run.start_date,
+                end_date=dag_run.end_date,
+                run_after=dag_run.run_after,
+                state=dag_run.state,
+                run_type=dag_run.run_type,
+                dag_version_number=dag_version_number,
+            )
+            response.append(grid_run)
+
+        return response
+
+    except HTTPException:
+        # Re-raise HTTPException (like 404 from _get_latest_serdag) without 
modification
+        raise
+    except ValueError as e:
+        log.warning("Invalid data format while retrieving grid runs", 
dag_id=dag_id, error=str(e))
+        raise HTTPException(
+            status.HTTP_400_BAD_REQUEST,
+            detail={"reason": "invalid_data", "message": f"Invalid data 
format: {str(e)}"},
+        )
+    except Exception as e:
+        log.error(
+            "Unexpected error retrieving grid runs", dag_id=dag_id, 
error=str(e), error_type=type(e).__name__
+        )
+        raise HTTPException(
+            status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail={"reason": "internal_error", "message": "An unexpected 
error occurred"},

Review Comment:
   Can you remove that unnecessary try catch block? I don't think it's 
necessary, and even if it was that's not related to the feature we're trying to 
implement and should be a separate PR. Try to keep the change set limited to 
the scope of the feature you are trying to implement.
   
   Also because feature will be in 3.1.0 and bug fixes can go into next patch 
release 3.0.x which makes things much easier to cherry pick.



##########
airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py:
##########
@@ -508,20 +487,26 @@ def test_grid_ti_summaries_group(self, session, 
test_client):
                 {
                     "state": "success",
                     "task_id": "t1",
+                    "dag_version_id": ANY,
+                    "dag_version_number": ANY,

Review Comment:
   Can you add a test where dag_version_number and dag_version_id are different 
than None, and actually assert the value there, instead of marking 'any'.



##########
airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py:
##########
@@ -68,11 +68,17 @@ def _get_aggs_for_node(detail):
         max_end_date = max(x["end_date"] for x in detail if x["end_date"])
     except ValueError:
         max_end_date = None
+
+    dag_version_id = detail[0].get("dag_version_id") if detail else None
+    dag_version_number = detail[0].get("dag_version_number") if detail else 
None

Review Comment:
   How can detail be None? 



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##########
@@ -216,35 +217,79 @@ def get_grid_runs(
     run_after: Annotated[RangeFilter, 
Depends(datetime_range_filter_factory("run_after", DagRun))],
 ) -> list[GridRunsResponse]:
     """Get info about a run for the grid."""
-    # Retrieve, sort the previous DAG Runs
-    base_query = select(
-        DagRun.dag_id,
-        DagRun.run_id,
-        DagRun.queued_at,
-        DagRun.start_date,
-        DagRun.end_date,
-        DagRun.run_after,
-        DagRun.state,
-        DagRun.run_type,
-    ).where(DagRun.dag_id == dag_id)
+    try:
+        # Base query to get DagRun information with version details
+        # Only load what's absolutely necessary - created_dag_version for 
fallback
+        base_query = (
+            
select(DagRun).options(joinedload(DagRun.created_dag_version)).where(DagRun.dag_id
 == dag_id)
+        )
 
-    # This comparison is to fall back to DAG timetable when no order_by is 
provided
-    if order_by.value == [order_by.get_primary_key_string()]:
-        latest_serdag = _get_latest_serdag(dag_id, session)
-        latest_dag = latest_serdag.dag
-        ordering = list(latest_dag.timetable.run_ordering)
-        order_by = SortParam(
-            allowed_attrs=ordering,
-            model=DagRun,
-        ).set_value(ordering)
-    dag_runs_select_filter, _ = paginated_select(
-        statement=base_query,
-        order_by=order_by,
-        offset=offset,
-        filters=[run_after],
-        limit=limit,
-    )
-    return session.execute(dag_runs_select_filter)
+        # This comparison is to fall back to DAG timetable when no order_by is 
provided
+        if order_by.value == [order_by.get_primary_key_string()]:
+            latest_serdag = _get_latest_serdag(dag_id, session)
+            latest_dag = latest_serdag.dag
+            ordering = list(latest_dag.timetable.run_ordering)
+            order_by = SortParam(
+                allowed_attrs=ordering,
+                model=DagRun,
+            ).set_value(ordering)
+
+        dag_runs_select_filter, _ = paginated_select(
+            statement=base_query,
+            order_by=order_by,
+            offset=offset,
+            filters=[run_after],
+            limit=limit,
+        )
+
+        dag_runs = list(session.scalars(dag_runs_select_filter))
+
+        if not dag_runs:
+            return []
+
+        response = []
+        for dag_run in dag_runs:
+            # Simple version number extraction - let client handle comparison 
logic
+            dag_version_number = None
+            if dag_run.dag_versions:
+                # Get the latest version number from dag_versions
+                dag_version_number = max(dv.version_number for dv in 
dag_run.dag_versions)
+            elif dag_run.created_dag_version:
+                # Fallback to created_dag_version
+                dag_version_number = dag_run.created_dag_version.version_number

Review Comment:
   Why only take the latest one? If both of the dagrun were running at the same 
time. DR1 could have [v1,v2,v3], while DR2 could have [v2,v3,v4,v5]. Just 
showing 'v5' is not really accurate, the change in version is both v4, and v5. 
And we should see all of them.



##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py:
##########
@@ -79,6 +79,7 @@ class GridRunsResponse(BaseModel):
     run_after: datetime
     state: TaskInstanceState | None
     run_type: DagRunType
+    dag_version_number: int | None = None

Review Comment:
   I don't understand that, I would expect this to be a list. As defined in 
other DagRun models.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##########
@@ -308,18 +348,23 @@ def get_grid_ti_summaries(
         limit=None,
         return_total_entries=False,
     )
-    task_instances = list(session.execute(tis_of_dag_runs))
+    task_instances = list(session.scalars(tis_of_dag_runs))
     if not task_instances:
         raise HTTPException(
             status.HTTP_404_NOT_FOUND, f"No task instances for dag_id={dag_id} 
run_id={run_id}"
         )
     ti_details = collections.defaultdict(list)
     for ti in task_instances:
+        dag_version_id = str(ti.dag_version_id) if ti.dag_version_id else None

Review Comment:
   Do we really need `dag_version_id` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to