Copilot commented on code in PR #64953:
URL: https://github.com/apache/airflow/pull/64953#discussion_r3066487897
##########
airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts:
##########
@@ -3140,7 +3155,7 @@ export type GetTaskInstancesData = {
versionNumber?: Array<(number)>;
};
-export type GetTaskInstancesResponse = TaskInstanceCollectionResponse;
+export type GetTaskInstancesResponse = TaskInstanceOffsetCollectionResponse | TaskInstanceCursorCollectionResponse;
Review Comment:
The discriminator field `pagination` is optional in both response types,
which weakens reliable client-side narrowing (and contradicts the OpenAPI
discriminator usage); it should be required (`pagination: "cursor"` /
`pagination: "offset"`). Since the server always sets this field (defaulted
literal in the base models), updating the OpenAPI schema/codegen so
`pagination` is required will make the union safer and simplify consumers.
```suggestion
export type GetTaskInstancesResponse =
| (TaskInstanceOffsetCollectionResponse & {
pagination: "offset";
})
| (TaskInstanceCursorCollectionResponse & {
pagination: "cursor";
});
```
##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py:
##########
@@ -1717,6 +1717,100 @@ def test_should_respond_200_for_pagination(self, test_client, session):
assert (num_entries_batch1 + num_entries_batch2) == ti_count
assert response_batch1 != response_batch2
+ def test_cursor_pagination_first_page(self, test_client, session):
+ """First page with cursor='' returns cursor response without needing a
real token."""
+ dag_id = "example_python_operator"
+ self.create_task_instances(
+ session,
+ task_instances=[
+ {"start_date": DEFAULT_DATETIME_1 + dt.timedelta(minutes=(i +
1))} for i in range(5)
+ ],
+ dag_id=dag_id,
+ )
+ response = test_client.get(
+ "/dags/~/dagRuns/~/taskInstances",
+ params={"limit": 3, "order_by": ["map_index"], "cursor": ""},
+ )
+ assert response.status_code == 200, response.json()
+ body = response.json()
+ assert body["pagination"] == "cursor"
+ assert "next_cursor" in body
+ assert "previous_cursor" in body
+ assert "total_entries" not in body
+ assert len(body["task_instances"]) == 3
Review Comment:
The cursor pagination tests validate the shape of the response but don’t
assert correct boundary semantics (e.g., `previous_cursor is null` on the first
page; `next_cursor is null` on the last page; using `previous_cursor` actually
navigates backwards without overlap). Adding these assertions would catch the
current behavior where cursors are always returned whenever rows exist.
##########
airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts:
##########
@@ -1422,6 +1424,15 @@ export type TaskInstanceHistoryResponse = {
dag_version: DagVersionResponse | null;
};
+/**
+ * Offset-paginated task instance collection response.
+ */
+export type TaskInstanceOffsetCollectionResponse = {
+ pagination?: "offset";
+ total_entries: number;
+ task_instances: Array<TaskInstanceResponse>;
+};
Review Comment:
The discriminator field `pagination` is optional in both response types,
which weakens reliable client-side narrowing (and contradicts the OpenAPI
discriminator usage); it should be required (`pagination: "cursor"` /
`pagination: "offset"`). Since the server always sets this field (defaulted
literal in the base models), updating the OpenAPI schema/codegen so
`pagination` is required will make the union safer and simplify consumers.
##########
airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts:
##########
@@ -2367,7 +2375,7 @@ export class TaskInstanceService {
* @param data.limit
* @param data.offset
* @param data.orderBy Attributes to order by, multi criteria sort is
supported. Prefix with `-` for descending order. Supported attributes: `id,
state, duration, start_date, end_date, map_index, try_number, logical_date,
run_after, data_interval_start, data_interval_end, rendered_map_index,
operator, logical_date, run_after, data_interval_start, data_interval_end`
- * @returns TaskInstanceCollectionResponse Successful Response
+ * @returns unknown Successful Response
Review Comment:
The generated JSDoc says `@returns unknown` even though the method signature
returns `CancelablePromise<GetTaskInstancesResponse>`. This is misleading for
consumers and suggests the generator template isn’t mapping the new
`oneOf`/discriminator response properly; update the OpenAPI TS
generator/templates so the JSDoc return type matches `GetTaskInstancesResponse`
(and similarly in the generated queries/prefetch/ensure helpers where the same
`unknown` appears).
```suggestion
* @returns GetTaskInstancesResponse Successful Response
```
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -498,41 +515,54 @@ def get_task_instances(
if dag:
task_group_id.dag = dag
+ filters: list[OrmClause] = [
+ run_after_range,
+ logical_date_range,
+ start_date_range,
+ end_date_range,
+ update_at_range,
+ duration_range,
+ state,
+ pool,
+ pool_name_pattern,
+ queue,
+ queue_name_pattern,
+ executor,
+ task_id,
+ task_display_name_pattern,
+ task_group_id,
+ dag_id_pattern,
+ run_id_pattern,
+ version_number,
+ readable_ti_filter,
+ try_number,
+ operator,
+ operator_name_pattern,
+ map_index,
+ ]
+
+ if use_cursor:
+ task_instance_select = apply_filters_to_select(statement=query,
filters=[*filters, order_by, limit])
+ if cursor:
+ task_instance_select = apply_cursor_filter(task_instance_select,
cursor, order_by)
+
+ task_instances = list(session.scalars(task_instance_select))
+ return TaskInstanceCursorCollectionResponse(
+ task_instances=task_instances,
+ next_cursor=encode_cursor(task_instances[-1], order_by) if
task_instances else None,
+ previous_cursor=encode_cursor(task_instances[0], order_by) if
task_instances else None,
Review Comment:
`next_cursor`/`previous_cursor` are currently non-null whenever the page has
any rows, which makes the UI think both directions are available even on the
first/last page. Additionally, `previous_cursor` as “cursor of the first row”
won’t reliably navigate backwards with the current `apply_cursor_filter`
semantics (it always paginates forward relative to the comparator). Consider:
(1) fetching `limit + 1` rows to decide whether a next page exists and only
setting `next_cursor` when there is one (return exactly `limit` rows), and (2)
introducing an explicit direction (e.g., `cursor_direction=next|prev`) or
separate `before_cursor`/`after_cursor` parameters so the server can compute a
true previous page (typically by reversing sort + comparator, then reversing
results) and return `previous_cursor=None` on the first page.
```suggestion
cursor_limit = LimitFilter(limit.value + 1)
task_instance_select = apply_filters_to_select(statement=query,
filters=[*filters, order_by, cursor_limit])
if cursor:
task_instance_select = apply_cursor_filter(task_instance_select,
cursor, order_by)
task_instances = list(session.scalars(task_instance_select))
has_next_page = len(task_instances) > limit.value
task_instances = task_instances[: limit.value]
return TaskInstanceCursorCollectionResponse(
task_instances=task_instances,
next_cursor=encode_cursor(task_instances[-1], order_by) if
has_next_page and task_instances else None,
previous_cursor=None,
```
##########
airflow-core/src/airflow/api_fastapi/common/cursors.py:
##########
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Cursor-based (keyset) pagination helpers.
+
+:meta private:
+"""
+
+from __future__ import annotations
+
+import base64
+import json
+import uuid as uuid_mod
+from datetime import datetime
+from typing import TYPE_CHECKING, Any
+
+from fastapi import HTTPException, status
+from sqlalchemy import and_, or_
+
+if TYPE_CHECKING:
+ from sqlalchemy.sql import Select
+
+ from airflow.api_fastapi.common.parameters import SortParam
+
+
+def _encode_value(val: Any) -> dict[str, Any]:
+ """Encode a single Python value as a typed {"type": ..., "value": ...}
object."""
+ if val is None:
+ return {"type": "null", "value": None}
+ if isinstance(val, uuid_mod.UUID):
+ return {"type": "uuid", "value": str(val)}
+ if isinstance(val, datetime):
+ return {"type": "datetime", "value": val.isoformat()}
+ if isinstance(val, int):
+ return {"type": "int", "value": val}
+ return {"type": "str", "value": str(val)}
+
+
+def _decode_value(entry: dict[str, Any]) -> Any:
+ """Decode a typed cursor entry back to its Python value."""
+ type_tag = entry["type"]
+ raw = entry["value"]
+ if type_tag == "null":
+ return None
+ if type_tag == "uuid":
+ return uuid_mod.UUID(str(raw))
+ if type_tag == "datetime":
+ return datetime.fromisoformat(str(raw))
+ if type_tag == "int":
+ return int(raw)
+ return str(raw)
+
+
+def encode_cursor(row: Any, sort_param: SortParam) -> str:
+ """
+ Encode cursor token from the last row of a result set.
+
+ The token is a base64url-encoded JSON list of typed objects, each
+ containing ``{"type": "<tag>", "value": <serialized>}`` so the
+ cursor is self-describing and can be decoded without column metadata.
+ """
+ resolved = sort_param.get_resolved_columns()
+ if not resolved:
+ raise ValueError("SortParam has no resolved columns.")
+
+ entries = [_encode_value(getattr(row, attr_name, None)) for attr_name,
_col, _desc in resolved]
+ return base64.urlsafe_b64encode(json.dumps(entries).encode()).decode()
+
+
+def decode_cursor(token: str) -> list[dict[str, Any]]:
+ """Decode a cursor token and return the list of typed value entries."""
+ try:
+ data = json.loads(base64.urlsafe_b64decode(token))
+ except Exception:
+ raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid cursor token")
+
+ if not isinstance(data, list) or any(
+ not isinstance(entry, dict) or "type" not in entry or "value" not in
entry for entry in data
+ ):
+ raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid cursor token structure")
+
+ return data
+
+
+def apply_cursor_filter(statement: Select, cursor: str, sort_param: SortParam) -> Select:
+ """
+ Apply a keyset pagination WHERE clause from a cursor token.
+
+ Builds a composite comparison that respects mixed ASC/DESC ordering
+ on the resolved sort columns.
+ """
+ cursor_entries = decode_cursor(cursor)
+
+ resolved = sort_param.get_resolved_columns()
+ if len(cursor_entries) != len(resolved):
+ raise HTTPException(status.HTTP_400_BAD_REQUEST, "Cursor token does not match current query shape")
+
+ parsed_values = [_decode_value(entry) for entry in cursor_entries]
+
+ # Build the keyset WHERE clause for mixed ASC/DESC ordering.
+ # For columns (c1 ASC, c2 DESC, c3 ASC) with cursor values (v1, v2, v3):
+ # (c1 > v1) OR
+ # (c1 = v1 AND c2 < v2) OR
+ # (c1 = v1 AND c2 = v2 AND c3 > v3)
+ or_clauses = []
+ for i, (_, col, is_desc) in enumerate(resolved):
+ eq_conditions = [resolved[j][1] == parsed_values[j] for j in range(i)]
+ if is_desc:
+ bound = col < parsed_values[i]
+ else:
+ bound = col > parsed_values[i]
+ or_clauses.append(and_(*eq_conditions, bound))
Review Comment:
This comparison logic breaks when any cursor value is `None` (e.g., sorting
by nullable columns like `start_date`): SQL comparisons such as `col >
NULL`/`col < NULL` evaluate to NULL/false, causing paging to return empty or
inconsistent results. To make cursor pagination correct, either disallow cursor
mode when ordering includes nullable columns (unless you apply a deterministic
null ordering), or implement null-aware keyset comparisons that match the exact
ORDER BY behavior (e.g., using `nulls_last/nulls_first` + corresponding `IS
NULL` tie-break conditions or `coalesce` consistently in both ordering and
cursor filtering).
##########
airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts:
##########
@@ -1375,11 +1375,13 @@ export type TaskInletAssetReference = {
};
/**
- * Task Instance Collection serializer for responses.
+ * Cursor-paginated task instance collection response.
*/
-export type TaskInstanceCollectionResponse = {
+export type TaskInstanceCursorCollectionResponse = {
+ pagination?: "cursor";
+ next_cursor: string | null;
+ previous_cursor: string | null;
task_instances: Array<TaskInstanceResponse>;
- total_entries: number;
};
Review Comment:
The discriminator field `pagination` is optional in both response types,
which weakens reliable client-side narrowing (and contradicts the OpenAPI
discriminator usage); it should be required (`pagination: "cursor"` /
`pagination: "offset"`). Since the server always sets this field (defaulted
literal in the base models), updating the OpenAPI schema/codegen so
`pagination` is required will make the union safer and simplify consumers.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]