Copilot commented on code in PR #64953:
URL: https://github.com/apache/airflow/pull/64953#discussion_r3066487897


##########
airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts:
##########
@@ -3140,7 +3155,7 @@ export type GetTaskInstancesData = {
     versionNumber?: Array<(number)>;
 };
 
-export type GetTaskInstancesResponse = TaskInstanceCollectionResponse;
+export type GetTaskInstancesResponse = TaskInstanceOffsetCollectionResponse | TaskInstanceCursorCollectionResponse;

Review Comment:
   The discriminator field `pagination` is optional in both response types, 
which weakens reliable client-side narrowing (and contradicts the OpenAPI 
discriminator usage); it should be required (`pagination: "cursor"` /
`pagination: "offset"`). Since the server always sets this field (defaulted
literal in the base models), updating the OpenAPI schema/codegen so 
`pagination` is required will make the union safer and simplify consumers.
   ```suggestion
   export type GetTaskInstancesResponse =
       | (TaskInstanceOffsetCollectionResponse & {
             pagination: "offset";
         })
       | (TaskInstanceCursorCollectionResponse & {
             pagination: "cursor";
         });
   ```



##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py:
##########
@@ -1717,6 +1717,100 @@ def test_should_respond_200_for_pagination(self, test_client, session):
         assert (num_entries_batch1 + num_entries_batch2) == ti_count
         assert response_batch1 != response_batch2
 
+    def test_cursor_pagination_first_page(self, test_client, session):
+        """First page with cursor='' returns cursor response without needing a real token."""
+        dag_id = "example_python_operator"
+        self.create_task_instances(
+            session,
+            task_instances=[
+                {"start_date": DEFAULT_DATETIME_1 + dt.timedelta(minutes=(i + 1))} for i in range(5)
+            ],
+            dag_id=dag_id,
+        )
+        response = test_client.get(
+            "/dags/~/dagRuns/~/taskInstances",
+            params={"limit": 3, "order_by": ["map_index"], "cursor": ""},
+        )
+        assert response.status_code == 200, response.json()
+        body = response.json()
+        assert body["pagination"] == "cursor"
+        assert "next_cursor" in body
+        assert "previous_cursor" in body
+        assert "total_entries" not in body
+        assert len(body["task_instances"]) == 3

Review Comment:
   The cursor pagination tests validate the shape of the response but don’t 
assert correct boundary semantics (e.g., `previous_cursor is null` on the first 
page; `next_cursor is null` on the last page; using `previous_cursor` actually 
navigates backwards without overlap). Adding these assertions would catch the 
current behavior where cursors are always returned whenever rows exist.
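
A minimal sketch of the boundary assertions described above, written against plain response bodies rather than the real test client. The helper name `check_cursor_boundaries` and the sample `pages` data are hypothetical; the sketch assumes each body carries `previous_cursor`, `next_cursor`, and a `task_instances` list whose items have an `id`.

```python
from __future__ import annotations


def check_cursor_boundaries(pages: list[dict]) -> None:
    """Assert boundary semantics over response bodies collected in forward order."""
    assert pages, "expected at least one page"
    # The first page must not advertise a previous page.
    assert pages[0]["previous_cursor"] is None
    # The last page must not advertise a next page.
    assert pages[-1]["next_cursor"] is None
    # Every page before the last must carry a cursor to continue from.
    for page in pages[:-1]:
        assert page["next_cursor"] is not None
    # No task instance may appear on more than one page.
    ids = [ti["id"] for page in pages for ti in page["task_instances"]]
    assert len(ids) == len(set(ids)), "pages overlap"


# Hypothetical bodies, shaped like the cursor response in this PR.
pages = [
    {"previous_cursor": None, "next_cursor": "tok-1", "task_instances": [{"id": "a"}, {"id": "b"}]},
    {"previous_cursor": "tok-0", "next_cursor": None, "task_instances": [{"id": "c"}]},
]
check_cursor_boundaries(pages)
```

In the real test, the bodies would come from following `next_cursor` with `test_client.get(...)` until it is null, and a second walk using `previous_cursor` would cover backward navigation.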



##########
airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts:
##########
@@ -1422,6 +1424,15 @@ export type TaskInstanceHistoryResponse = {
     dag_version: DagVersionResponse | null;
 };
 
+/**
+ * Offset-paginated task instance collection response.
+ */
+export type TaskInstanceOffsetCollectionResponse = {
+    pagination?: "offset";
+    total_entries: number;
+    task_instances: Array<TaskInstanceResponse>;
+};

Review Comment:
   The discriminator field `pagination` is optional in both response types, 
which weakens reliable client-side narrowing (and contradicts the OpenAPI 
discriminator usage); it should be required (`pagination: "cursor"` /
`pagination: "offset"`). Since the server always sets this field (defaulted
literal in the base models), updating the OpenAPI schema/codegen so 
`pagination` is required will make the union safer and simplify consumers.



##########
airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts:
##########
@@ -2367,7 +2375,7 @@ export class TaskInstanceService {
      * @param data.limit
      * @param data.offset
     * @param data.orderBy Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `id, state, duration, start_date, end_date, map_index, try_number, logical_date, run_after, data_interval_start, data_interval_end, rendered_map_index, operator, logical_date, run_after, data_interval_start, data_interval_end`
-     * @returns TaskInstanceCollectionResponse Successful Response
+     * @returns unknown Successful Response

Review Comment:
   The generated JSDoc says `@returns unknown` even though the method signature 
returns `CancelablePromise<GetTaskInstancesResponse>`. This is misleading for 
consumers and suggests the generator template isn’t mapping the new 
`oneOf`/discriminator response properly; update the OpenAPI TS 
generator/templates so the JSDoc return type matches `GetTaskInstancesResponse` 
(and similarly in the generated queries/prefetch/ensure helpers where the same 
`unknown` appears).
   ```suggestion
        * @returns GetTaskInstancesResponse Successful Response
   ```



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -498,41 +515,54 @@ def get_task_instances(
         if dag:
             task_group_id.dag = dag
 
+    filters: list[OrmClause] = [
+        run_after_range,
+        logical_date_range,
+        start_date_range,
+        end_date_range,
+        update_at_range,
+        duration_range,
+        state,
+        pool,
+        pool_name_pattern,
+        queue,
+        queue_name_pattern,
+        executor,
+        task_id,
+        task_display_name_pattern,
+        task_group_id,
+        dag_id_pattern,
+        run_id_pattern,
+        version_number,
+        readable_ti_filter,
+        try_number,
+        operator,
+        operator_name_pattern,
+        map_index,
+    ]
+
+    if use_cursor:
+        task_instance_select = apply_filters_to_select(statement=query, filters=[*filters, order_by, limit])
+        if cursor:
+            task_instance_select = apply_cursor_filter(task_instance_select, cursor, order_by)
+
+        task_instances = list(session.scalars(task_instance_select))
+        return TaskInstanceCursorCollectionResponse(
+            task_instances=task_instances,
+            next_cursor=encode_cursor(task_instances[-1], order_by) if task_instances else None,
+            previous_cursor=encode_cursor(task_instances[0], order_by) if task_instances else None,

Review Comment:
   `next_cursor`/`previous_cursor` are currently non-null whenever the page has 
any rows, which makes the UI think both directions are available even on the 
first/last page. Additionally, `previous_cursor` as “cursor of the first row” 
won’t reliably navigate backwards with the current `apply_cursor_filter` 
semantics (it always paginates forward relative to the comparator). Consider: 
(1) fetching `limit + 1` rows to decide whether a next page exists and only 
setting `next_cursor` when there is one (return exactly `limit` rows), and (2) 
introducing an explicit direction (e.g., `cursor_direction=next|prev`) or 
separate `before_cursor`/`after_cursor` parameters so the server can compute a 
true previous page (typically by reversing sort + comparator, then reversing 
results) and return `previous_cursor=None` on the first page.
   ```suggestion
           cursor_limit = LimitFilter(limit.value + 1)
           task_instance_select = apply_filters_to_select(statement=query, filters=[*filters, order_by, cursor_limit])
           if cursor:
               task_instance_select = apply_cursor_filter(task_instance_select, cursor, order_by)
   
           task_instances = list(session.scalars(task_instance_select))
           has_next_page = len(task_instances) > limit.value
           task_instances = task_instances[: limit.value]
   
           return TaskInstanceCursorCollectionResponse(
               task_instances=task_instances,
               next_cursor=encode_cursor(task_instances[-1], order_by) if has_next_page and task_instances else None,
               previous_cursor=None,
   ```
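
To illustrate both points outside of SQLAlchemy, here is a rough sketch over a plain list of integer keys. `page_after` and `page_before` are hypothetical names, not functions in this PR; the slice stands in for `LIMIT limit + 1`, and the integer key stands in for an encoded cursor.

```python
from __future__ import annotations


def page_after(rows: list[int], limit: int, after: int | None = None):
    """Forward page: return (items, next_cursor) for ascending keys."""
    candidates = [r for r in sorted(rows) if after is None or r > after]
    window = candidates[: limit + 1]  # emulate LIMIT limit + 1
    has_next = len(window) > limit  # the extra row only signals that more exist
    items = window[:limit]
    next_cursor = items[-1] if has_next and items else None
    return items, next_cursor


def page_before(rows: list[int], limit: int, before: int):
    """Backward page: reverse sort and comparator, then reverse the results."""
    candidates = [r for r in sorted(rows, reverse=True) if r < before]
    window = candidates[: limit + 1]
    has_previous = len(window) > limit
    items = list(reversed(window[:limit]))  # restore ascending order for the client
    previous_cursor = items[0] if has_previous and items else None
    return items, previous_cursor


rows = [1, 2, 3, 4, 5]
first_items, first_next = page_after(rows, limit=2)
last_items, last_next = page_after(rows, limit=2, after=4)
back_items, back_prev = page_before(rows, limit=2, before=3)
```

With five rows and `limit=2`, the last forward page holds one row and a null `next_cursor`, and paging backward from key 3 lands on the first page with a null `previous_cursor`.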



##########
airflow-core/src/airflow/api_fastapi/common/cursors.py:
##########
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Cursor-based (keyset) pagination helpers.
+
+:meta private:
+"""
+
+from __future__ import annotations
+
+import base64
+import json
+import uuid as uuid_mod
+from datetime import datetime
+from typing import TYPE_CHECKING, Any
+
+from fastapi import HTTPException, status
+from sqlalchemy import and_, or_
+
+if TYPE_CHECKING:
+    from sqlalchemy.sql import Select
+
+    from airflow.api_fastapi.common.parameters import SortParam
+
+
+def _encode_value(val: Any) -> dict[str, Any]:
+    """Encode a single Python value as a typed {"type": ..., "value": ...} object."""
+    if val is None:
+        return {"type": "null", "value": None}
+    if isinstance(val, uuid_mod.UUID):
+        return {"type": "uuid", "value": str(val)}
+    if isinstance(val, datetime):
+        return {"type": "datetime", "value": val.isoformat()}
+    if isinstance(val, int):
+        return {"type": "int", "value": val}
+    return {"type": "str", "value": str(val)}
+
+
+def _decode_value(entry: dict[str, Any]) -> Any:
+    """Decode a typed cursor entry back to its Python value."""
+    type_tag = entry["type"]
+    raw = entry["value"]
+    if type_tag == "null":
+        return None
+    if type_tag == "uuid":
+        return uuid_mod.UUID(str(raw))
+    if type_tag == "datetime":
+        return datetime.fromisoformat(str(raw))
+    if type_tag == "int":
+        return int(raw)
+    return str(raw)
+
+
+def encode_cursor(row: Any, sort_param: SortParam) -> str:
+    """
+    Encode cursor token from the last row of a result set.
+
+    The token is a base64url-encoded JSON list of typed objects, each
+    containing ``{"type": "<tag>", "value": <serialized>}`` so the
+    cursor is self-describing and can be decoded without column metadata.
+    """
+    resolved = sort_param.get_resolved_columns()
+    if not resolved:
+        raise ValueError("SortParam has no resolved columns.")
+
+    entries = [_encode_value(getattr(row, attr_name, None)) for attr_name, _col, _desc in resolved]
+    return base64.urlsafe_b64encode(json.dumps(entries).encode()).decode()
+
+
+def decode_cursor(token: str) -> list[dict[str, Any]]:
+    """Decode a cursor token and return the list of typed value entries."""
+    try:
+        data = json.loads(base64.urlsafe_b64decode(token))
+    except Exception:
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid cursor token")
+
+    if not isinstance(data, list) or any(
+        not isinstance(entry, dict) or "type" not in entry or "value" not in entry for entry in data
+    ):
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid cursor token structure")
+
+    return data
+
+
+def apply_cursor_filter(statement: Select, cursor: str, sort_param: SortParam) -> Select:
+    """
+    Apply a keyset pagination WHERE clause from a cursor token.
+
+    Builds a composite comparison that respects mixed ASC/DESC ordering
+    on the resolved sort columns.
+    """
+    cursor_entries = decode_cursor(cursor)
+
+    resolved = sort_param.get_resolved_columns()
+    if len(cursor_entries) != len(resolved):
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, "Cursor token does not match current query shape")
+
+    parsed_values = [_decode_value(entry) for entry in cursor_entries]
+
+    # Build the keyset WHERE clause for mixed ASC/DESC ordering.
+    # For columns (c1 ASC, c2 DESC, c3 ASC) with cursor values (v1, v2, v3):
+    #   (c1 > v1) OR
+    #   (c1 = v1 AND c2 < v2) OR
+    #   (c1 = v1 AND c2 = v2 AND c3 > v3)
+    or_clauses = []
+    for i, (_, col, is_desc) in enumerate(resolved):
+        eq_conditions = [resolved[j][1] == parsed_values[j] for j in range(i)]
+        if is_desc:
+            bound = col < parsed_values[i]
+        else:
+            bound = col > parsed_values[i]
+        or_clauses.append(and_(*eq_conditions, bound))

Review Comment:
   This comparison logic breaks when any cursor value is `None` (e.g., sorting 
by nullable columns like `start_date`): SQL comparisons such as `col > 
NULL`/`col < NULL` evaluate to NULL/false, causing paging to return empty or 
inconsistent results. To make cursor pagination correct, either disallow cursor 
mode when ordering includes nullable columns (unless you apply a deterministic 
null ordering), or implement null-aware keyset comparisons that match the exact 
ORDER BY behavior (e.g., using `nulls_last/nulls_first` + corresponding `IS 
NULL` tie-break conditions or `coalesce` consistently in both ordering and 
cursor filtering).
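
One possible shape for the null-aware variant, sketched with the standard-library `sqlite3` module instead of SQLAlchemy. The table `ti`, the helpers `fetch_page` and `walk_all`, and the `id` tie-breaker are assumptions for illustration only. NULLs are ordered last via `(start_date IS NULL)`, and the cursor predicate mirrors that ordering.

```python
import sqlite3


def fetch_page(conn, limit, cursor=None):
    """Return up to `limit` (id, start_date) rows after `cursor`, NULLs sorted last."""
    order = "ORDER BY (start_date IS NULL), start_date, id"
    if cursor is None:
        where, params = "", []
    else:
        last_date, last_id = cursor
        if last_date is None:
            # Already inside the NULL block: only the tie-breaker can advance.
            where = "WHERE start_date IS NULL AND id > ?"
            params = [last_id]
        else:
            # Later dates, the same date with a larger id, or the trailing NULL block.
            where = (
                "WHERE start_date > ? "
                "OR (start_date = ? AND id > ?) "
                "OR start_date IS NULL"
            )
            params = [last_date, last_date, last_id]
    sql = f"SELECT id, start_date FROM ti {where} {order} LIMIT ?"
    return conn.execute(sql, [*params, limit]).fetchall()


def walk_all(conn, limit):
    """Follow cursors until an empty page; return every row seen, in order."""
    seen, cursor = [], None
    while True:
        page = fetch_page(conn, limit, cursor)
        if not page:
            return seen
        seen.extend(page)
        last_id, last_date = page[-1]
        cursor = (last_date, last_id)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ti (id INTEGER PRIMARY KEY, start_date TEXT)")
conn.executemany(
    "INSERT INTO ti VALUES (?, ?)",
    [(1, "2024-01-02"), (2, None), (3, "2024-01-01"), (4, None), (5, "2024-01-02")],
)
# The naive predicate matches nothing, because `start_date > NULL` evaluates to NULL.
naive = conn.execute("SELECT COUNT(*) FROM ti WHERE start_date > NULL").fetchone()[0]
rows = walk_all(conn, limit=2)
```

Walking with `limit=2` visits every row exactly once, including the two rows with a NULL `start_date`, whereas the naive comparison would drop them once the cursor value is NULL.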



##########
airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts:
##########
@@ -1375,11 +1375,13 @@ export type TaskInletAssetReference = {
 };
 
 /**
- * Task Instance Collection serializer for responses.
+ * Cursor-paginated task instance collection response.
  */
-export type TaskInstanceCollectionResponse = {
+export type TaskInstanceCursorCollectionResponse = {
+    pagination?: "cursor";
+    next_cursor: string | null;
+    previous_cursor: string | null;
     task_instances: Array<TaskInstanceResponse>;
-    total_entries: number;
 };

Review Comment:
   The discriminator field `pagination` is optional in both response types, 
which weakens reliable client-side narrowing (and contradicts the OpenAPI 
discriminator usage); it should be required (`pagination: "cursor"` /
`pagination: "offset"`). Since the server always sets this field (defaulted
literal in the base models), updating the OpenAPI schema/codegen so 
`pagination` is required will make the union safer and simplify consumers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
