Copilot commented on code in PR #64928:
URL: https://github.com/apache/airflow/pull/64928#discussion_r3066479060


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##########
@@ -468,27 +468,33 @@ def get_grid_ti_summaries_stream(
     """
 
     def _generate() -> Generator[str, None, None]:
+        if not run_ids:
+            return
         serdag_cache: dict[Any, SerializedDagModel | None] = {}
-        for run_id in run_ids or []:
-            tis = session.execute(
-                select(
-                    TaskInstance.task_id,
-                    TaskInstance.state,
-                    TaskInstance.dag_version_id,
-                    TaskInstance.start_date,
-                    TaskInstance.end_date,
-                    DagVersion.version_number,
-                )
-                .outerjoin(DagVersion, TaskInstance.dag_version_id == 
DagVersion.id)
-                .where(TaskInstance.dag_id == dag_id)
-                .where(TaskInstance.run_id == run_id)
-                .order_by(TaskInstance.task_id)
-                .execution_options(yield_per=1000)
+        tis_query = (
+            select(
+                TaskInstance.run_id,
+                TaskInstance.task_id,
+                TaskInstance.state,
+                TaskInstance.dag_version_id,
+                TaskInstance.start_date,
+                TaskInstance.end_date,
+                DagVersion.version_number,
             )
+            .outerjoin(DagVersion, TaskInstance.dag_version_id == 
DagVersion.id)
+            .where(TaskInstance.dag_id == dag_id)
+            .where(TaskInstance.run_id.in_(run_ids))
+            .order_by(TaskInstance.run_id, TaskInstance.task_id)

Review Comment:
   This change alters the stream emission order: entries were previously 
emitted in the caller-provided `run_ids` sequence, and are now emitted in 
database sort order (`ORDER BY run_id`). If API/UI consumers rely on the order 
of emitted runs matching the requested `run_ids`, this will be an observable 
behavior change. Consider preserving input order (e.g., materialize summaries 
keyed by `run_id` and emit in `run_ids` order, or order by a CASE expression 
derived from `run_ids`).



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -393,6 +394,7 @@ def get_mapped_task_instance(
         .where(TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id == 
task_id, TI.map_index == map_index)
         .options(joinedload(TI.rendered_task_instance_fields))
         .options(joinedload(TI.dag_version))
+        .options(joinedload(TI.trigger).joinedload(TI.trigger.triggerer_job))

Review Comment:
   `joinedload(...).joinedload(...)` should be passed a mapped class attribute 
for the related entity (e.g., `Trigger.triggerer_job`), not 
`TI.trigger.triggerer_job`. As written, `TI.trigger` is an instrumented 
relationship attribute and does not expose `triggerer_job` as a valid loader 
path, which can raise at runtime or fail to eager load as intended. Update the 
nested `joinedload` to target the Trigger model’s relationship attribute, 
e.g. `joinedload(TI.trigger).joinedload(Trigger.triggerer_job)`.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##########
@@ -468,27 +468,33 @@ def get_grid_ti_summaries_stream(
     """
 
     def _generate() -> Generator[str, None, None]:
+        if not run_ids:
+            return
         serdag_cache: dict[Any, SerializedDagModel | None] = {}
-        for run_id in run_ids or []:
-            tis = session.execute(
-                select(
-                    TaskInstance.task_id,
-                    TaskInstance.state,
-                    TaskInstance.dag_version_id,
-                    TaskInstance.start_date,
-                    TaskInstance.end_date,
-                    DagVersion.version_number,
-                )
-                .outerjoin(DagVersion, TaskInstance.dag_version_id == 
DagVersion.id)
-                .where(TaskInstance.dag_id == dag_id)
-                .where(TaskInstance.run_id == run_id)
-                .order_by(TaskInstance.task_id)
-                .execution_options(yield_per=1000)
+        tis_query = (
+            select(
+                TaskInstance.run_id,
+                TaskInstance.task_id,
+                TaskInstance.state,
+                TaskInstance.dag_version_id,
+                TaskInstance.start_date,
+                TaskInstance.end_date,
+                DagVersion.version_number,
             )
+            .outerjoin(DagVersion, TaskInstance.dag_version_id == 
DagVersion.id)
+            .where(TaskInstance.dag_id == dag_id)
+            .where(TaskInstance.run_id.in_(run_ids))
+            .order_by(TaskInstance.run_id, TaskInstance.task_id)
+            .execution_options(yield_per=2000)
+        )
+
+        from itertools import groupby
+
+        for run_id, run_tis in groupby(session.execute(tis_query), key=lambda 
x: x.run_id):

Review Comment:
   This change alters the stream emission order: entries were previously 
emitted in the caller-provided `run_ids` sequence, and are now emitted in 
database sort order (`ORDER BY run_id`). If API/UI consumers rely on the order 
of emitted runs matching the requested `run_ids`, this will be an observable 
behavior change. Consider preserving input order (e.g., materialize summaries 
keyed by `run_id` and emit in `run_ids` order, or order by a CASE expression 
derived from `run_ids`).



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -120,6 +120,7 @@ def get_task_instance(
         .where(TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id == 
task_id)
         .options(joinedload(TI.rendered_task_instance_fields))
         .options(joinedload(TI.dag_version))
+        .options(joinedload(TI.trigger).joinedload(TI.trigger.triggerer_job))

Review Comment:
   `joinedload(...).joinedload(...)` should be passed a mapped class attribute 
for the related entity (e.g., `Trigger.triggerer_job`), not 
`TI.trigger.triggerer_job`. As written, `TI.trigger` is an instrumented 
relationship attribute and does not expose `triggerer_job` as a valid loader 
path, which can raise at runtime or fail to eager load as intended. Update the 
nested `joinedload` to target the Trigger model’s relationship attribute, 
e.g. `joinedload(TI.trigger).joinedload(Trigger.triggerer_job)`.



##########
airflow-core/src/airflow/api_fastapi/common/db/task_instances.py:
##########
@@ -55,5 +55,6 @@ def eager_load_TI_and_TIH_for_validation(
         query = query.options(
             joinedload(orm_model.task_instance_note),
             joinedload(orm_model.rendered_task_instance_fields),
+            
joinedload(orm_model.trigger).joinedload(orm_model.trigger.triggerer_job),

Review Comment:
   Same loader-path issue as in the route handlers: 
`orm_model.trigger.triggerer_job` is not a valid class-bound attribute path for 
SQLAlchemy eager loading. Use the related model’s relationship attribute (e.g., 
`Trigger.triggerer_job`) for the nested `joinedload`, i.e. 
`joinedload(orm_model.trigger).joinedload(Trigger.triggerer_job)`, importing 
the Trigger ORM class in this module if needed.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##########
@@ -468,27 +468,33 @@ def get_grid_ti_summaries_stream(
     """
 
     def _generate() -> Generator[str, None, None]:
+        if not run_ids:
+            return
         serdag_cache: dict[Any, SerializedDagModel | None] = {}
-        for run_id in run_ids or []:
-            tis = session.execute(
-                select(
-                    TaskInstance.task_id,
-                    TaskInstance.state,
-                    TaskInstance.dag_version_id,
-                    TaskInstance.start_date,
-                    TaskInstance.end_date,
-                    DagVersion.version_number,
-                )
-                .outerjoin(DagVersion, TaskInstance.dag_version_id == 
DagVersion.id)
-                .where(TaskInstance.dag_id == dag_id)
-                .where(TaskInstance.run_id == run_id)
-                .order_by(TaskInstance.task_id)
-                .execution_options(yield_per=1000)
+        tis_query = (
+            select(
+                TaskInstance.run_id,
+                TaskInstance.task_id,
+                TaskInstance.state,
+                TaskInstance.dag_version_id,
+                TaskInstance.start_date,
+                TaskInstance.end_date,
+                DagVersion.version_number,
             )
+            .outerjoin(DagVersion, TaskInstance.dag_version_id == 
DagVersion.id)
+            .where(TaskInstance.dag_id == dag_id)
+            .where(TaskInstance.run_id.in_(run_ids))
+            .order_by(TaskInstance.run_id, TaskInstance.task_id)
+            .execution_options(yield_per=2000)
+        )
+
+        from itertools import groupby
+
+        for run_id, run_tis in groupby(session.execute(tis_query), key=lambda 
x: x.run_id):

Review Comment:
   The batching/grouping logic changes how results are fetched and iterated 
(single query + `groupby`), which is easy to regress (ordering, missing runs, 
grouping correctness). Add/adjust tests to cover multi-`run_id` requests and 
validate emitted NDJSON entries (including ordering expectations, if any) 
compared to the pre-batch behavior.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##########
@@ -468,27 +468,33 @@ def get_grid_ti_summaries_stream(
     """
 
     def _generate() -> Generator[str, None, None]:
+        if not run_ids:
+            return
         serdag_cache: dict[Any, SerializedDagModel | None] = {}
-        for run_id in run_ids or []:
-            tis = session.execute(
-                select(
-                    TaskInstance.task_id,
-                    TaskInstance.state,
-                    TaskInstance.dag_version_id,
-                    TaskInstance.start_date,
-                    TaskInstance.end_date,
-                    DagVersion.version_number,
-                )
-                .outerjoin(DagVersion, TaskInstance.dag_version_id == 
DagVersion.id)
-                .where(TaskInstance.dag_id == dag_id)
-                .where(TaskInstance.run_id == run_id)
-                .order_by(TaskInstance.task_id)
-                .execution_options(yield_per=1000)
+        tis_query = (
+            select(
+                TaskInstance.run_id,
+                TaskInstance.task_id,
+                TaskInstance.state,
+                TaskInstance.dag_version_id,
+                TaskInstance.start_date,
+                TaskInstance.end_date,
+                DagVersion.version_number,
             )
+            .outerjoin(DagVersion, TaskInstance.dag_version_id == 
DagVersion.id)
+            .where(TaskInstance.dag_id == dag_id)
+            .where(TaskInstance.run_id.in_(run_ids))

Review Comment:
   The batching/grouping logic changes how results are fetched and iterated 
(single query + `groupby`), which is easy to regress (ordering, missing runs, 
grouping correctness). Add/adjust tests to cover multi-`run_id` requests and 
validate emitted NDJSON entries (including ordering expectations, if any) 
compared to the pre-batch behavior.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##########
@@ -468,27 +468,33 @@ def get_grid_ti_summaries_stream(
     """
 
     def _generate() -> Generator[str, None, None]:
+        if not run_ids:
+            return
         serdag_cache: dict[Any, SerializedDagModel | None] = {}
-        for run_id in run_ids or []:
-            tis = session.execute(
-                select(
-                    TaskInstance.task_id,
-                    TaskInstance.state,
-                    TaskInstance.dag_version_id,
-                    TaskInstance.start_date,
-                    TaskInstance.end_date,
-                    DagVersion.version_number,
-                )
-                .outerjoin(DagVersion, TaskInstance.dag_version_id == 
DagVersion.id)
-                .where(TaskInstance.dag_id == dag_id)
-                .where(TaskInstance.run_id == run_id)
-                .order_by(TaskInstance.task_id)
-                .execution_options(yield_per=1000)
+        tis_query = (

Review Comment:
   The batching/grouping logic changes how results are fetched and iterated 
(single query + `groupby`), which is easy to regress (ordering, missing runs, 
grouping correctness). Add/adjust tests to cover multi-`run_id` requests and 
validate emitted NDJSON entries (including ordering expectations, if any) 
compared to the pre-batch behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to