This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8faf4c6b6d [python] Honor sequence.field on the primary-key read path 
(#8075)
8faf4c6b6d is described below

commit 8faf4c6b6d2c4f7aa2e0973b600d5ccc8944ef9f
Author: Junrui Lee <[email protected]>
AuthorDate: Tue Jun 2 21:47:22 2026 +0800

    [python] Honor sequence.field on the primary-key read path (#8075)
---
 .../pypaimon/common/options/core_options.py        |  54 +-
 .../pypaimon/read/merge_engine_support.py          | 141 +++++-
 .../read/reader/aggregation_merge_function.py      |  37 +-
 .../pypaimon/read/reader/sort_merge_reader.py      | 177 +++++--
 paimon-python/pypaimon/read/split_read.py          |  15 +-
 paimon-python/pypaimon/read/table_read.py          |  26 +
 .../pypaimon/tests/test_aggregation_e2e.py         |  41 +-
 .../pypaimon/tests/test_sequence_field_read.py     | 544 +++++++++++++++++++++
 8 files changed, 970 insertions(+), 65 deletions(-)

diff --git a/paimon-python/pypaimon/common/options/core_options.py 
b/paimon-python/pypaimon/common/options/core_options.py
index bb157bb6e3..5a51878294 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -18,7 +18,7 @@
 import sys
 from datetime import timedelta
 from enum import Enum
-from typing import Dict, Optional
+from typing import Dict, List, Optional
 
 from pypaimon.common.memory_size import MemorySize
 from pypaimon.common.options import Options
@@ -56,6 +56,15 @@ class MergeEngine(str, Enum):
     FIRST_ROW = "first-row"
 
 
+class SortOrder(str, Enum):
+    """
+    Specifies the order of ``sequence.field``. Mirrors Java
+    ``CoreOptions.SortOrder``.
+    """
+    ASCENDING = "ascending"
+    DESCENDING = "descending"
+
+
 class GlobalIndexColumnUpdateAction(str, Enum):
     THROW_ERROR = "THROW_ERROR"
     DROP_PARTITION_INDEX = "DROP_PARTITION_INDEX"
@@ -411,6 +420,22 @@ class CoreOptions:
         .with_description("Whether to ignore delete records.")
     )
 
+    SEQUENCE_FIELD: ConfigOption[str] = (
+        ConfigOptions.key("sequence.field")
+        .string_type()
+        .no_default_value()
+        .with_description("The field that generates the sequence number for "
+                          "primary key table, the sequence number determines "
+                          "which data is the most recent.")
+    )
+
+    SEQUENCE_FIELD_SORT_ORDER: ConfigOption[SortOrder] = (
+        ConfigOptions.key("sequence.field.sort-order")
+        .enum_type(SortOrder)
+        .default_value(SortOrder.ASCENDING)
+        .with_description("Specify the order of sequence.field.")
+    )
+
     # Commit options
     COMMIT_USER_PREFIX: ConfigOption[str] = (
         ConfigOptions.key("commit.user-prefix")
@@ -808,6 +833,33 @@ class CoreOptions:
     def merge_engine(self, default=None):
         return self.options.get(CoreOptions.MERGE_ENGINE, default)
 
+    def sequence_field(self) -> List[str]:
+        """User-defined sequence fields, in declaration order. Empty list
+        when ``sequence.field`` is unset. Mirrors Java
+        ``CoreOptions.sequenceField()``.
+        """
+        raw = self.options.get(CoreOptions.SEQUENCE_FIELD)
+        if not raw:
+            return []
+        # Mirror Java ``CoreOptions.sequenceField()``
+        # (``Arrays.stream(s.split(',')).map(String::trim)``): Java's
+        # ``String.split(",")`` drops *trailing* empty segments (so ``'ts,'``
+        # yields ``['ts']``) but keeps interior ones, and each segment is
+        # then trimmed. So an interior empty segment (``'ts,,ts2'``) survives
+        # as an empty field name that ``check_sequence_field_valid`` rejects,
+        # while a trailing comma is tolerated.
+        segments = raw.split(",")
+        while segments and segments[-1] == "":
+            segments.pop()
+        return [name.strip() for name in segments]
+
+    def sequence_field_sort_order_is_ascending(self) -> bool:
+        """Whether ``sequence.field.sort-order`` is ascending (the default).
+        Mirrors Java ``CoreOptions.sequenceFieldSortOrderIsAscending()``.
+        """
+        return (self.options.get(CoreOptions.SEQUENCE_FIELD_SORT_ORDER)
+                == SortOrder.ASCENDING)
+
     def ignore_delete(self) -> bool:
         raw = self.options.to_map()
         fallback_keys = (
diff --git a/paimon-python/pypaimon/read/merge_engine_support.py 
b/paimon-python/pypaimon/read/merge_engine_support.py
index 8ab4813f7e..cb5d34ab87 100644
--- a/paimon-python/pypaimon/read/merge_engine_support.py
+++ b/paimon-python/pypaimon/read/merge_engine_support.py
@@ -63,22 +63,147 @@ _AGGREGATION_SUPPORTED_AGG_FUNCS = frozenset([
     "sum", "max", "min",
     "bool_or", "bool_and",
 ])
-_SEQUENCE_FIELD_KEY = "sequence.field"
 _FIELDS_PREFIX = "fields."
 _FIELD_SEQUENCE_GROUP_SUFFIX = ".sequence-group"
 _FIELD_AGGREGATE_FUNCTION_SUFFIX = ".aggregate-function"
 _FIELD_IGNORE_RETRACT_SUFFIX = ".ignore-retract"
+_FIELD_NESTED_SEQUENCE_SUFFIX = ".nested-sequence-field"
 _DEFAULT_AGGREGATE_FUNCTION_KEY = "fields.default-aggregate-function"
 
 
+def _nested_sequence_field_options(table) -> Set[str]:
+    """Option keys configuring ``nested-sequence-field`` (a per-field
+    nested sequence ordering distinct from the top-level
+    ``sequence.field``). pypaimon implements top-level ``sequence.field``
+    but not nested sequence fields, so reject them on every PK engine
+    rather than silently ignoring them.
+    """
+    flagged: Set[str] = set()
+    raw = table.options.options.to_map()
+    for key in raw:
+        if key.startswith(_FIELDS_PREFIX) and key.endswith(
+                _FIELD_NESTED_SEQUENCE_SUFFIX):
+            flagged.add(key)
+    return flagged
+
+
+def _unsupported_sequence_fields(table) -> Set[str]:
+    """Configured ``sequence.field`` names whose type pypaimon cannot order.
+    Java's ``UserDefinedSeqComparator`` delegates to ``RecordComparator``
+    and supports ARRAY / VECTOR / MAP / MULTISET / ROW, but pypaimon's
+    ``builtin_seq_comparator`` only compares orderable atomic types. This
+    flags both complex (non-atomic) types and the atomic-but-unorderable
+    VARIANT, so a raw-convertible split (which skips the merge reader) can't
+    silently bypass the limitation.
+    """
+    from pypaimon.read.reader.sort_merge_reader import is_comparable_seq_field
+    flagged: Set[str] = set()
+    for field in table.options.sequence_field():
+        data_field = table.field_dict.get(field)
+        if data_field is not None and not is_comparable_seq_field(data_field):
+            flagged.add(field)
+    return flagged
+
+
+def check_sequence_field_valid(table) -> None:
+    """Reject ``sequence.field`` configurations Java forbids at schema
+    validation (``SchemaValidation.validateSequenceField``), raising
+    ``ValueError`` to mirror Java's ``IllegalArgumentException``.
+
+    These are invalid configurations, not deferred features, so they are
+    rejected on every merge engine regardless of pypaimon's read-path
+    coverage. Mirrors all of Java's checks:
+
+    1. Every sequence field must exist in the table schema.
+    2. No sequence field may be declared more than once.
+    3. ``fields.<seq>.aggregate-function`` on a sequence column: Java
+       forbids aggregating the sequence column outright. pypaimon's
+       aggregation engine otherwise silently overrides it with
+       ``last_value``, hiding the misconfiguration.
+    4. ``sequence.field`` together with ``merge-engine=first-row``:
+       first-row keeps the earliest-written row and never honors a
+       sequence ordering.
+    5. ``sequence.field`` together with cross-partition update (the PK
+       does not include all partition fields).
+    """
+    sequence_fields = table.options.sequence_field()
+    if not sequence_fields:
+        return
+
+    field_names = set(table.field_names)
+    seen: Set[str] = set()
+    options_map = table.options.options.to_map()
+    for field in sequence_fields:
+        if field not in field_names:
+            raise ValueError(
+                "Sequence field: '{}' can not be found in table "
+                "schema.".format(field)
+            )
+        if field in seen:
+            raise ValueError(
+                "Sequence field '{}' is defined repeatedly.".format(field)
+            )
+        seen.add(field)
+        agg_key = "fields.{}.aggregate-function".format(field)
+        if options_map.get(agg_key) is not None:
+            raise ValueError(
+                "Should not define aggregation on sequence field: '{}' "
+                "({}).".format(field, agg_key)
+            )
+
+    if table.options.merge_engine() == MergeEngine.FIRST_ROW:
+        raise ValueError(
+            "Do not support use sequence.field on FIRST_ROW merge engine."
+        )
+
+    if table.cross_partition_update:
+        raise ValueError(
+            "You can not use sequence.field in cross partition update case "
+            "(primary keys {} do not include all partition fields "
+            "{}).".format(table.primary_keys, table.partition_keys)
+        )
+
+
 def check_supported(table) -> None:
     """Raise ``NotImplementedError`` if the table's merge-engine
-    configuration is outside what pypaimon's read path implements.
+    configuration is outside what pypaimon's read path implements, or
+    ``ValueError`` if it is an outright-invalid configuration that Java
+    rejects at schema validation.
 
     Non-PK tables are always fine (no merge function involved).
     """
     if not table.is_primary_key_table:
         return
+    # ``nested-sequence-field`` is unimplemented on every engine; reject it
+    # before per-engine dispatch so it can't be silently ignored by the
+    # top-level ``sequence.field`` comparator.
+    nested_seq = _nested_sequence_field_options(table)
+    if nested_seq:
+        raise NotImplementedError(
+            "nested-sequence-field is not implemented in pypaimon yet: {}. "
+            "Top-level 'sequence.field' is supported; open an issue to track "
+            "nested sequence field support.".format(", 
".join(sorted(nested_seq)))
+        )
+    # ``sequence.field`` validity is engine-independent in Java
+    # (SchemaValidation.validateSequenceField). pypaimon has no
+    # schema-creation validation, so enforce the same invariants here on
+    # the read path, before per-engine dispatch.
+    check_sequence_field_valid(table)
+    # ``sequence.field`` validity (above) is Java-aligned and engine
+    # independent. Some field *types* are valid in Java but unimplemented in
+    # pypaimon's orderable-atomic-only comparator (complex types, plus the
+    # atomic-but-unorderable VARIANT), so reject them as NotImplementedError
+    # here -- before per-engine dispatch, so a raw-convertible split can't
+    # bypass the merge reader and skip the check.
+    unsupported_seq = _unsupported_sequence_fields(table)
+    if unsupported_seq:
+        raise NotImplementedError(
+            "sequence.field with unsupported type is not implemented in "
+            "pypaimon yet: {}. pypaimon only supports orderable atomic "
+            "sequence-field types; complex types (ARRAY / MAP / ROW etc., "
+            "handled by Java via RecordComparator) and VARIANT are not "
+            "supported. Open an issue to track support.".format(
+                ", ".join(sorted(unsupported_seq))))
     engine = table.options.merge_engine()
     if engine == MergeEngine.DEDUPLICATE:
         return
@@ -108,7 +233,7 @@ def check_supported(table) -> None:
                 "supported subset is per-key field aggregation with the "
                 "built-in aggregators ({}); retract opt-ins "
                 "(aggregation.remove-record-on-delete, "
-                "fields.<f>.ignore-retract), sequence-field handling "
+                "fields.<f>.ignore-retract) "
                 "and other aggregators (product / listagg / collect / "
                 "merge_map* / nested_update* / theta_sketch / "
                 "hll_sketch / roaring_bitmap_*) are not yet supported. "
@@ -156,11 +281,9 @@ def aggregation_unsupported_options(table) -> Set[str]:
        ``fields.<f>.ignore-retract`` only make sense in conjunction
        with DELETE / UPDATE_BEFORE handling, which the engine does not
        implement.
-    2. Sequence-field configuration: ``sequence.field`` /
-       ``fields.<f>.sequence-group`` are not supported; the merge
-       function does not special-case sequence fields, so we refuse
-       the table rather than silently merge them as ordinary value
-       columns.
+    2. Sequence-group configuration: ``fields.<f>.sequence-group`` is not
+       supported (top-level ``sequence.field`` is honored, see
+       ``builtin_seq_comparator``).
     3. Out-of-scope aggregator selections: ``fields.<f>.aggregate-
        function`` and ``fields.default-aggregate-function`` set to an
        identifier this engine doesn't support yet (e.g. ``collect``,
@@ -172,8 +295,6 @@ def aggregation_unsupported_options(table) -> Set[str]:
         if (key in _AGGREGATION_UNSUPPORTED_BOOLEAN_OPTIONS
                 and _option_is_truthy(value)):
             flagged.add(key)
-        elif key == _SEQUENCE_FIELD_KEY and value:
-            flagged.add(key)
         elif key == _DEFAULT_AGGREGATE_FUNCTION_KEY:
             if value not in _AGGREGATION_SUPPORTED_AGG_FUNCS:
                 flagged.add(key)
diff --git a/paimon-python/pypaimon/read/reader/aggregation_merge_function.py 
b/paimon-python/pypaimon/read/reader/aggregation_merge_function.py
index 5e2c149738..ae66673403 100644
--- a/paimon-python/pypaimon/read/reader/aggregation_merge_function.py
+++ b/paimon-python/pypaimon/read/reader/aggregation_merge_function.py
@@ -42,6 +42,7 @@ from typing import Any, List, Optional
 from pypaimon.read.reader.aggregate import create_field_aggregator
 from pypaimon.read.reader.aggregate.aggregators import (
     NAME_LAST_NON_NULL_VALUE,
+    NAME_LAST_VALUE,
     NAME_PRIMARY_KEY,
 )
 from pypaimon.read.reader.aggregate.field_aggregator import FieldAggregator
@@ -56,22 +57,28 @@ from pypaimon.table.row.row_kind import RowKind
 # ---------------------------------------------------------------------------
 
 
-def resolve_agg_func_name(field_name, primary_keys, options_map):
-    """Pick the aggregator identifier for ``field_name`` using the
-    following precedence:
+def resolve_agg_func_name(field_name, primary_keys, options_map,
+                          sequence_fields=()):
+    """Pick the aggregator identifier for ``field_name`` using the same
+    precedence as Java ``AggregateMergeFunction.getAggFuncName``:
 
-    1. Primary-key columns use ``primary_key`` (identity).
-    2. Otherwise, field-level ``fields.<f>.aggregate-function``
+    1. Sequence fields use ``last_value`` (no aggregation -- the
+       sequence column just carries the latest-by-sequence value).
+    2. Primary-key columns use ``primary_key`` (identity).
+    3. Otherwise, field-level ``fields.<f>.aggregate-function``
        overrides everything.
-    3. Otherwise, the table-wide ``fields.default-aggregate-function``.
-    4. Otherwise, the system default ``last_non_null_value``.
-
-    Sequence fields are intentionally **not** special-cased here: the
-    merge-engine guard in :mod:`pypaimon.read.merge_engine_support`
-    rejects any table that sets ``sequence.field`` on the
-    ``aggregation`` engine, so by the time this function runs there is
-    no sequence field to disambiguate.
+    4. Otherwise, the table-wide ``fields.default-aggregate-function``.
+    5. Otherwise, the system default ``last_non_null_value``.
+
+    Sequence fields take precedence over the table-wide
+    ``fields.default-aggregate-function``, matching Java: the value of a
+    ``sequence.field`` column must not be aggregated. An *explicit*
+    ``fields.<seq>.aggregate-function`` on a sequence column is rejected
+    up-front (see ``merge_engine_support.check_sequence_field_valid``), so
+    it never reaches this precedence.
     """
+    if field_name in sequence_fields:
+        return NAME_LAST_VALUE
     if field_name in primary_keys:
         return NAME_PRIMARY_KEY
     return (
@@ -96,9 +103,11 @@ def build_field_aggregators(
     """
     options_map = core_options.options.to_map()
     pk_set = set(primary_keys)
+    sequence_fields = set(core_options.sequence_field())
     aggregators = []
     for field in value_fields:
-        agg_name = resolve_agg_func_name(field.name, pk_set, options_map)
+        agg_name = resolve_agg_func_name(
+            field.name, pk_set, options_map, sequence_fields)
         aggregators.append(
             create_field_aggregator(
                 field.type, field.name, agg_name, core_options
diff --git a/paimon-python/pypaimon/read/reader/sort_merge_reader.py 
b/paimon-python/pypaimon/read/reader/sort_merge_reader.py
index 56f42b6f3c..1597745bbd 100644
--- a/paimon-python/pypaimon/read/reader/sort_merge_reader.py
+++ b/paimon-python/pypaimon/read/reader/sort_merge_reader.py
@@ -20,7 +20,7 @@ from typing import Any, Callable, List, Optional
 
 from pypaimon.read.reader.iface.record_iterator import RecordIterator
 from pypaimon.read.reader.iface.record_reader import RecordReader
-from pypaimon.schema.data_types import DataField, Keyword
+from pypaimon.schema.data_types import AtomicType, DataField, Keyword
 from pypaimon.schema.table_schema import TableSchema
 from pypaimon.table.row.internal_row import InternalRow
 from pypaimon.table.row.key_value import KeyValue
@@ -30,7 +30,8 @@ class SortMergeReaderWithMinHeap(RecordReader):
     """SortMergeReader implemented with min-heap."""
 
     def __init__(self, readers: List[RecordReader[KeyValue]], schema: 
TableSchema,
-                 merge_function: Optional[Any] = None):
+                 merge_function: Optional[Any] = None,
+                 seq_comparator: Optional[Callable[[Any, Any], int]] = None):
         self.next_batch_readers = list(readers)
         # Default to dedupe so callers that don't pass a merge_function
         # keep their old behaviour. The merge engine dispatch lives in
@@ -38,6 +39,12 @@ class SortMergeReaderWithMinHeap(RecordReader):
         # path; tests or other ad-hoc callers can pass a different
         # implementation here.
         self.merge_function = merge_function if merge_function is not None 
else DeduplicateMergeFunction()
+        # Optional user-defined sequence comparator (``sequence.field``).
+        # When set, it breaks key-ties on the value row before the
+        # file-level sequence number, mirroring Java's
+        # ``SortMergeReaderWithMinHeap`` + ``UserDefinedSeqComparator``.
+        # Built by the caller, which knows the value-side schema.
+        self.seq_comparator = seq_comparator
 
         if schema.partition_keys:
             trimmed_primary_keys = [pk for pk in schema.primary_keys if pk not 
in schema.partition_keys]
@@ -63,7 +70,8 @@ class SortMergeReaderWithMinHeap(RecordReader):
                 kv = iterator.next()
                 if kv is not None:
                     element = Element(kv, iterator, reader)
-                    entry = HeapEntry(kv.key, element, self.key_comparator)
+                    entry = HeapEntry(kv.key, element, self.key_comparator,
+                                      self.seq_comparator)
                     heapq.heappush(self.min_heap, entry)
                     break
 
@@ -78,6 +86,7 @@ class SortMergeReaderWithMinHeap(RecordReader):
             self.min_heap,
             self.merge_function,
             self.key_comparator,
+            self.seq_comparator,
         )
 
     def close(self):
@@ -93,12 +102,13 @@ class SortMergeReaderWithMinHeap(RecordReader):
 
 class SortMergeIterator(RecordIterator):
     def __init__(self, reader, polled: List['Element'], min_heap, 
merge_function,
-                 key_comparator):
+                 key_comparator, seq_comparator=None):
         self.reader = reader
         self.polled = polled
         self.min_heap = min_heap
         self.merge_function = merge_function
         self.key_comparator = key_comparator
+        self.seq_comparator = seq_comparator
         self.released = False
 
     def next(self):
@@ -112,7 +122,8 @@ class SortMergeIterator(RecordIterator):
     def _next_impl(self):
         for element in self.polled:
             if element.update():
-                entry = HeapEntry(element.kv.key, element, self.key_comparator)
+                entry = HeapEntry(element.kv.key, element, self.key_comparator,
+                                  self.seq_comparator)
                 heapq.heappush(self.min_heap, entry)
         self.polled.clear()
 
@@ -168,36 +179,82 @@ class Element:
 
 
 class HeapEntry:
-    def __init__(self, key: InternalRow, element: Element, key_comparator):
+    def __init__(self, key: InternalRow, element: Element, key_comparator,
+                 seq_comparator=None):
         self.key = key
         self.element = element
         self.key_comparator = key_comparator
+        self.seq_comparator = seq_comparator
 
     def __lt__(self, other):
+        # Heap order mirrors Java ``SortMergeReaderWithMinHeap``: user key
+        # -> user-defined sequence comparator (``sequence.field``) on the
+        # value row -> file-level sequence number.
         result = self.key_comparator(self.key, other.key)
-        if result < 0:
-            return True
-        elif result > 0:
-            return False
-
-        return self.element.kv.sequence_number < 
other.element.kv.sequence_number
-
-
-def builtin_key_comparator(key_schema: List[DataField]) -> Callable[[Any, 
Any], int]:
-    # Precompute comparability flags to avoid repeated type checks
-    comparable_types = {member.value for member in Keyword if member is not 
Keyword.VARIANT}
-    comparable_flags = [field.type.type.split(' ')[0] in comparable_types for 
field in key_schema]
-
-    def comparator(key1: InternalRow, key2: InternalRow) -> int:
-        if key1 is None and key2 is None:
+        if result == 0 and self.seq_comparator is not None:
+            result = self.seq_comparator(
+                self.element.kv.value, other.element.kv.value)
+        if result == 0:
+            result = self.element.kv.sequence_number - 
other.element.kv.sequence_number
+        return result < 0
+
+
+def _base_type_name(field: DataField) -> str:
+    """Base type keyword of a field, stripping any ``(precision[, scale])``
+    parameters and the ``NOT NULL`` suffix. E.g. ``DECIMAL(10, 2)`` and
+    ``TIMESTAMP(6)`` map to ``DECIMAL`` / ``TIMESTAMP``.
+    """
+    return field.type.type.split('(')[0].split(' ')[0]
+
+
+# Atomic type keywords pypaimon can order with Python's native comparison
+# operators. VARIANT is atomic but has no ordering, so it is excluded --
+# matching Java, which has no VARIANT sequence-field support.
+_COMPARABLE_TYPE_NAMES = frozenset(
+    member.value for member in Keyword if member is not Keyword.VARIANT)
+
+
+def is_comparable_seq_field(field: DataField) -> bool:
+    """Whether ``field`` can serve as a ``sequence.field`` for pypaimon's
+    atomic comparator: it must be an ``AtomicType`` whose base type name is
+    orderable. Complex types (ARRAY / MAP / ROW / ...) and the atomic-but-
+    unorderable VARIANT both return ``False``. Used by the read-builder
+    guard to reject unsupported sequence fields up front.
+    """
+    return (isinstance(field.type, AtomicType)
+            and _base_type_name(field) in _COMPARABLE_TYPE_NAMES)
+
+
+def _row_field_comparator(
+        fields: List[DataField],
+        indices: List[int],
+        ascending: bool = True) -> Callable[[Any, Any], int]:
+    """Build a comparator over two rows on the given ``indices`` (positions
+    in ``fields`` / the row's ``get_field``), compared left-to-right.
+
+    Shared by :func:`builtin_key_comparator` (all key fields, ascending) and
+    :func:`builtin_seq_comparator` (the configured sequence fields, with
+    sort-order). Comparability is precomputed once. ``None`` rows/values
+    always sort first, independent of ``ascending`` -- only the comparison
+    of two non-null values is reversed when ``ascending=False``. This
+    mirrors Java ``GenerateUtils.generateRowCompare`` built with
+    ``nullIsLast=false`` (see ``CodeGeneratorImpl#getSortSpec``), where
+    descending order flips only the non-null value comparison and leaves
+    nulls sorting first.
+    """
+    comparable_flags = [_base_type_name(fields[idx]) in _COMPARABLE_TYPE_NAMES 
for idx in indices]
+    sign = 1 if ascending else -1
+
+    def comparator(row1: InternalRow, row2: InternalRow) -> int:
+        if row1 is None and row2 is None:
             return 0
-        if key1 is None:
+        if row1 is None:
             return -1
-        if key2 is None:
+        if row2 is None:
             return 1
-        for i, comparable in enumerate(comparable_flags):
-            val1 = key1.get_field(i)
-            val2 = key2.get_field(i)
+        for pos, idx in enumerate(indices):
+            val1 = row1.get_field(idx)
+            val2 = row2.get_field(idx)
 
             if val1 is None and val2 is None:
                 continue
@@ -206,13 +263,73 @@ def builtin_key_comparator(key_schema: List[DataField]) 
-> Callable[[Any, Any],
             if val2 is None:
                 return 1
 
-            if not comparable:
-                raise ValueError(f"Unsupported {key_schema[i].type} 
comparison")
+            if not comparable_flags[pos]:
+                raise ValueError(f"Unsupported {fields[idx].type} comparison")
 
             if val1 < val2:
-                return -1
+                return -sign
             elif val1 > val2:
-                return 1
+                return sign
         return 0
 
     return comparator
+
+
+def builtin_key_comparator(key_schema: List[DataField]) -> Callable[[Any, 
Any], int]:
+    return _row_field_comparator(key_schema, list(range(len(key_schema))))
+
+
+def builtin_seq_comparator(
+        value_fields: List[DataField],
+        sequence_field_names: List[str],
+        ascending: bool) -> Optional[Callable[[Any, Any], int]]:
+    """Build a comparator for the user-defined ``sequence.field`` option.
+
+    Compares two *value* rows (the value side of a ``KeyValue``) on the
+    configured sequence fields, in declaration order, returning a negative
+    / zero / positive int. Mirrors Java ``UserDefinedSeqComparator``:
+
+    - ``sequence_field_names`` empty -> ``None`` (no comparator; the caller
+      falls back to the file-level sequence number).
+    - field names resolve to indices within the value row
+      (``value_fields`` is the value-side schema, == ``read_type``);
+      ``get_field(idx)`` indexes the value ``OffsetRow``.
+    - multiple fields compared left-to-right.
+    - ``ascending=False`` reverses only the non-null value comparison for
+      each field; null ordering stays nulls-first regardless of sort order
+      (mirroring Java's ``nullIsLast=false``). The value rows here carry a
+      homogeneous sort order, so reversing the final non-null comparison is
+      equivalent to Java reversing each field.
+
+    A name that does not resolve raises ``ValueError`` -- the read path
+    injects missing sequence fields into the projection before this runs,
+    so a miss indicates a wiring bug rather than user error.
+
+    A sequence field whose type pypaimon cannot order raises
+    ``NotImplementedError``: complex types (ARRAY / VECTOR / MAP / MULTISET /
+    ROW), which Java handles via ``RecordComparator``, and the atomic-but-
+    unorderable VARIANT. pypaimon only implements atomic-type comparison
+    here, so reject these explicitly rather than failing later with an
+    obscure error.
+    """
+    if not sequence_field_names:
+        return None
+
+    name_to_index = {field.name: i for i, field in enumerate(value_fields)}
+    indices = []
+    for name in sequence_field_names:
+        if name not in name_to_index:
+            raise ValueError(
+                f"sequence.field '{name}' not found in value fields "
+                f"{[f.name for f in value_fields]}")
+        idx = name_to_index[name]
+        if not is_comparable_seq_field(value_fields[idx]):
+            raise NotImplementedError(
+                f"sequence.field '{name}' has unsupported type "
+                f"{value_fields[idx].type}; pypaimon only supports orderable "
+                f"atomic sequence-field types. Complex types (ARRAY / MAP / "
+                f"ROW etc., handled by Java via RecordComparator) and VARIANT "
+                f"are not supported -- open an issue to track support.")
+        indices.append(idx)
+
+    return _row_field_comparator(value_fields, indices, ascending)
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index 9b46a377c6..dc5f5e6100 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -61,7 +61,8 @@ from pypaimon.read.reader.partial_update_merge_function 
import \
 from pypaimon.read.reader.first_row_merge_function import \
     FirstRowMergeFunction
 from pypaimon.read.reader.sort_merge_reader import (DeduplicateMergeFunction,
-                                                    SortMergeReaderWithMinHeap)
+                                                    SortMergeReaderWithMinHeap,
+                                                    builtin_seq_comparator)
 from pypaimon.read.push_down_utils import _get_all_fields
 from pypaimon.read.split import Split
 from pypaimon.read.sliced_split import SlicedSplit
@@ -651,6 +652,15 @@ class MergeFileSplitRead(SplitRead):
         )
         self.outer_extract_name_paths = outer_extract_name_paths
         self.limit = limit
+        # Built once per split-read (value_fields and options are constant
+        # for the object's life), not per section. ``None`` when
+        # ``sequence.field`` is unset, in which case the heap falls back to
+        # the file-level sequence number.
+        self.seq_comparator = builtin_seq_comparator(
+            self.value_fields,
+            self.table.options.sequence_field(),
+            self.table.options.sequence_field_sort_order_is_ascending(),
+        )
 
     def kv_reader_supplier(self, file: DataFileMeta, dv_factory: 
Optional[Callable] = None) -> RecordReader:
         file_batch_reader = self.file_reader_supplier(file, True, 
self._get_final_read_data_fields(), False)
@@ -672,7 +682,8 @@ class MergeFileSplitRead(SplitRead):
             readers.append(ConcatRecordReader(data_readers))
         merge_function = self._build_merge_function()
         return SortMergeReaderWithMinHeap(
-            readers, self.table.table_schema, merge_function=merge_function)
+            readers, self.table.table_schema, merge_function=merge_function,
+            seq_comparator=self.seq_comparator)
 
     def _build_merge_function(self):
         """Pick the right MergeFunction implementation for the table's
diff --git a/paimon-python/pypaimon/read/table_read.py 
b/paimon-python/pypaimon/read/table_read.py
index 52a4eaaa7f..7c717df24d 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -532,6 +532,32 @@ class TableRead:
                 # the requested sub-paths back to the user's flat schema.
                 inner_read_type = self._widen_to_top_level_for_merge()
                 outer_extract_name_paths = self.nested_name_paths
+
+            # When the user's projection drops a ``sequence.field``, the merge
+            # heap can't compare it. Inject the missing sequence field(s) into
+            # the value row so the comparator resolves, then project them back
+            # out after merging (mirrors Java MergeFileSplitRead.withReadType +
+            # projectOuter). Reuses the OuterProjectionRecordReader machinery.
+            seq_fields = self.table.options.sequence_field()
+            if seq_fields:
+                present = {f.name for f in inner_read_type}
+                missing = [name for name in seq_fields if name not in present]
+                if missing:
+                    table_fields_by_name = {f.name: f for f in 
self.table.fields}
+                    extra = []
+                    for name in missing:
+                        field = table_fields_by_name.get(name)
+                        if field is None:
+                            raise ValueError(
+                                "sequence.field %r not found in table schema"
+                                % (name,))
+                        extra.append(field)
+                    inner_read_type = list(inner_read_type) + extra
+                    if outer_extract_name_paths is None:
+                        # Drop the injected seq columns: project back to the
+                        # user's requested (flat) columns in order.
+                        outer_extract_name_paths = [
+                            [f.name] for f in self.read_type]
             return MergeFileSplitRead(
                 table=self.table,
                 predicate=self.predicate,
diff --git a/paimon-python/pypaimon/tests/test_aggregation_e2e.py 
b/paimon-python/pypaimon/tests/test_aggregation_e2e.py
index 8eed935371..0d557799e9 100644
--- a/paimon-python/pypaimon/tests/test_aggregation_e2e.py
+++ b/paimon-python/pypaimon/tests/test_aggregation_e2e.py
@@ -27,7 +27,7 @@ behaviour when no aggregator is configured is 
``last_non_null_value``.
 
 The second half of the file exercises the merge-engine-support guard:
 tables that configure aggregation with options pypaimon does not yet
-implement (retract opt-ins, sequence fields, out-of-scope aggregator
+implement (retract opt-ins, sequence-group, out-of-scope aggregator
 identifiers) must raise ``NotImplementedError`` at TableRead
 construction rather than silently fall back to a wrong answer.
 """
@@ -227,17 +227,19 @@ class AggregationMergeEngineE2ETest(unittest.TestCase):
     # construction, not silently produce wrong results.
 
     def _create_and_expect_unsupported(self, table_name, extra_options,
-                                       expected_substring):
+                                       expected_substring,
+                                       error_type=NotImplementedError):
         table = self._create_pk_table(
             table_name, extra_options=extra_options
         )
         # Writing is fine — the guard fires when a reader is built.
         self._write(table, [{'id': 1, 'total': 1, 'max_score': 1, 'label': 
'a'}])
         rb = table.new_read_builder()
-        with self.assertRaises(NotImplementedError) as cm:
+        with self.assertRaises(error_type) as cm:
             rb.new_read()
         msg = str(cm.exception)
-        self.assertIn('aggregation', msg)
+        if error_type is NotImplementedError:
+            self.assertIn('aggregation', msg)
         self.assertIn(expected_substring, msg)
 
     def test_remove_record_on_delete_rejected(self):
@@ -254,11 +256,34 @@ class AggregationMergeEngineE2ETest(unittest.TestCase):
             'fields.total.ignore-retract',
         )
 
-    def test_sequence_field_rejected(self):
+    def test_sequence_field_supported(self):
+        # Top-level sequence.field is honored by the aggregation engine:
+        # aggregators fold in sequence-field order, not file order. Here
+        # ``last_value`` must pick the value from the highest-``total`` row
+        # even though it was written first.
+        table = self._create_pk_table(
+            'agg_sequence_field',
+            field_aggs={'max_score': 'last_value', 'label': 'last_value'},
+            extra_options={'sequence.field': 'total'},
+        )
+        self._write(table, [{'id': 1, 'total': 100, 'max_score': 9, 'label': 
'hi'}])
+        self._write(table, [{'id': 1, 'total': 50, 'max_score': 1, 'label': 
'lo'}])
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'total': 100, 'max_score': 9, 'label': 'hi'}],
+        )
+
+    def test_aggregate_function_on_sequence_field_rejected(self):
+        # An explicit aggregator on the sequence column is invalid: Java
+        # rejects fields.<seq>.aggregate-function in
+        # SchemaValidation.validateSequenceField. Rather than silently
+        # override 'sum' with last_value, the guard must reject it.
         self._create_and_expect_unsupported(
-            'agg_reject_sequence_field',
-            {'sequence.field': 'total'},
-            'sequence.field',
+            'agg_reject_agg_on_seq',
+            {'sequence.field': 'total',
+             'fields.total.aggregate-function': 'sum'},
+            'fields.total.aggregate-function',
+            error_type=ValueError,
         )
 
     def test_field_sequence_group_rejected(self):
diff --git a/paimon-python/pypaimon/tests/test_sequence_field_read.py 
b/paimon-python/pypaimon/tests/test_sequence_field_read.py
new file mode 100644
index 0000000000..87d4cbe42c
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_sequence_field_read.py
@@ -0,0 +1,544 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""End-to-end tests for the ``sequence.field`` option on the read path.
+
+``sequence.field`` lets the user pick an explicit column (or columns)
+whose value -- not the file-level sequence number -- decides which record
+is the "latest" for a primary key. The tricky case is when the
+write/file order disagrees with the ``sequence.field`` order: a row
+written *later* (higher file sequence number) carrying a *lower*
+``sequence.field`` value must lose to the earlier-written row. The Java
+merge path applies a ``UserDefinedSeqComparator`` on the value row before
+falling back to the file sequence number; pypaimon mirrors that via
+``builtin_seq_comparator`` wired into ``SortMergeReaderWithMinHeap``.
+"""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+
+
+class SequenceFieldReadE2ETest(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', True)
+
+        cls.pa_schema = pa.schema([
+            pa.field('id', pa.int64(), nullable=False),
+            ('ts', pa.int64()),
+            ('ts2', pa.int64()),
+            ('val', pa.string()),
+        ])
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def _create_pk_table(self, table_name, merge_engine='deduplicate',
+                         extra_options=None, partition_keys=None):
+        # bucket=1 forces all rows for a PK into one bucket so the read
+        # goes through SortMergeReader (where sequence ordering matters)
+        # instead of the raw-convertible fast path.
+        options = {
+            'bucket': '1',
+            'merge-engine': merge_engine,
+        }
+        if extra_options:
+            options.update(extra_options)
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            primary_keys=['id'],
+            partition_keys=partition_keys or [],
+            options=options,
+        )
+        full = 'default.{}'.format(table_name)
+        self.catalog.create_table(full, schema, False)
+        return self.catalog.get_table(full)
+
+    def _write(self, table, rows):
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        c = wb.new_commit()
+        try:
+            w.write_arrow(pa.Table.from_pylist(rows, schema=self.pa_schema))
+            c.commit(w.prepare_commit())
+        finally:
+            w.close()
+            c.close()
+
+    def _read(self, table, projection=None, predicate=None):
+        rb = table.new_read_builder()
+        if projection is not None:
+            rb = rb.with_projection(projection)
+        if predicate is not None:
+            rb = rb.with_filter(predicate)
+        splits = rb.new_scan().plan().splits()
+        if not splits:
+            return []
+        return sorted(
+            rb.new_read().to_arrow(splits).to_pylist(),
+            key=lambda r: r['id'],
+        )
+
+    # -- basic ordering --------------------------------------------------
+
+    def test_later_write_with_lower_sequence_field_loses(self):
+        """The row written second has a higher file sequence number but a
+        lower ``sequence.field`` value, so the earlier (higher-ts) row
+        must win.
+        """
+        table = self._create_pk_table(
+            'seq_basic', extra_options={'sequence.field': 'ts'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
+        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}],
+        )
+
+    def test_later_write_with_higher_sequence_field_wins(self):
+        """Sanity check the non-inverted case still works."""
+        table = self._create_pk_table(
+            'seq_basic_fwd', extra_options={'sequence.field': 'ts'})
+        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}])
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}],
+        )
+
+    # -- multiple sequence fields ----------------------------------------
+
+    def test_multi_sequence_field_left_to_right(self):
+        """When the first sequence field ties, the second breaks it."""
+        table = self._create_pk_table(
+            'seq_multi', extra_options={'sequence.field': 'ts,ts2'})
+        # Same ts; ts2 decides. Write the ts2-winner first so file order
+        # disagrees with the sequence-field order.
+        self._write(table, [{'id': 1, 'ts': 10, 'ts2': 99, 'val': 'win'}])
+        self._write(table, [{'id': 1, 'ts': 10, 'ts2': 1, 'val': 'lose'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'ts': 10, 'ts2': 99, 'val': 'win'}],
+        )
+
+    # -- sort order ------------------------------------------------------
+
+    def test_descending_sort_order_lowest_wins(self):
+        """With descending sort order, the lowest ``sequence.field`` value
+        is considered the latest.
+        """
+        table = self._create_pk_table(
+            'seq_desc',
+            extra_options={'sequence.field': 'ts',
+                           'sequence.field.sort-order': 'descending'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
+        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}],
+        )
+
+    def test_descending_sort_order_null_sequence_sorts_first(self):
+        """Null ordering must stay independent of sort order: Java builds
+        the sequence comparator with ``nullIsLast=false`` and applies
+        descending only to non-null value comparisons, so a null
+        ``sequence.field`` value always sorts first (loses) -- even under
+        descending order. A non-null row must therefore beat a null-seq
+        row regardless of write order.
+        """
+        table = self._create_pk_table(
+            'seq_desc_null',
+            extra_options={'sequence.field': 'ts',
+                           'sequence.field.sort-order': 'descending'})
+        # null-seq row written second (higher file sequence number). With
+        # nulls-first ordering it still loses to the earlier non-null row.
+        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'real'}])
+        self._write(table, [{'id': 1, 'ts': None, 'ts2': 0, 'val': 'null'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'real'}],
+        )
+
+    def test_ascending_sort_order_null_sequence_sorts_first(self):
+        """Mirror of the descending case under the default ascending order:
+        a null ``sequence.field`` value sorts first (loses) to a non-null
+        row written earlier.
+        """
+        table = self._create_pk_table(
+            'seq_asc_null', extra_options={'sequence.field': 'ts'})
+        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'real'}])
+        self._write(table, [{'id': 1, 'ts': None, 'ts2': 0, 'val': 'null'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'real'}],
+        )
+
+    # -- projection drops the sequence field -----------------------------
+
+    def test_projection_dropping_sequence_field(self):
+        """Projecting columns that exclude the sequence field must still
+        return the sequence-field-correct row, and the output schema must
+        contain exactly the requested columns (no leaked ``ts``).
+        """
+        table = self._create_pk_table(
+            'seq_proj', extra_options={'sequence.field': 'ts'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
+        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}])
+
+        rows = self._read(table, projection=['id', 'val'])
+        self.assertEqual(rows, [{'id': 1, 'val': 'high'}])
+        # No injected sequence column leaks into the output.
+        self.assertEqual(set(rows[0].keys()), {'id', 'val'})
+
+    def test_projection_dropping_sequence_field_with_predicate(self):
+        """Projection drops the seq field AND a predicate filters on a
+        kept column -- predicate coordinates must stay correct against the
+        widened (seq-injected) read type.
+        """
+        table = self._create_pk_table(
+            'seq_proj_pred', extra_options={'sequence.field': 'ts'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'keep'}])
+        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'drop'}])
+        self._write(table, [{'id': 2, 'ts': 5, 'ts2': 0, 'val': 'other'}])
+
+        rb = table.new_read_builder().with_projection(['id', 'val'])
+        pb = rb.new_predicate_builder()
+        rows = self._read(table, projection=['id', 'val'],
+                          predicate=pb.equal('val', 'keep'))
+        self.assertEqual(rows, [{'id': 1, 'val': 'keep'}])
+
+    # -- per merge engine ------------------------------------------------
+
+    def test_partial_update_respects_sequence_field(self):
+        """partial-update folds non-null fields in sequence-field order, so
+        a later-written but lower-ts row must not overwrite a field set by
+        the higher-ts row.
+        """
+        table = self._create_pk_table(
+            'seq_pu', merge_engine='partial-update',
+            extra_options={'sequence.field': 'ts'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
+        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}],
+        )
+
+    def test_first_row_with_sequence_field_rejected(self):
+        """sequence.field on the first-row merge engine is an invalid
+        configuration that Java rejects at schema validation
+        (SchemaValidation.validateSequenceField). pypaimon has no
+        schema-creation validation, so the read-builder guard must reject
+        it rather than silently apply a sequence ordering first-row never
+        honors on write.
+        """
+        table = self._create_pk_table(
+            'seq_fr', merge_engine='first-row',
+            extra_options={'sequence.field': 'ts'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
+        with self.assertRaises(ValueError) as ctx:
+            table.new_read_builder().new_read()
+        self.assertIn('FIRST_ROW', str(ctx.exception))
+
+    def test_aggregation_last_value_respects_sequence_field(self):
+        """``last_value`` must pick the value from the highest-sequence-field
+        row, even when that row was written first.
+        """
+        table = self._create_pk_table(
+            'seq_agg', merge_engine='aggregation',
+            extra_options={
+                'sequence.field': 'ts',
+                'fields.val.aggregate-function': 'last_value',
+                'fields.ts2.aggregate-function': 'last_value',
+            })
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
+        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}],
+        )
+
+    # -- unsupported sub-features still rejected -------------------------
+
+    def test_missing_sequence_field_rejected(self):
+        """A sequence.field naming a column absent from the schema is
+        invalid (Java SchemaValidation). The guard must reject it with a
+        clear message before any read execution.
+        """
+        table = self._create_pk_table(
+            'seq_missing', extra_options={'sequence.field': 'nope'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
+        with self.assertRaises(ValueError) as ctx:
+            table.new_read_builder().new_read()
+        self.assertIn('nope', str(ctx.exception))
+
+    def test_duplicate_sequence_field_rejected(self):
+        """A sequence.field listing the same column twice is invalid
+        (Java SchemaValidation rejects repeated sequence fields).
+        """
+        table = self._create_pk_table(
+            'seq_dup', extra_options={'sequence.field': 'ts,ts'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
+        with self.assertRaises(ValueError) as ctx:
+            table.new_read_builder().new_read()
+        self.assertIn('ts', str(ctx.exception))
+
+    def test_empty_segment_sequence_field_rejected(self):
+        """A malformed ``sequence.field`` with an empty segment (e.g.
+        ``'ts,,ts2'``) leaves an empty field name after trimming -- matching
+        Java ``CoreOptions.sequenceField()``, which trims but does not drop
+        empty segments -- and must be rejected by validation rather than
+        silently accepted as ``['ts', 'ts2']``.
+        """
+        table = self._create_pk_table(
+            'seq_empty_seg', extra_options={'sequence.field': 'ts,,ts2'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
+        with self.assertRaises(ValueError) as ctx:
+            table.new_read_builder().new_read()
+        # The empty field name is the one that can't be found in the schema.
+        self.assertIn('can not be found', str(ctx.exception))
+
+    def test_cross_partition_update_with_sequence_field_rejected(self):
+        """sequence.field is invalid under cross-partition update (the PK
+        does not include all partition fields), matching Java
+        SchemaValidation.
+        """
+        table = self._create_pk_table(
+            'seq_xpart', extra_options={'sequence.field': 'ts'},
+            partition_keys=['ts2'])
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
+        with self.assertRaises(ValueError) as ctx:
+            table.new_read_builder().new_read()
+        self.assertIn('cross partition', str(ctx.exception).lower())
+
+    def test_aggregate_function_on_sequence_field_rejected(self):
+        """Defining an aggregator on the sequence column is invalid: Java
+        rejects fields.<seq>.aggregate-function outright in
+        SchemaValidation.validateSequenceField. The read-builder guard
+        must reject it rather than silently override the user's
+        aggregator with last_value.
+        """
+        table = self._create_pk_table(
+            'seq_agg_on_seq', merge_engine='aggregation',
+            extra_options={'sequence.field': 'ts',
+                           'fields.ts.aggregate-function': 'sum'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
+        with self.assertRaises(ValueError) as ctx:
+            table.new_read_builder().new_read()
+        self.assertIn('fields.ts.aggregate-function', str(ctx.exception))
+
+    def test_sequence_group_still_rejected(self):
+        """Top-level sequence.field is supported, but per-field
+        sequence-group is not -- the read must still raise.
+        """
+        table = self._create_pk_table(
+            'seq_group', merge_engine='partial-update',
+            extra_options={'sequence.field': 'ts',
+                           'fields.ts2.sequence-group': 'val'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
+        with self.assertRaises(NotImplementedError):
+            self._read(table)
+
+    def test_nested_sequence_field_rejected(self):
+        """nested-sequence-field is unimplemented and must be rejected
+        rather than silently ignored by the top-level comparator.
+        """
+        table = self._create_pk_table(
+            'seq_nested', merge_engine='deduplicate',
+            extra_options={'sequence.field': 'ts',
+                           'fields.val.nested-sequence-field': 'ts2'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
+        with self.assertRaises(NotImplementedError):
+            self._read(table)
+
+    def test_trailing_comma_sequence_field_tolerated(self):
+        """A trailing comma (``'ts,'``) must be tolerated, matching Java
+        ``String.split(',')`` which drops trailing empty segments. It
+        behaves exactly like ``'ts'`` -- not rejected as an empty field.
+        """
+        table = self._create_pk_table(
+            'seq_trailing', extra_options={'sequence.field': 'ts,'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
+        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}],
+        )
+
+    def test_complex_type_sequence_field_rejected(self):
+        """A complex (non-atomic) sequence field is valid in Java (handled
+        via RecordComparator) but unimplemented in pypaimon's atomic-only
+        comparator. It must be rejected with a clear NotImplementedError
+        rather than failing later with an obscure attribute error.
+        """
+        pa_schema = pa.schema([
+            pa.field('id', pa.int64(), nullable=False),
+            ('seq', pa.list_(pa.int64())),
+            ('val', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema, primary_keys=['id'],
+            options={'bucket': '1', 'merge-engine': 'deduplicate',
+                     'sequence.field': 'seq'})
+        self.catalog.create_table('default.seq_complex', schema, False)
+        table = self.catalog.get_table('default.seq_complex')
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        c = wb.new_commit()
+        try:
+            w.write_arrow(pa.Table.from_pylist(
+                [{'id': 1, 'seq': [1, 2], 'val': 'x'}], schema=pa_schema))
+            c.commit(w.prepare_commit())
+        finally:
+            w.close()
+            c.close()
+        with self.assertRaises(NotImplementedError) as ctx:
+            table.new_read_builder().new_read()
+        self.assertIn('seq', str(ctx.exception))
+
+
+class SequenceFieldComparabilityUnitTest(unittest.TestCase):
+    """Unit-level coverage of ``is_comparable_seq_field`` -- the predicate
+    behind the read-builder guard. VARIANT in particular is an
+    ``AtomicType`` but has no ordering, so it must be rejected like the
+    complex types rather than slipping through an ``isinstance(AtomicType)``
+    check.
+    """
+
+    def test_variant_sequence_field_not_comparable(self):
+        from pypaimon.read.reader.sort_merge_reader import (
+            is_comparable_seq_field)
+        from pypaimon.schema.data_types import AtomicType, DataField
+
+        variant = DataField(0, 'seq', AtomicType('VARIANT'))
+        self.assertFalse(is_comparable_seq_field(variant))
+
+    def test_atomic_types_are_comparable(self):
+        from pypaimon.read.reader.sort_merge_reader import (
+            is_comparable_seq_field)
+        from pypaimon.schema.data_types import AtomicType, DataField
+
+        for type_str in ('BIGINT', 'INT', 'TIMESTAMP(6)', 'DECIMAL(10, 2)',
+                         'STRING', 'BIGINT NOT NULL'):
+            field = DataField(0, 'seq', AtomicType(type_str))
+            self.assertTrue(is_comparable_seq_field(field),
+                            '{} should be comparable'.format(type_str))
+
+    def test_complex_types_not_comparable(self):
+        from pypaimon.read.reader.sort_merge_reader import (
+            is_comparable_seq_field)
+        from pypaimon.schema.data_types import (
+            ArrayType, AtomicType, DataField)
+
+        array = DataField(0, 'seq', ArrayType(True, AtomicType('INT')))
+        self.assertFalse(is_comparable_seq_field(array))
+
+
+class SequenceFieldParameterizedTypeTest(unittest.TestCase):
+    """The comparability check must accept parameterized atomic types
+    (TIMESTAMP(p), DECIMAL(p, s), TIME(p)) as sequence fields -- their
+    type string carries ``(...)`` which must not be mistaken for a
+    non-comparable type.
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.catalog = CatalogFactory.create(
+            {'warehouse': os.path.join(cls.tempdir, 'warehouse')})
+        cls.catalog.create_database('default', True)
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def _run(self, table_name, pa_schema, rows_first, rows_second, expected):
+        schema = Schema.from_pyarrow_schema(
+            pa_schema, primary_keys=['id'],
+            options={'bucket': '1', 'merge-engine': 'deduplicate',
+                     'sequence.field': 'seq'})
+        full = 'default.{}'.format(table_name)
+        self.catalog.create_table(full, schema, False)
+        table = self.catalog.get_table(full)
+        for batch in (rows_first, rows_second):
+            wb = table.new_batch_write_builder()
+            w = wb.new_write()
+            c = wb.new_commit()
+            try:
+                w.write_arrow(pa.Table.from_pylist(batch, schema=pa_schema))
+                c.commit(w.prepare_commit())
+            finally:
+                w.close()
+                c.close()
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        rows = rb.new_read().to_arrow(splits).to_pylist()
+        self.assertEqual(rows, expected)
+
+    def test_timestamp_sequence_field(self):
+        import datetime
+        pa_schema = pa.schema([
+            pa.field('id', pa.int64(), nullable=False),
+            ('seq', pa.timestamp('us')),
+            ('val', pa.string()),
+        ])
+        hi = datetime.datetime(2020, 1, 2)
+        lo = datetime.datetime(2020, 1, 1)
+        # Later write has the lower timestamp -> earlier (higher-ts) wins.
+        self._run('seq_ts',
+                  pa_schema,
+                  [{'id': 1, 'seq': hi, 'val': 'high'}],
+                  [{'id': 1, 'seq': lo, 'val': 'low'}],
+                  [{'id': 1, 'seq': hi, 'val': 'high'}])
+
+    def test_decimal_sequence_field(self):
+        from decimal import Decimal
+        pa_schema = pa.schema([
+            pa.field('id', pa.int64(), nullable=False),
+            ('seq', pa.decimal128(10, 2)),
+            ('val', pa.string()),
+        ])
+        self._run('seq_dec',
+                  pa_schema,
+                  [{'id': 1, 'seq': Decimal('100.50'), 'val': 'high'}],
+                  [{'id': 1, 'seq': Decimal('50.25'), 'val': 'low'}],
+                  [{'id': 1, 'seq': Decimal('100.50'), 'val': 'high'}])
+
+
+if __name__ == '__main__':
+    unittest.main()

Reply via email to