kyungjunleeme commented on code in PR #53216:
URL: https://github.com/apache/airflow/pull/53216#discussion_r2203213446
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##########
@@ -217,35 +219,74 @@ def get_grid_runs(
run_after: Annotated[RangeFilter,
Depends(datetime_range_filter_factory("run_after", DagRun))],
) -> list[GridRunsResponse]:
"""Get info about a run for the grid."""
- # Retrieve, sort the previous DAG Runs
- base_query = select(
- DagRun.dag_id,
- DagRun.run_id,
- DagRun.queued_at,
- DagRun.start_date,
- DagRun.end_date,
- DagRun.run_after,
- DagRun.state,
- DagRun.run_type,
- ).where(DagRun.dag_id == dag_id)
+ try:
+ # Base query to get DagRun information with version details
+ base_query = (
+
select(DagRun).options(joinedload(DagRun.created_dag_version)).where(DagRun.dag_id
== dag_id)
+ )
- # This comparison is to fall back to DAG timetable when no order_by is
provided
- if order_by.value == order_by.get_primary_key_string():
- latest_serdag = _get_latest_serdag(dag_id, session)
- latest_dag = latest_serdag.dag
- ordering = list(latest_dag.timetable.run_ordering)
- order_by = SortParam(
- allowed_attrs=ordering,
- model=DagRun,
- ).set_value(ordering[0])
- dag_runs_select_filter, _ = paginated_select(
- statement=base_query,
- order_by=order_by,
- offset=offset,
- filters=[run_after],
- limit=limit,
- )
- return session.execute(dag_runs_select_filter)
+ # This comparison is to fall back to DAG timetable when no order_by is
provided
+ if order_by.value == order_by.get_primary_key_string():
+ latest_serdag = _get_latest_serdag(dag_id, session)
+ latest_dag = latest_serdag.dag
+ ordering = list(latest_dag.timetable.run_ordering)
+ order_by = SortParam(
+ allowed_attrs=ordering,
+ model=DagRun,
+ ).set_value(ordering[0])
+
+ dag_runs_select_filter, _ = paginated_select(
+ statement=base_query,
+ order_by=order_by,
+ offset=offset,
+ filters=[run_after],
+ limit=limit,
+ )
+
+ dag_runs = list(session.scalars(dag_runs_select_filter))
+
+ if not dag_runs:
+ return []
+
+ version_service = DagVersionService(session)
+ version_info_list = version_service.get_version_info_for_runs(dag_id,
dag_runs)
+
+ response = []
+ for dag_run, version_info in zip(dag_runs, version_info_list):
+ grid_run = GridRunsResponse(
+ dag_id=dag_run.dag_id,
+ run_id=dag_run.run_id,
+ queued_at=dag_run.queued_at,
+ start_date=dag_run.start_date,
+ end_date=dag_run.end_date,
+ run_after=dag_run.run_after,
+ state=dag_run.state,
+ run_type=dag_run.run_type,
+ dag_version_number=version_info["dag_version_number"],
+ dag_version_id=version_info["dag_version_id"],
+ is_version_changed=version_info["is_version_changed"],
+ has_mixed_versions=version_info["has_mixed_versions"],
+ latest_version_number=version_info["latest_version_number"],
+ )
+ response.append(grid_run)
+
+ return response
+
+ except HTTPException:
+ # Re-raise HTTPException (like 404 from _get_latest_serdag) without
modification
+ raise
+ except ValueError as e:
+ log.error("Invalid data format while retrieving grid runs",
dag_id=dag_id, error=str(e))
Review Comment:
Nice PR. Thanks in advance.
How about using `repr` instead of `str`?
I think that we should write a logging guideline document.
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##########
@@ -217,35 +219,74 @@ def get_grid_runs(
run_after: Annotated[RangeFilter,
Depends(datetime_range_filter_factory("run_after", DagRun))],
) -> list[GridRunsResponse]:
"""Get info about a run for the grid."""
- # Retrieve, sort the previous DAG Runs
- base_query = select(
- DagRun.dag_id,
- DagRun.run_id,
- DagRun.queued_at,
- DagRun.start_date,
- DagRun.end_date,
- DagRun.run_after,
- DagRun.state,
- DagRun.run_type,
- ).where(DagRun.dag_id == dag_id)
+ try:
+ # Base query to get DagRun information with version details
+ base_query = (
+
select(DagRun).options(joinedload(DagRun.created_dag_version)).where(DagRun.dag_id
== dag_id)
+ )
- # This comparison is to fall back to DAG timetable when no order_by is
provided
- if order_by.value == order_by.get_primary_key_string():
- latest_serdag = _get_latest_serdag(dag_id, session)
- latest_dag = latest_serdag.dag
- ordering = list(latest_dag.timetable.run_ordering)
- order_by = SortParam(
- allowed_attrs=ordering,
- model=DagRun,
- ).set_value(ordering[0])
- dag_runs_select_filter, _ = paginated_select(
- statement=base_query,
- order_by=order_by,
- offset=offset,
- filters=[run_after],
- limit=limit,
- )
- return session.execute(dag_runs_select_filter)
+ # This comparison is to fall back to DAG timetable when no order_by is
provided
+ if order_by.value == order_by.get_primary_key_string():
+ latest_serdag = _get_latest_serdag(dag_id, session)
+ latest_dag = latest_serdag.dag
+ ordering = list(latest_dag.timetable.run_ordering)
+ order_by = SortParam(
+ allowed_attrs=ordering,
+ model=DagRun,
+ ).set_value(ordering[0])
+
+ dag_runs_select_filter, _ = paginated_select(
+ statement=base_query,
+ order_by=order_by,
+ offset=offset,
+ filters=[run_after],
+ limit=limit,
+ )
+
+ dag_runs = list(session.scalars(dag_runs_select_filter))
+
+ if not dag_runs:
+ return []
+
+ version_service = DagVersionService(session)
+ version_info_list = version_service.get_version_info_for_runs(dag_id,
dag_runs)
+
+ response = []
+ for dag_run, version_info in zip(dag_runs, version_info_list):
+ grid_run = GridRunsResponse(
+ dag_id=dag_run.dag_id,
+ run_id=dag_run.run_id,
+ queued_at=dag_run.queued_at,
+ start_date=dag_run.start_date,
+ end_date=dag_run.end_date,
+ run_after=dag_run.run_after,
+ state=dag_run.state,
+ run_type=dag_run.run_type,
+ dag_version_number=version_info["dag_version_number"],
+ dag_version_id=version_info["dag_version_id"],
+ is_version_changed=version_info["is_version_changed"],
+ has_mixed_versions=version_info["has_mixed_versions"],
+ latest_version_number=version_info["latest_version_number"],
+ )
+ response.append(grid_run)
+
+ return response
+
+ except HTTPException:
+ # Re-raise HTTPException (like 404 from _get_latest_serdag) without
modification
+ raise
+ except ValueError as e:
+ log.error("Invalid data format while retrieving grid runs",
dag_id=dag_id, error=str(e))
Review Comment:
Nice PR. Thanks in advance.
How about using `repr` instead of `str`?
I think that we should write a logging guideline document.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]