Copilot commented on code in PR #64845:
URL: https://github.com/apache/airflow/pull/64845#discussion_r3066478533


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -498,40 +513,53 @@ def get_task_instances(
         if dag:
             task_group_id.dag = dag
 
+    filters: list[OrmClause] = [
+        run_after_range,
+        logical_date_range,
+        start_date_range,
+        end_date_range,
+        update_at_range,
+        duration_range,
+        state,
+        pool,
+        pool_name_pattern,
+        queue,
+        queue_name_pattern,
+        executor,
+        task_id,
+        task_display_name_pattern,
+        task_group_id,
+        dag_id_pattern,
+        run_id_pattern,
+        version_number,
+        readable_ti_filter,
+        try_number,
+        operator,
+        operator_name_pattern,
+        map_index,
+    ]
+
+    if use_cursor:
+        task_instance_select = apply_filters_to_select(statement=query, filters=[*filters, order_by, limit])
+        if cursor:
+            task_instance_select = apply_cursor_filter(task_instance_select, cursor, order_by)
+
+        task_instances = list(session.scalars(task_instance_select))
+        return TaskInstanceCollectionResponse(
+            task_instances=task_instances,
+            next_cursor=encode_cursor(task_instances[-1], order_by) if task_instances else None,
+            previous_cursor=encode_cursor(task_instances[0], order_by) if task_instances else None,

Review Comment:
   `next_cursor`/`previous_cursor` are computed as cursors for the last/first 
item of the *current* page, but the API docs claim they point to the 
next/previous page and should become `null` at the end/start. As implemented, 
`next_cursor` will be non-null even on the last page (because the code doesn't 
fetch `limit + 1` to detect more results), and `previous_cursor` will also be 
non-null on the first page. Recommendation (mandatory): fetch one extra row 
(`limit + 1`) to decide whether to return `next_cursor`, and either (a) 
remove/rename `previous_cursor` to something like `current_page_start_cursor`, 
or (b) implement true backwards pagination semantics (e.g., separate 
`before`/`after` cursors or a direction flag) so `previous_cursor` actually 
retrieves the previous page.
   ```suggestion
           task_instance_select = apply_filters_to_select(statement=query, filters=[*filters, order_by]).limit(
               limit + 1
           )
           if cursor:
               task_instance_select = apply_cursor_filter(task_instance_select, cursor, order_by)
   
           fetched_task_instances = list(session.scalars(task_instance_select))
           has_next_page = len(fetched_task_instances) > limit
           task_instances = fetched_task_instances[:limit]
   
           return TaskInstanceCollectionResponse(
               task_instances=task_instances,
               next_cursor=encode_cursor(task_instances[-1], order_by) if has_next_page and task_instances else None,
               previous_cursor=None,
   ```
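
The `limit + 1` technique recommended above can be illustrated without a database. The following is a minimal sketch over an in-memory sorted list; `Page` and `fetch_page` are hypothetical names introduced for illustration and are not part of the PR or of Airflow.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Page:
    """Hypothetical response shape: one page of items plus an optional cursor."""

    items: list[int]
    next_cursor: int | None


def fetch_page(rows: list[int], limit: int, cursor: int | None = None) -> Page:
    """Return up to ``limit`` values strictly after ``cursor`` from sorted ``rows``."""
    # Keyset filter: keep only rows strictly after the cursor value.
    candidates = [r for r in rows if cursor is None or r > cursor]
    # Fetch one row more than requested; the extra row only signals that more exist.
    fetched = candidates[: limit + 1]
    has_next_page = len(fetched) > limit
    # Drop the probe row so the caller never sees more than ``limit`` items.
    items = fetched[:limit]
    return Page(items=items, next_cursor=items[-1] if has_next_page and items else None)
```

With five rows and `limit=2`, the third call returns the single remaining row and a `next_cursor` of `None`, which is the end-of-results signal the API docs describe.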



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -470,13 +472,26 @@ def get_task_instances(
     ],
     readable_ti_filter: ReadableTIFilterDep,
     session: SessionDep,
+    cursor: str | None = Query(
+        None,
+        description="Cursor for keyset-based pagination (mutually exclusive with offset). "
+        "Pass an empty string for the first page, then use ``next_cursor`` from the response.",
+    ),
 ) -> TaskInstanceCollectionResponse:
     """
     Get list of task instances.
 
-    This endpoint allows specifying `~` as the dag_id, dag_run_id to retrieve Task Instances for all DAGs
-    and DAG runs.
+    This endpoint allows specifying `~` as the dag_id, dag_run_id
+    to retrieve task instances for all DAGs and DAG runs.
+
+    Supports two pagination modes:
+
+    **Offset (default):** use `limit` and `offset` query parameters. Returns `total_entries`.
+
+    **Cursor:** pass `cursor` (empty string for the first page, then `next_cursor` from the response).
+    When `cursor` is provided, `offset` is ignored and `total_entries` is not returned.
     """
+    use_cursor = cursor is not None

Review Comment:
   The parameter description says cursor pagination is "mutually exclusive
with offset", but the implementation silently ignores `offset` when `cursor`
is provided. Recommendation (mandatory): either enforce exclusivity (e.g.,
return 400/422 when both `cursor` and a nonzero `offset` are provided), or
update the description/OpenAPI docs to match the actual behavior ("offset is
ignored when cursor is set").
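
If the first option (enforcing exclusivity) is chosen, the check could look roughly like the sketch below. It is framework-free on purpose: `PaginationParamError` and `resolve_pagination_mode` are hypothetical names, and the route would still need to translate the error into a 400/422 response.

```python
from __future__ import annotations


class PaginationParamError(ValueError):
    """Hypothetical error that a route could translate into an HTTP 400/422."""


def resolve_pagination_mode(cursor: str | None, offset: int = 0) -> str:
    """Return ``"cursor"`` or ``"offset"``, rejecting requests that mix both modes."""
    # An empty string still selects cursor mode: it requests the first page.
    use_cursor = cursor is not None
    if use_cursor and offset != 0:
        raise PaginationParamError("`cursor` and a nonzero `offset` are mutually exclusive")
    return "cursor" if use_cursor else "offset"
```

Treating `offset=0` as "not provided" is an assumption of this sketch; it keeps clients that always send the default offset working.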



##########
airflow-core/tests/unit/api_fastapi/common/test_cursors.py:
##########
@@ -0,0 +1,144 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import base64
+import uuid
+from datetime import datetime, timezone
+from unittest.mock import MagicMock
+
+import msgspec
+import pytest
+from fastapi import HTTPException
+from sqlalchemy import select
+
+from airflow.api_fastapi.common.cursors import apply_cursor_filter, decode_cursor, encode_cursor
+from airflow.api_fastapi.common.parameters import SortParam
+from airflow.models.taskinstance import TaskInstance
+
+
+def _msgpack_cursor_token(payload: object) -> str:
+    """Match production: msgpack + base64url without padding."""
+    return base64.urlsafe_b64encode(msgspec.msgpack.encode(payload)).decode("ascii").rstrip("=")
+
+
+class TestCursorPagination:
+    """Tests for cursor-based pagination helpers."""
+
+    def _make_sort_param_with_resolved_columns(self, order_by_values=None):
+        """Build a SortParam for TaskInstance and resolve its columns."""
+        sp = SortParam(["id", "start_date", "map_index"], TaskInstance)
+        sp.set_value(order_by_values or ["map_index"])
+        sp.to_orm(select(TaskInstance))
+        return sp
+
+    def test_encode_decode_cursor_roundtrip(self):
+        sp = self._make_sort_param_with_resolved_columns(["start_date"])
+        row = MagicMock(spec=["start_date", "id"])
+        row.start_date = "2024-01-15T10:00:00+00:00"
+        row.id = "019462ab-1234-5678-9abc-def012345678"
+
+        token = encode_cursor(row, sp)
+        decoded = decode_cursor(token)
+
+        assert decoded == [
+            "2024-01-15T10:00:00+00:00",
+            "019462ab-1234-5678-9abc-def012345678",
+        ]
+
+    def test_decode_cursor_invalid_base64(self):
+        with pytest.raises(HTTPException, match="Invalid cursor token"):
+            decode_cursor("not-valid-base64!!!")
+
+    def test_decode_cursor_invalid_msgpack(self):
+        token = base64.urlsafe_b64encode(b"not-msgpack").decode().rstrip("=")
+        with pytest.raises(HTTPException, match="Invalid cursor token"):
+            decode_cursor(token)
+
+    def test_decode_cursor_not_a_list(self):
+        token = _msgpack_cursor_token({"wrong": "type"})
+        with pytest.raises(HTTPException, match="Invalid cursor token structure"):
+            decode_cursor(token)
+
+    def test_encode_cursor_works_without_prior_to_orm(self):
+        """get_resolved_columns now lazily resolves, so to_orm is no longer required before encode."""
+        sp = SortParam(["id"], TaskInstance)
+        sp.set_value(["id"])
+        row = MagicMock(spec=["id"])
+        row.id = "019462ab-1234-5678-9abc-def012345678"
+        token = encode_cursor(row, sp)
+        decoded = decode_cursor(token)
+        assert decoded == ["019462ab-1234-5678-9abc-def012345678"]
+
+    def test_apply_cursor_filter_wrong_value_count(self):
+        sp = self._make_sort_param_with_resolved_columns(["start_date"])
+        token = _msgpack_cursor_token(["only-one-value"])
+
+        with pytest.raises(HTTPException, match="does not match"):
+            apply_cursor_filter(select(TaskInstance), token, sp)
+
+    def test_apply_cursor_filter_ascending(self):
+        sp = self._make_sort_param_with_resolved_columns(["start_date"])
+        values = [
+            datetime(2024, 1, 15, 10, 0, 0, tzinfo=timezone.utc),
+            uuid.UUID("019462ab-1234-5678-9abc-def012345678"),
+        ]
+        token = _msgpack_cursor_token(values)
+
+        stmt = apply_cursor_filter(select(TaskInstance), token, sp)
+        sql = str(stmt)
+        assert ">" in sql

Review Comment:
   These assertions rely on `str(stmt)` containing raw `>`/`<` operators, which 
is brittle across SQLAlchemy versions/dialects/compilation behaviors (and could 
match unrelated parts of the SQL string). Recommendation (optional): assert 
against the compiled whereclause structure (SQLAlchemy expressions) or compile 
with a deterministic dialect and inspect only the WHERE portion.
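
To show why a bare substring check is weak, here is a small stdlib-only sketch that inspects only the WHERE portion of a SQL string and returns whole comparison operators. `where_operators` is a hypothetical helper and the SQL strings used with it are hand-written examples, not output captured from SQLAlchemy. It is still string-based, so operators inside literals would be counted; asserting on the expression tree remains the sturdier option.

```python
import re

# Longer operators come first so ">=" is never split into ">" and "=".
_COMPARISON = re.compile(r"<=|>=|<>|!=|<|>|=")


def where_operators(sql: str) -> list[str]:
    """Return the comparison operators that appear after the first WHERE keyword."""
    parts = re.split(r"\bWHERE\b", sql, maxsplit=1, flags=re.IGNORECASE)
    if len(parts) < 2:
        return []  # no WHERE clause at all
    return _COMPARISON.findall(parts[1])


# Hand-written examples: the first mimics a nested keyset predicate, the second
# has ">" only in the SELECT list, which a plain `">" in sql` check would accept.
keyset_sql = (
    "SELECT ti.id FROM task_instance AS ti\n"
    "WHERE ti.start_date >= :p1 AND (ti.start_date > :p2 OR ti.id > :p3)"
)
misleading_sql = "SELECT a > b AS flag FROM t WHERE t.x <= :p1"
```

Note that the keyset predicate in this PR mixes inclusive and strict bounds, so `">" in sql` would also be satisfied by `>=` alone.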



##########
airflow-core/src/airflow/api_fastapi/common/cursors.py:
##########
@@ -0,0 +1,143 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Cursor-based (keyset) pagination helpers.
+
+:meta private:
+"""
+
+from __future__ import annotations
+
+import base64
+import uuid as uuid_mod
+from typing import Any
+
+import msgspec
+from fastapi import HTTPException, status
+from sqlalchemy import and_, or_
+from sqlalchemy.sql import Select
+from sqlalchemy.sql.elements import ColumnElement
+from sqlalchemy.sql.sqltypes import Uuid
+
+from airflow.api_fastapi.common.parameters import SortParam
+
+
+def _b64url_decode_padded(token: str) -> bytes:
+    padding = 4 - (len(token) % 4)
+    if padding != 4:
+        token = token + ("=" * padding)
+    return base64.urlsafe_b64decode(token.encode("ascii"))
+
+
+def _nonstrict_bound(col: ColumnElement, value: Any, is_desc: bool) -> ColumnElement[bool]:
+    """Inclusive range edge on the leading column at each nesting level (``>=`` / ``<=``)."""
+    return col <= value if is_desc else col >= value
+
+
+def _strict_bound(col: ColumnElement, value: Any, is_desc: bool) -> ColumnElement[bool]:
+    """Strict inequality for ``or_`` branches (``<`` / ``>``)."""
+    return col < value if is_desc else col > value
+
+
+def _nested_keyset_predicate(
+    resolved: list[tuple[str, ColumnElement, bool]], values: list[Any]
+) -> ColumnElement[bool]:
+    """
+    Keyset predicate for rows strictly after the cursor in ``ORDER BY`` order.
+
+    Uses nested ``and_(non-strict, or_(strict, ...))`` so leading sort keys use
+    inclusive range bounds and inner branches use strict inequalities—friendly
+    for composite index range scans. Logically equivalent to an OR-of-prefix-
+    equalities formulation.
+    """
+    n = len(resolved)
+    _, col, is_desc = resolved[n - 1]
+    inner: ColumnElement[bool] = _strict_bound(col, values[n - 1], is_desc)
+    for i in range(n - 2, -1, -1):
+        _, col_i, is_desc_i = resolved[i]
+        inner = and_(
+            _nonstrict_bound(col_i, values[i], is_desc_i),
+            or_(_strict_bound(col_i, values[i], is_desc_i), inner),
+        )
+    return inner
+
+
+def _coerce_value(column: ColumnElement, value: Any) -> Any:
+    """Normalize decoded values for SQL bind parameters (e.g. UUID columns)."""
+    if value is None or not isinstance(value, str):
+        return value
+    ctype = getattr(column, "type", None)
+    if isinstance(ctype, Uuid):
+        try:
+            return uuid_mod.UUID(value)
+        except ValueError:
+            return value

Review Comment:
   If a cursor contains an invalid UUID string for a `Uuid` column, 
`_coerce_value` silently falls back to the original string. This can produce 
confusing behavior (cursor accepted but results incorrect/empty) or push type 
errors to the DB layer depending on dialect/bind processing. Recommendation 
(mandatory): treat UUID parse failure as an invalid cursor token and raise an 
HTTP 400 (consistent with other cursor validation).
   ```suggestion
               raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid cursor token")
   ```
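
The fail-fast behavior can be sketched in isolation with only the standard library. `InvalidCursorError` and `coerce_uuid` are hypothetical stand-ins: in the PR the failure would surface as the `HTTPException` shown in the suggestion, and the real `_coerce_value` also checks the column type first.

```python
from __future__ import annotations

import uuid
from typing import Any


class InvalidCursorError(ValueError):
    """Hypothetical error standing in for the HTTP 400 raised by the endpoint."""


def coerce_uuid(value: Any) -> Any:
    """Convert a decoded cursor string to ``uuid.UUID`` or reject the cursor."""
    if value is None or not isinstance(value, str):
        return value  # already typed (or NULL); nothing to normalize
    try:
        return uuid.UUID(value)
    except ValueError as exc:
        # Fail fast instead of passing a malformed string on to the database.
        raise InvalidCursorError("Invalid cursor token") from exc
```

Rejecting the token here keeps all cursor validation errors in one place, so a client sees the same error for a malformed UUID as for a malformed base64 or msgpack payload.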



##########
airflow-core/tests/unit/api_fastapi/common/test_cursors.py:
##########
@@ -0,0 +1,144 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import base64
+import uuid
+from datetime import datetime, timezone
+from unittest.mock import MagicMock
+
+import msgspec
+import pytest
+from fastapi import HTTPException
+from sqlalchemy import select
+
+from airflow.api_fastapi.common.cursors import apply_cursor_filter, decode_cursor, encode_cursor
+from airflow.api_fastapi.common.parameters import SortParam
+from airflow.models.taskinstance import TaskInstance
+
+
+def _msgpack_cursor_token(payload: object) -> str:
+    """Match production: msgpack + base64url without padding."""
+    return base64.urlsafe_b64encode(msgspec.msgpack.encode(payload)).decode("ascii").rstrip("=")
+
+
+class TestCursorPagination:
+    """Tests for cursor-based pagination helpers."""
+
+    def _make_sort_param_with_resolved_columns(self, order_by_values=None):
+        """Build a SortParam for TaskInstance and resolve its columns."""
+        sp = SortParam(["id", "start_date", "map_index"], TaskInstance)
+        sp.set_value(order_by_values or ["map_index"])
+        sp.to_orm(select(TaskInstance))
+        return sp
+
+    def test_encode_decode_cursor_roundtrip(self):
+        sp = self._make_sort_param_with_resolved_columns(["start_date"])
+        row = MagicMock(spec=["start_date", "id"])
+        row.start_date = "2024-01-15T10:00:00+00:00"
+        row.id = "019462ab-1234-5678-9abc-def012345678"
+
+        token = encode_cursor(row, sp)
+        decoded = decode_cursor(token)
+
+        assert decoded == [
+            "2024-01-15T10:00:00+00:00",
+            "019462ab-1234-5678-9abc-def012345678",
+        ]
+
+    def test_decode_cursor_invalid_base64(self):
+        with pytest.raises(HTTPException, match="Invalid cursor token"):
+            decode_cursor("not-valid-base64!!!")
+
+    def test_decode_cursor_invalid_msgpack(self):
+        token = base64.urlsafe_b64encode(b"not-msgpack").decode().rstrip("=")
+        with pytest.raises(HTTPException, match="Invalid cursor token"):
+            decode_cursor(token)
+
+    def test_decode_cursor_not_a_list(self):
+        token = _msgpack_cursor_token({"wrong": "type"})
+        with pytest.raises(HTTPException, match="Invalid cursor token structure"):
+            decode_cursor(token)
+
+    def test_encode_cursor_works_without_prior_to_orm(self):
+        """get_resolved_columns now lazily resolves, so to_orm is no longer required before encode."""
+        sp = SortParam(["id"], TaskInstance)
+        sp.set_value(["id"])
+        row = MagicMock(spec=["id"])
+        row.id = "019462ab-1234-5678-9abc-def012345678"
+        token = encode_cursor(row, sp)
+        decoded = decode_cursor(token)
+        assert decoded == ["019462ab-1234-5678-9abc-def012345678"]
+
+    def test_apply_cursor_filter_wrong_value_count(self):
+        sp = self._make_sort_param_with_resolved_columns(["start_date"])
+        token = _msgpack_cursor_token(["only-one-value"])
+
+        with pytest.raises(HTTPException, match="does not match"):
+            apply_cursor_filter(select(TaskInstance), token, sp)
+
+    def test_apply_cursor_filter_ascending(self):
+        sp = self._make_sort_param_with_resolved_columns(["start_date"])
+        values = [
+            datetime(2024, 1, 15, 10, 0, 0, tzinfo=timezone.utc),
+            uuid.UUID("019462ab-1234-5678-9abc-def012345678"),
+        ]
+        token = _msgpack_cursor_token(values)
+
+        stmt = apply_cursor_filter(select(TaskInstance), token, sp)
+        sql = str(stmt)
+        assert ">" in sql
+
+    def test_apply_cursor_filter_descending(self):
+        sp = self._make_sort_param_with_resolved_columns(["-start_date"])
+        values = [
+            datetime(2024, 1, 15, 10, 0, 0, tzinfo=timezone.utc),
+            uuid.UUID("019462ab-1234-5678-9abc-def012345678"),
+        ]
+        token = _msgpack_cursor_token(values)
+
+        stmt = apply_cursor_filter(select(TaskInstance), token, sp)
+        sql = str(stmt)
+        assert "<" in sql

Review Comment:
   These assertions rely on `str(stmt)` containing raw `>`/`<` operators, which 
is brittle across SQLAlchemy versions/dialects/compilation behaviors (and could 
match unrelated parts of the SQL string). Recommendation (optional): assert 
against the compiled whereclause structure (SQLAlchemy expressions) or compile 
with a deterministic dialect and inspect only the WHERE portion.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -470,13 +472,26 @@ def get_task_instances(
     ],
     readable_ti_filter: ReadableTIFilterDep,
     session: SessionDep,
+    cursor: str | None = Query(
+        None,
+        description="Cursor for keyset-based pagination (mutually exclusive with offset). "
+        "Pass an empty string for the first page, then use ``next_cursor`` from the response.",

Review Comment:
   The parameter description says cursor pagination is "mutually exclusive
with offset", but the implementation silently ignores `offset` when `cursor`
is provided. Recommendation (mandatory): either enforce exclusivity (e.g.,
return 400/422 when both `cursor` and a nonzero `offset` are provided), or
update the description/OpenAPI docs to match the actual behavior ("offset is
ignored when cursor is set").
   ```suggestion
           description="Cursor for keyset-based pagination. "
           "Pass an empty string for the first page, then use ``next_cursor`` from the response. "
           "When `cursor` is provided, `offset` is ignored.",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
