Copilot commented on code in PR #64845:
URL: https://github.com/apache/airflow/pull/64845#discussion_r3066478533
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -498,40 +513,53 @@ def get_task_instances(
if dag:
task_group_id.dag = dag
+ filters: list[OrmClause] = [
+ run_after_range,
+ logical_date_range,
+ start_date_range,
+ end_date_range,
+ update_at_range,
+ duration_range,
+ state,
+ pool,
+ pool_name_pattern,
+ queue,
+ queue_name_pattern,
+ executor,
+ task_id,
+ task_display_name_pattern,
+ task_group_id,
+ dag_id_pattern,
+ run_id_pattern,
+ version_number,
+ readable_ti_filter,
+ try_number,
+ operator,
+ operator_name_pattern,
+ map_index,
+ ]
+
+ if use_cursor:
+ task_instance_select = apply_filters_to_select(statement=query,
filters=[*filters, order_by, limit])
+ if cursor:
+ task_instance_select = apply_cursor_filter(task_instance_select,
cursor, order_by)
+
+ task_instances = list(session.scalars(task_instance_select))
+ return TaskInstanceCollectionResponse(
+ task_instances=task_instances,
+ next_cursor=encode_cursor(task_instances[-1], order_by) if
task_instances else None,
+ previous_cursor=encode_cursor(task_instances[0], order_by) if
task_instances else None,
Review Comment:
`next_cursor`/`previous_cursor` are computed as cursors for the last/first
item of the *current* page, but the API docs claim they point to the
next/previous page and should become `null` at the end/start. As implemented,
`next_cursor` will be non-null even on the last page (because the code doesn't
fetch `limit + 1` to detect more results), and `previous_cursor` will also be
non-null on the first page. Recommendation (mandatory): fetch one extra row
(`limit + 1`) to decide whether to return `next_cursor`, and either (a)
remove/rename `previous_cursor` to something like `current_page_start_cursor`,
or (b) implement true backwards pagination semantics (e.g., separate
`before`/`after` cursors or a direction flag) so `previous_cursor` actually
retrieves the previous page.
```suggestion
task_instance_select = apply_filters_to_select(statement=query,
filters=[*filters, order_by]).limit(
limit + 1
)
if cursor:
task_instance_select = apply_cursor_filter(task_instance_select,
cursor, order_by)
fetched_task_instances = list(session.scalars(task_instance_select))
has_next_page = len(fetched_task_instances) > limit
task_instances = fetched_task_instances[:limit]
return TaskInstanceCollectionResponse(
task_instances=task_instances,
next_cursor=encode_cursor(task_instances[-1], order_by) if
has_next_page and task_instances else None,
previous_cursor=None,
```
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -470,13 +472,26 @@ def get_task_instances(
],
readable_ti_filter: ReadableTIFilterDep,
session: SessionDep,
+ cursor: str | None = Query(
+ None,
+ description="Cursor for keyset-based pagination (mutually exclusive
with offset). "
+ "Pass an empty string for the first page, then use ``next_cursor``
from the response.",
+ ),
) -> TaskInstanceCollectionResponse:
"""
Get list of task instances.
- This endpoint allows specifying `~` as the dag_id, dag_run_id to retrieve
Task Instances for all DAGs
- and DAG runs.
+ This endpoint allows specifying `~` as the dag_id, dag_run_id
+ to retrieve task instances for all DAGs and DAG runs.
+
+ Supports two pagination modes:
+
+ **Offset (default):** use `limit` and `offset` query parameters. Returns
`total_entries`.
+
+ **Cursor:** pass `cursor` (empty string for the first page, then
`next_cursor` from the response).
+ When `cursor` is provided, `offset` is ignored and `total_entries` is not
returned.
"""
+ use_cursor = cursor is not None
Review Comment:
The parameter description says cursor pagination is "mutually exclusive
with offset", but the implementation silently ignores `offset` when `cursor`
is provided. Recommendation (mandatory): either enforce exclusivity (e.g.,
return 400/422 when both `cursor` and a nonzero `offset` are provided), or
update the description/OpenAPI docs to match the actual behavior ("offset is
ignored when cursor is set").
##########
airflow-core/tests/unit/api_fastapi/common/test_cursors.py:
##########
@@ -0,0 +1,144 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import base64
+import uuid
+from datetime import datetime, timezone
+from unittest.mock import MagicMock
+
+import msgspec
+import pytest
+from fastapi import HTTPException
+from sqlalchemy import select
+
+from airflow.api_fastapi.common.cursors import apply_cursor_filter,
decode_cursor, encode_cursor
+from airflow.api_fastapi.common.parameters import SortParam
+from airflow.models.taskinstance import TaskInstance
+
+
+def _msgpack_cursor_token(payload: object) -> str:
+ """Match production: msgpack + base64url without padding."""
+ return
base64.urlsafe_b64encode(msgspec.msgpack.encode(payload)).decode("ascii").rstrip("=")
+
+
+class TestCursorPagination:
+ """Tests for cursor-based pagination helpers."""
+
+ def _make_sort_param_with_resolved_columns(self, order_by_values=None):
+ """Build a SortParam for TaskInstance and resolve its columns."""
+ sp = SortParam(["id", "start_date", "map_index"], TaskInstance)
+ sp.set_value(order_by_values or ["map_index"])
+ sp.to_orm(select(TaskInstance))
+ return sp
+
+ def test_encode_decode_cursor_roundtrip(self):
+ sp = self._make_sort_param_with_resolved_columns(["start_date"])
+ row = MagicMock(spec=["start_date", "id"])
+ row.start_date = "2024-01-15T10:00:00+00:00"
+ row.id = "019462ab-1234-5678-9abc-def012345678"
+
+ token = encode_cursor(row, sp)
+ decoded = decode_cursor(token)
+
+ assert decoded == [
+ "2024-01-15T10:00:00+00:00",
+ "019462ab-1234-5678-9abc-def012345678",
+ ]
+
+ def test_decode_cursor_invalid_base64(self):
+ with pytest.raises(HTTPException, match="Invalid cursor token"):
+ decode_cursor("not-valid-base64!!!")
+
+ def test_decode_cursor_invalid_msgpack(self):
+ token = base64.urlsafe_b64encode(b"not-msgpack").decode().rstrip("=")
+ with pytest.raises(HTTPException, match="Invalid cursor token"):
+ decode_cursor(token)
+
+ def test_decode_cursor_not_a_list(self):
+ token = _msgpack_cursor_token({"wrong": "type"})
+ with pytest.raises(HTTPException, match="Invalid cursor token
structure"):
+ decode_cursor(token)
+
+ def test_encode_cursor_works_without_prior_to_orm(self):
+ """get_resolved_columns now lazily resolves, so to_orm is no longer
required before encode."""
+ sp = SortParam(["id"], TaskInstance)
+ sp.set_value(["id"])
+ row = MagicMock(spec=["id"])
+ row.id = "019462ab-1234-5678-9abc-def012345678"
+ token = encode_cursor(row, sp)
+ decoded = decode_cursor(token)
+ assert decoded == ["019462ab-1234-5678-9abc-def012345678"]
+
+ def test_apply_cursor_filter_wrong_value_count(self):
+ sp = self._make_sort_param_with_resolved_columns(["start_date"])
+ token = _msgpack_cursor_token(["only-one-value"])
+
+ with pytest.raises(HTTPException, match="does not match"):
+ apply_cursor_filter(select(TaskInstance), token, sp)
+
+ def test_apply_cursor_filter_ascending(self):
+ sp = self._make_sort_param_with_resolved_columns(["start_date"])
+ values = [
+ datetime(2024, 1, 15, 10, 0, 0, tzinfo=timezone.utc),
+ uuid.UUID("019462ab-1234-5678-9abc-def012345678"),
+ ]
+ token = _msgpack_cursor_token(values)
+
+ stmt = apply_cursor_filter(select(TaskInstance), token, sp)
+ sql = str(stmt)
+ assert ">" in sql
Review Comment:
These assertions rely on `str(stmt)` containing raw `>`/`<` operators, which
is brittle across SQLAlchemy versions/dialects/compilation behaviors (and could
match unrelated parts of the SQL string). Recommendation (optional): assert
against the compiled whereclause structure (SQLAlchemy expressions) or compile
with a deterministic dialect and inspect only the WHERE portion.
##########
airflow-core/src/airflow/api_fastapi/common/cursors.py:
##########
@@ -0,0 +1,143 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Cursor-based (keyset) pagination helpers.
+
+:meta private:
+"""
+
+from __future__ import annotations
+
+import base64
+import uuid as uuid_mod
+from typing import Any
+
+import msgspec
+from fastapi import HTTPException, status
+from sqlalchemy import and_, or_
+from sqlalchemy.sql import Select
+from sqlalchemy.sql.elements import ColumnElement
+from sqlalchemy.sql.sqltypes import Uuid
+
+from airflow.api_fastapi.common.parameters import SortParam
+
+
+def _b64url_decode_padded(token: str) -> bytes:
+ padding = 4 - (len(token) % 4)
+ if padding != 4:
+ token = token + ("=" * padding)
+ return base64.urlsafe_b64decode(token.encode("ascii"))
+
+
+def _nonstrict_bound(col: ColumnElement, value: Any, is_desc: bool) ->
ColumnElement[bool]:
+ """Inclusive range edge on the leading column at each nesting level
(``>=`` / ``<=``)."""
+ return col <= value if is_desc else col >= value
+
+
+def _strict_bound(col: ColumnElement, value: Any, is_desc: bool) ->
ColumnElement[bool]:
+ """Strict inequality for ``or_`` branches (``<`` / ``>``)."""
+ return col < value if is_desc else col > value
+
+
+def _nested_keyset_predicate(
+ resolved: list[tuple[str, ColumnElement, bool]], values: list[Any]
+) -> ColumnElement[bool]:
+ """
+ Keyset predicate for rows strictly after the cursor in ``ORDER BY`` order.
+
+ Uses nested ``and_(non-strict, or_(strict, ...))`` so leading sort keys use
+ inclusive range bounds and inner branches use strict inequalities—friendly
+ for composite index range scans. Logically equivalent to an OR-of-prefix-
+ equalities formulation.
+ """
+ n = len(resolved)
+ _, col, is_desc = resolved[n - 1]
+ inner: ColumnElement[bool] = _strict_bound(col, values[n - 1], is_desc)
+ for i in range(n - 2, -1, -1):
+ _, col_i, is_desc_i = resolved[i]
+ inner = and_(
+ _nonstrict_bound(col_i, values[i], is_desc_i),
+ or_(_strict_bound(col_i, values[i], is_desc_i), inner),
+ )
+ return inner
+
+
+def _coerce_value(column: ColumnElement, value: Any) -> Any:
+ """Normalize decoded values for SQL bind parameters (e.g. UUID columns)."""
+ if value is None or not isinstance(value, str):
+ return value
+ ctype = getattr(column, "type", None)
+ if isinstance(ctype, Uuid):
+ try:
+ return uuid_mod.UUID(value)
+ except ValueError:
+ return value
Review Comment:
If a cursor contains an invalid UUID string for a `Uuid` column,
`_coerce_value` silently falls back to the original string. This can produce
confusing behavior (cursor accepted but results incorrect/empty) or push type
errors to the DB layer depending on dialect/bind processing. Recommendation
(mandatory): treat UUID parse failure as an invalid cursor token and raise an
HTTP 400 (consistent with other cursor validation).
```suggestion
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid cursor
token")
```
##########
airflow-core/tests/unit/api_fastapi/common/test_cursors.py:
##########
@@ -0,0 +1,144 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import base64
+import uuid
+from datetime import datetime, timezone
+from unittest.mock import MagicMock
+
+import msgspec
+import pytest
+from fastapi import HTTPException
+from sqlalchemy import select
+
+from airflow.api_fastapi.common.cursors import apply_cursor_filter,
decode_cursor, encode_cursor
+from airflow.api_fastapi.common.parameters import SortParam
+from airflow.models.taskinstance import TaskInstance
+
+
+def _msgpack_cursor_token(payload: object) -> str:
+ """Match production: msgpack + base64url without padding."""
+ return
base64.urlsafe_b64encode(msgspec.msgpack.encode(payload)).decode("ascii").rstrip("=")
+
+
+class TestCursorPagination:
+ """Tests for cursor-based pagination helpers."""
+
+ def _make_sort_param_with_resolved_columns(self, order_by_values=None):
+ """Build a SortParam for TaskInstance and resolve its columns."""
+ sp = SortParam(["id", "start_date", "map_index"], TaskInstance)
+ sp.set_value(order_by_values or ["map_index"])
+ sp.to_orm(select(TaskInstance))
+ return sp
+
+ def test_encode_decode_cursor_roundtrip(self):
+ sp = self._make_sort_param_with_resolved_columns(["start_date"])
+ row = MagicMock(spec=["start_date", "id"])
+ row.start_date = "2024-01-15T10:00:00+00:00"
+ row.id = "019462ab-1234-5678-9abc-def012345678"
+
+ token = encode_cursor(row, sp)
+ decoded = decode_cursor(token)
+
+ assert decoded == [
+ "2024-01-15T10:00:00+00:00",
+ "019462ab-1234-5678-9abc-def012345678",
+ ]
+
+ def test_decode_cursor_invalid_base64(self):
+ with pytest.raises(HTTPException, match="Invalid cursor token"):
+ decode_cursor("not-valid-base64!!!")
+
+ def test_decode_cursor_invalid_msgpack(self):
+ token = base64.urlsafe_b64encode(b"not-msgpack").decode().rstrip("=")
+ with pytest.raises(HTTPException, match="Invalid cursor token"):
+ decode_cursor(token)
+
+ def test_decode_cursor_not_a_list(self):
+ token = _msgpack_cursor_token({"wrong": "type"})
+ with pytest.raises(HTTPException, match="Invalid cursor token
structure"):
+ decode_cursor(token)
+
+ def test_encode_cursor_works_without_prior_to_orm(self):
+ """get_resolved_columns now lazily resolves, so to_orm is no longer
required before encode."""
+ sp = SortParam(["id"], TaskInstance)
+ sp.set_value(["id"])
+ row = MagicMock(spec=["id"])
+ row.id = "019462ab-1234-5678-9abc-def012345678"
+ token = encode_cursor(row, sp)
+ decoded = decode_cursor(token)
+ assert decoded == ["019462ab-1234-5678-9abc-def012345678"]
+
+ def test_apply_cursor_filter_wrong_value_count(self):
+ sp = self._make_sort_param_with_resolved_columns(["start_date"])
+ token = _msgpack_cursor_token(["only-one-value"])
+
+ with pytest.raises(HTTPException, match="does not match"):
+ apply_cursor_filter(select(TaskInstance), token, sp)
+
+ def test_apply_cursor_filter_ascending(self):
+ sp = self._make_sort_param_with_resolved_columns(["start_date"])
+ values = [
+ datetime(2024, 1, 15, 10, 0, 0, tzinfo=timezone.utc),
+ uuid.UUID("019462ab-1234-5678-9abc-def012345678"),
+ ]
+ token = _msgpack_cursor_token(values)
+
+ stmt = apply_cursor_filter(select(TaskInstance), token, sp)
+ sql = str(stmt)
+ assert ">" in sql
+
+ def test_apply_cursor_filter_descending(self):
+ sp = self._make_sort_param_with_resolved_columns(["-start_date"])
+ values = [
+ datetime(2024, 1, 15, 10, 0, 0, tzinfo=timezone.utc),
+ uuid.UUID("019462ab-1234-5678-9abc-def012345678"),
+ ]
+ token = _msgpack_cursor_token(values)
+
+ stmt = apply_cursor_filter(select(TaskInstance), token, sp)
+ sql = str(stmt)
+ assert "<" in sql
Review Comment:
These assertions rely on `str(stmt)` containing raw `>`/`<` operators, which
is brittle across SQLAlchemy versions/dialects/compilation behaviors (and could
match unrelated parts of the SQL string). Recommendation (optional): assert
against the compiled whereclause structure (SQLAlchemy expressions) or compile
with a deterministic dialect and inspect only the WHERE portion.
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -470,13 +472,26 @@ def get_task_instances(
],
readable_ti_filter: ReadableTIFilterDep,
session: SessionDep,
+ cursor: str | None = Query(
+ None,
+ description="Cursor for keyset-based pagination (mutually exclusive
with offset). "
+ "Pass an empty string for the first page, then use ``next_cursor``
from the response.",
Review Comment:
The parameter description says cursor pagination is "mutually exclusive
with offset", but the implementation silently ignores `offset` when `cursor`
is provided. Recommendation (mandatory): either enforce exclusivity (e.g.,
return 400/422 when both `cursor` and a nonzero `offset` are provided), or
update the description/OpenAPI docs to match the actual behavior ("offset is
ignored when cursor is set").
```suggestion
description="Cursor for keyset-based pagination. "
"Pass an empty string for the first page, then use ``next_cursor``
from the response. "
"When `cursor` is provided, `offset` is ignored.",
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]