Copilot commented on code in PR #64963:
URL: https://github.com/apache/airflow/pull/64963#discussion_r3066505927
##########
airflow-core/src/airflow/api_fastapi/common/parameters.py:
##########
@@ -297,8 +387,10 @@ def to_orm(self, select: Select) -> Select:
)
columns: list[ColumnElement] = []
+ resolved: list[tuple[str, ColumnElement, bool]] = []
for order_by_value in order_by_values:
lstriped_orderby = order_by_value.lstrip("-")
+ attr_name = lstriped_orderby
column: Column | None = None
if self.to_replace:
replacement = self.to_replace.get(lstriped_orderby, lstriped_orderby)
Review Comment:
`SortParam` now exposes `get_resolved_columns()` for cursor encoding, and
`encode_cursor()` reads each sort value with `getattr(row, attr_name, ...)`
using the `attr_name` returned from it. Here `attr_name` is set *before*
`to_replace` is applied, so when `to_replace` maps an API field name to a
different model attribute name, the resolved `attr_name` may not exist on the
ORM instance and the cursor token will encode `null` for that sort field. Fix
this by making the resolved `attr_name` match the ORM attribute actually used
for sorting (e.g. set `attr_name` after applying `replacement`, or derive it
from the SQLAlchemy `Column`/instrumented attribute key).
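A minimal sketch of the ordering issue described above, using stand-in types rather than the real `SortParam`. The `Row` class, the `resolve_attr_name` helper, and the `started` to `start_date` mapping are hypothetical, and the sketch assumes `to_replace` values are plain attribute-name strings:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Row:
    """Stand-in for an ORM instance; only `start_date` exists as an attribute."""

    start_date: str


def resolve_attr_name(order_by_value: str, to_replace: Optional[dict] = None) -> tuple:
    """Return (attribute name to read from the row, descending flag)."""
    descending = order_by_value.startswith("-")
    api_name = order_by_value.lstrip("-")
    # Apply the API-name -> model-attribute mapping *before* recording attr_name,
    # so that getattr() later reads an attribute that exists on the row.
    attr_name = (to_replace or {}).get(api_name, api_name)
    return attr_name, descending


row = Row(start_date="2024-01-01")
to_replace = {"started": "start_date"}  # hypothetical mapping for illustration

# Reading the unmapped API name falls back to None, which would encode as null.
assert getattr(row, "started", None) is None

attr_name, descending = resolve_attr_name("-started", to_replace)
assert getattr(row, attr_name, None) == "2024-01-01"
```

When the mapped value is a `Column` rather than a string, the attribute name would have to be derived from the column itself, which this sketch does not cover.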
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -498,41 +515,54 @@ def get_task_instances(
if dag:
task_group_id.dag = dag
+ filters: list[OrmClause] = [
+ run_after_range,
+ logical_date_range,
+ start_date_range,
+ end_date_range,
+ update_at_range,
+ duration_range,
+ state,
+ pool,
+ pool_name_pattern,
+ queue,
+ queue_name_pattern,
+ executor,
+ task_id,
+ task_display_name_pattern,
+ task_group_id,
+ dag_id_pattern,
+ run_id_pattern,
+ version_number,
+ readable_ti_filter,
+ try_number,
+ operator,
+ operator_name_pattern,
+ map_index,
+ ]
+
+ if use_cursor:
+ task_instance_select = apply_filters_to_select(statement=query, filters=[*filters, order_by, limit])
+ if cursor:
+ task_instance_select = apply_cursor_filter(task_instance_select, cursor, order_by)
+
+ task_instances = list(session.scalars(task_instance_select))
+ return TaskInstanceCursorCollectionResponse(
+ task_instances=task_instances,
+ next_cursor=encode_cursor(task_instances[-1], order_by) if task_instances else None,
+ previous_cursor=encode_cursor(task_instances[0], order_by) if task_instances else None,
Review Comment:
The cursor pagination metadata is currently incorrect in two ways: (1)
`previous_cursor` is populated whenever the page has any rows, so the UI
enables “Previous” even on the first page, and the token is not a valid
‘go back’ cursor under the current `apply_cursor_filter` semantics; (2)
`next_cursor` is populated for every non-empty page, including the last one,
so the UI enables “Next” and can lead users to an empty result set. A concrete
fix is to implement proper keyset pagination: fetch `limit + 1` rows to detect
whether a next page exists, return `next_cursor` only when more rows exist,
and either (a) drop `previous_cursor` until a correct ‘before’ cursor
mechanism is implemented, or (b) add a direction-aware cursor (e.g.
`cursor_direction=next|prev`, or a direction encoded in the token) plus a
reversed-order query so the previous page loads correctly.
```suggestion
# Assumption: `limit` is the query-parameter object passed to `filters` above,
# with its integer exposed as `.value`; adjust if the accessor differs.
page_size = limit.value
task_instance_select = apply_filters_to_select(statement=query, filters=[*filters, order_by])
if cursor:
    task_instance_select = apply_cursor_filter(task_instance_select, cursor, order_by)
task_instance_select = task_instance_select.limit(page_size + 1)
task_instances = list(session.scalars(task_instance_select))
has_next_page = len(task_instances) > page_size
if has_next_page:
    task_instances = task_instances[:page_size]
return TaskInstanceCursorCollectionResponse(
    task_instances=task_instances,
    next_cursor=encode_cursor(task_instances[-1], order_by) if has_next_page and task_instances else None,
    previous_cursor=None,
```
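The `limit + 1` look-ahead described in this comment can be illustrated outside Airflow. The following is a sketch only, using the standard-library `sqlite3` module; the `ti` table, its columns, and the `fetch_page` helper are hypothetical, and a single integer `id` stands in for the real multi-column sort key:

```python
import sqlite3


def fetch_page(conn, limit, after_id=None):
    """Return (rows, next_cursor); next_cursor is None on the last page."""
    sql = "SELECT id, name FROM ti"
    params = []
    if after_id is not None:
        # Keyset predicate: continue strictly after the last row already seen.
        sql += " WHERE id > ?"
        params.append(after_id)
    sql += " ORDER BY id LIMIT ?"
    params.append(limit + 1)  # one extra row acts as a look-ahead probe
    rows = conn.execute(sql, params).fetchall()
    has_next = len(rows) > limit
    rows = rows[:limit]  # never return the probe row to the caller
    next_cursor = rows[-1][0] if has_next and rows else None
    return rows, next_cursor


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ti (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO ti (id, name) VALUES (?, ?)",
    [(i, f"task_{i}") for i in range(1, 6)],
)

page1, cursor1 = fetch_page(conn, limit=2)
page2, cursor2 = fetch_page(conn, limit=2, after_id=cursor1)
page3, cursor3 = fetch_page(conn, limit=2, after_id=cursor2)
```

With five rows and a page size of two, the third call returns the single remaining row and no cursor, so a client has nothing to follow into an empty page.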
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -498,41 +515,54 @@ def get_task_instances(
if dag:
task_group_id.dag = dag
+ filters: list[OrmClause] = [
+ run_after_range,
+ logical_date_range,
+ start_date_range,
+ end_date_range,
+ update_at_range,
+ duration_range,
+ state,
+ pool,
+ pool_name_pattern,
+ queue,
+ queue_name_pattern,
+ executor,
+ task_id,
+ task_display_name_pattern,
+ task_group_id,
+ dag_id_pattern,
+ run_id_pattern,
+ version_number,
+ readable_ti_filter,
+ try_number,
+ operator,
+ operator_name_pattern,
+ map_index,
+ ]
+
+ if use_cursor:
+ task_instance_select = apply_filters_to_select(statement=query, filters=[*filters, order_by, limit])
+ if cursor:
+ task_instance_select = apply_cursor_filter(task_instance_select, cursor, order_by)
+
+ task_instances = list(session.scalars(task_instance_select))
+ return TaskInstanceCursorCollectionResponse(
+ task_instances=task_instances,
+ next_cursor=encode_cursor(task_instances[-1], order_by) if task_instances else None,
+ previous_cursor=encode_cursor(task_instances[0], order_by) if task_instances else None,
Review Comment:
Cursor pagination behavior introduced here isn’t adequately covered by tests
for boundary conditions: there should be assertions that `previous_cursor` is
`null`/absent on the first page (so “Previous” is disabled), and that
`next_cursor` becomes `null` when the last page is reached (so “Next” is
disabled). Adding tests for these conditions (and for any implemented ‘previous
page’ mechanism) would prevent regressions in pagination UX and correctness.
```suggestion
next_cursor = None
previous_cursor = encode_cursor(task_instances[0], order_by) if cursor and task_instances else None
if task_instances:
    current_page_last_cursor = encode_cursor(task_instances[-1], order_by)
    next_page_probe = apply_filters_to_select(statement=query, filters=[*filters, order_by]).limit(1)
    next_page_probe = apply_cursor_filter(next_page_probe, current_page_last_cursor, order_by)
    has_next_page = next(session.scalars(next_page_probe), None) is not None
    if has_next_page:
        next_cursor = current_page_last_cursor
return TaskInstanceCursorCollectionResponse(
    task_instances=task_instances,
    next_cursor=next_cursor,
    previous_cursor=previous_cursor,
```
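The boundary assertions requested in this comment could look roughly like the following. This is a sketch against a hypothetical in-memory `paginate` function, not the real endpoint or its test client; the base64/JSON cursor format is likewise an assumption made for illustration:

```python
import base64
import json
from typing import Optional


def encode(value: int) -> str:
    """Encode a sort key as an opaque, URL-safe cursor token."""
    return base64.urlsafe_b64encode(json.dumps(value).encode()).decode()


def decode(token: str) -> int:
    """Inverse of encode()."""
    return json.loads(base64.urlsafe_b64decode(token.encode()))


def paginate(ids: list, limit: int, cursor: Optional[str] = None) -> dict:
    """Hypothetical stand-in for the endpoint; `ids` must be sorted ascending."""
    start = decode(cursor) if cursor else None
    remaining = [i for i in ids if start is None or i > start]
    page = remaining[:limit]
    has_next = len(remaining) > limit
    return {
        "ids": page,
        "next_cursor": encode(page[-1]) if has_next and page else None,
        "previous_cursor": encode(page[0]) if cursor and page else None,
    }


def test_first_page_has_no_previous_cursor():
    assert paginate([1, 2, 3], limit=2)["previous_cursor"] is None


def test_last_page_has_no_next_cursor():
    first = paginate([1, 2, 3], limit=2)
    last = paginate([1, 2, 3], limit=2, cursor=first["next_cursor"])
    assert last["ids"] == [3]
    assert last["next_cursor"] is None


def test_exactly_full_last_page_has_no_next_cursor():
    # The page is full, but nothing follows it, so "Next" must stay disabled.
    assert paginate([1, 2], limit=2)["next_cursor"] is None


test_first_page_has_no_previous_cursor()
test_last_page_has_no_next_cursor()
test_exactly_full_last_page_has_no_next_cursor()
```

The exactly-full last page is the case a naive "non-empty page implies next page" check gets wrong, so it is worth a dedicated assertion in the real test suite.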
##########
airflow-core/src/airflow/ui/src/components/DataTable/DataTable.tsx:
##########
@@ -206,6 +219,30 @@ export const DataTable = <TData,>({
</HStack>
</Pagination.Root>
) : undefined}
+ {hasCursorPagination ? (
+ <HStack justifyContent="center" my={2}>
+ <IconButton
+ aria-label="Previous page"
+ data-testid="cursor-prev"
+ disabled={!cursorPagination.hasPrevious}
+ onClick={cursorPagination.onPrevious}
+ size="sm"
+ variant="outline"
+ >
+ <HiChevronLeft />
+ </IconButton>
+ <IconButton
+ aria-label="Next page"
+ data-testid="cursor-next"
+ disabled={!cursorPagination.hasNext}
+ onClick={cursorPagination.onNext}
+ size="sm"
+ variant="outline"
+ >
+ <HiChevronRight />
+ </IconButton>
Review Comment:
Whether this needs changing depends on the Chakra UI major version. In Chakra
UI v2, `IconButton` conventionally takes the icon through its `icon` prop
(e.g. `icon={<HiChevronLeft />}` / `icon={<HiChevronRight />}`). In Chakra UI
v3, the `icon` prop was removed and the icon is passed as `children`, which is
what this hunk does; the `Pagination.Root` compound component in the
surrounding code suggests v3. Apply the suggestion below only if the project
is on v2; on v3 the code as written is the supported form.
```suggestion
  icon={<HiChevronLeft />}
  onClick={cursorPagination.onPrevious}
  size="sm"
  variant="outline"
/>
<IconButton
  aria-label="Next page"
  data-testid="cursor-next"
  disabled={!cursorPagination.hasNext}
  icon={<HiChevronRight />}
  onClick={cursorPagination.onNext}
  size="sm"
  variant="outline"
/>
```
##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/common.py:
##########
@@ -156,3 +156,18 @@ class BulkResponse(BaseModel):
default=None,
description="Details of the bulk delete operation, including successful keys and errors.",
)
+
+
+class OffsetPaginatedResponse(BaseModel):
+ """Base for offset-paginated collection responses."""
+
+ pagination: Literal["offset"] = "offset"
+ total_entries: int
+
+
+class CursorPaginatedResponse(BaseModel):
+ """Base for cursor-paginated collection responses."""
+
+ pagination: Literal["cursor"] = "cursor"
Review Comment:
The OpenAPI spec uses a discriminator on `pagination`, but these models
define `pagination` with a default, which typically means `pagination` is not
listed as a required property in the generated schema. In OpenAPI 3.0, the
discriminator property must be required for reliable client generation and
validation. A concrete fix is to make `pagination` required (remove the
default) and set it explicitly when constructing responses (e.g.
`TaskInstanceOffsetCollectionResponse(pagination="offset", ...)` /
`TaskInstanceCursorCollectionResponse(pagination="cursor", ...)`), so that it
appears in `required` and the discriminator is spec-compliant.
```suggestion
    pagination: Literal["offset"]
    total_entries: int


class CursorPaginatedResponse(BaseModel):
    """Base for cursor-paginated collection responses."""

    pagination: Literal["cursor"]
```
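One way to guard the requirement described in this comment is a small check over the generated component schemas. The sketch below is standard-library only; the helper name and the hand-written schema fragments are hypothetical and are not taken from the real generated spec:

```python
def missing_discriminator_requirements(schemas: dict, property_name: str) -> list:
    """Return names of schemas that do not list `property_name` under `required`."""
    return [
        name
        for name, schema in schemas.items()
        if property_name not in schema.get("required", [])
    ]


# Hypothetical, hand-written fragments for illustration only.
schemas = {
    "OffsetResponse": {
        "properties": {
            "pagination": {"type": "string", "enum": ["offset"], "default": "offset"},
            "total_entries": {"type": "integer"},
        },
        # `pagination` carries a default here, so it is absent from `required`.
        "required": ["total_entries"],
    },
    "CursorResponse": {
        "properties": {"pagination": {"type": "string", "enum": ["cursor"]}},
        "required": ["pagination"],
    },
}

offenders = missing_discriminator_requirements(schemas, "pagination")
```

Run against the branches of the real `oneOf`, an empty result would indicate that every variant lists the discriminator property as required.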