pierrejeambrun commented on code in PR #64845:
URL: https://github.com/apache/airflow/pull/64845#discussion_r3058738597


##########
airflow-core/src/airflow/api_fastapi/common/cursors.py:
##########
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Cursor-based (keyset) pagination helpers.
+
+:meta private:
+"""
+
+from __future__ import annotations
+
+import base64
+import json
+import uuid as uuid_mod
+from datetime import datetime
+from typing import TYPE_CHECKING, Any
+
+from fastapi import HTTPException, status
+from sqlalchemy import and_, or_
+
+if TYPE_CHECKING:
+    from sqlalchemy.sql import Select
+
+    from airflow.api_fastapi.common.parameters import SortParam
+
+
+def _encode_value(val: Any) -> dict[str, Any]:
+    """Encode a single Python value as a typed {"type": ..., "value": ...} 
object."""
+    if val is None:
+        return {"type": "null", "value": None}
+    if isinstance(val, uuid_mod.UUID):
+        return {"type": "uuid", "value": str(val)}
+    if isinstance(val, datetime):
+        return {"type": "datetime", "value": val.isoformat()}
+    if isinstance(val, int):
+        return {"type": "int", "value": val}
+    return {"type": "str", "value": str(val)}
+
+
+def _decode_value(entry: dict[str, Any]) -> Any:
+    """Decode a typed cursor entry back to its Python value."""
+    type_tag = entry["type"]
+    raw = entry["value"]
+    if type_tag == "null":
+        return None
+    if type_tag == "uuid":
+        return uuid_mod.UUID(str(raw))
+    if type_tag == "datetime":
+        return datetime.fromisoformat(str(raw))
+    if type_tag == "int":
+        return int(raw)
+    return str(raw)
+
+
+def encode_cursor(row: Any, sort_param: SortParam) -> str:
+    """
+    Encode cursor token from the last row of a result set.
+
+    The token is a base64url-encoded JSON list of typed objects, each
+    containing ``{"type": "<tag>", "value": <serialized>}`` so the
+    cursor is self-describing and can be decoded without column metadata.
+    """
+    resolved = sort_param.get_resolved_columns()
+    if not resolved:
+        raise ValueError("SortParam has no resolved columns.")
+
+    entries = [_encode_value(getattr(row, attr_name, None)) for attr_name, 
_col, _desc in resolved]
+    return base64.urlsafe_b64encode(json.dumps(entries).encode()).decode()
+
+
+def decode_cursor(token: str) -> list[dict[str, Any]]:
+    """Decode a cursor token and return the list of typed value entries."""
+    try:
+        data = json.loads(base64.urlsafe_b64decode(token))
+    except Exception:
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid cursor 
token")
+
+    if not isinstance(data, list) or any(
+        not isinstance(entry, dict) or "type" not in entry or "value" not in 
entry for entry in data
+    ):
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid cursor token 
structure")
+
+    return data
+
+
+def apply_cursor_filter(statement: Select, cursor: str, sort_param: SortParam) 
-> Select:
+    """
+    Apply a keyset pagination WHERE clause from a cursor token.
+
+    Builds a composite comparison that respects mixed ASC/DESC ordering
+    on the resolved sort columns.
+    """
+    cursor_entries = decode_cursor(cursor)
+
+    resolved = sort_param.get_resolved_columns()
+    if len(cursor_entries) != len(resolved):
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, "Cursor token does 
not match current query shape")
+
+    parsed_values = [_decode_value(entry) for entry in cursor_entries]
+
+    # Build the keyset WHERE clause for mixed ASC/DESC ordering.
+    # For columns (c1 ASC, c2 DESC, c3 ASC) with cursor values (v1, v2, v3):
+    #   (c1 > v1) OR
+    #   (c1 = v1 AND c2 < v2) OR
+    #   (c1 = v1 AND c2 = v2 AND c3 > v3)
+    or_clauses = []
+    for i, (_, col, is_desc) in enumerate(resolved):
+        eq_conditions = [resolved[j][1] == parsed_values[j] for j in range(i)]
+        if is_desc:
+            bound = col < parsed_values[i]
+        else:
+            bound = col > parsed_values[i]
+        or_clauses.append(and_(*eq_conditions, bound))

Review Comment:
   I think both ways of constructing the clause are correct, but since yours
is battle-tested, I've switched to a similar approach.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to