This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8faf4c6b6d [python] Honor sequence.field on the primary-key read path (#8075)
8faf4c6b6d is described below
commit 8faf4c6b6d2c4f7aa2e0973b600d5ccc8944ef9f
Author: Junrui Lee <[email protected]>
AuthorDate: Tue Jun 2 21:47:22 2026 +0800
[python] Honor sequence.field on the primary-key read path (#8075)
---
.../pypaimon/common/options/core_options.py | 54 +-
.../pypaimon/read/merge_engine_support.py | 141 +++++-
.../read/reader/aggregation_merge_function.py | 37 +-
.../pypaimon/read/reader/sort_merge_reader.py | 177 +++++--
paimon-python/pypaimon/read/split_read.py | 15 +-
paimon-python/pypaimon/read/table_read.py | 26 +
.../pypaimon/tests/test_aggregation_e2e.py | 41 +-
.../pypaimon/tests/test_sequence_field_read.py | 544 +++++++++++++++++++++
8 files changed, 970 insertions(+), 65 deletions(-)
diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py
index bb157bb6e3..5a51878294 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -18,7 +18,7 @@
import sys
from datetime import timedelta
from enum import Enum
-from typing import Dict, Optional
+from typing import Dict, List, Optional
from pypaimon.common.memory_size import MemorySize
from pypaimon.common.options import Options
@@ -56,6 +56,15 @@ class MergeEngine(str, Enum):
FIRST_ROW = "first-row"
+class SortOrder(str, Enum):
+ """
+ Specifies the order of ``sequence.field``. Mirrors Java
+ ``CoreOptions.SortOrder``.
+ """
+ ASCENDING = "ascending"
+ DESCENDING = "descending"
+
+
class GlobalIndexColumnUpdateAction(str, Enum):
THROW_ERROR = "THROW_ERROR"
DROP_PARTITION_INDEX = "DROP_PARTITION_INDEX"
@@ -411,6 +420,22 @@ class CoreOptions:
.with_description("Whether to ignore delete records.")
)
+ SEQUENCE_FIELD: ConfigOption[str] = (
+ ConfigOptions.key("sequence.field")
+ .string_type()
+ .no_default_value()
+ .with_description("The field that generates the sequence number for "
+ "primary key table, the sequence number determines "
+ "which data is the most recent.")
+ )
+
+ SEQUENCE_FIELD_SORT_ORDER: ConfigOption[SortOrder] = (
+ ConfigOptions.key("sequence.field.sort-order")
+ .enum_type(SortOrder)
+ .default_value(SortOrder.ASCENDING)
+ .with_description("Specify the order of sequence.field.")
+ )
+
# Commit options
COMMIT_USER_PREFIX: ConfigOption[str] = (
ConfigOptions.key("commit.user-prefix")
@@ -808,6 +833,33 @@ class CoreOptions:
def merge_engine(self, default=None):
return self.options.get(CoreOptions.MERGE_ENGINE, default)
+ def sequence_field(self) -> List[str]:
+ """User-defined sequence fields, in declaration order. Empty list
+ when ``sequence.field`` is unset. Mirrors Java
+ ``CoreOptions.sequenceField()``.
+ """
+ raw = self.options.get(CoreOptions.SEQUENCE_FIELD)
+ if not raw:
+ return []
+ # Mirror Java ``CoreOptions.sequenceField()``
+ # (``Arrays.stream(s.split(',')).map(String::trim)``): Java's
+ # ``String.split(",")`` drops *trailing* empty segments (so ``'ts,'``
+ # yields ``['ts']``) but keeps interior ones, and each segment is
+ # then trimmed. So an interior empty segment (``'ts,,ts2'``) survives
+ # as an empty field name that ``check_sequence_field_valid`` rejects,
+ # while a trailing comma is tolerated.
+ segments = raw.split(",")
+ while segments and segments[-1] == "":
+ segments.pop()
+ return [name.strip() for name in segments]
+
+ def sequence_field_sort_order_is_ascending(self) -> bool:
+ """Whether ``sequence.field.sort-order`` is ascending (the default).
+ Mirrors Java ``CoreOptions.sequenceFieldSortOrderIsAscending()``.
+ """
+ return (self.options.get(CoreOptions.SEQUENCE_FIELD_SORT_ORDER)
+ == SortOrder.ASCENDING)
+
def ignore_delete(self) -> bool:
raw = self.options.to_map()
fallback_keys = (
diff --git a/paimon-python/pypaimon/read/merge_engine_support.py b/paimon-python/pypaimon/read/merge_engine_support.py
index 8ab4813f7e..cb5d34ab87 100644
--- a/paimon-python/pypaimon/read/merge_engine_support.py
+++ b/paimon-python/pypaimon/read/merge_engine_support.py
@@ -63,22 +63,147 @@ _AGGREGATION_SUPPORTED_AGG_FUNCS = frozenset([
"sum", "max", "min",
"bool_or", "bool_and",
])
-_SEQUENCE_FIELD_KEY = "sequence.field"
_FIELDS_PREFIX = "fields."
_FIELD_SEQUENCE_GROUP_SUFFIX = ".sequence-group"
_FIELD_AGGREGATE_FUNCTION_SUFFIX = ".aggregate-function"
_FIELD_IGNORE_RETRACT_SUFFIX = ".ignore-retract"
+_FIELD_NESTED_SEQUENCE_SUFFIX = ".nested-sequence-field"
_DEFAULT_AGGREGATE_FUNCTION_KEY = "fields.default-aggregate-function"
+def _nested_sequence_field_options(table) -> Set[str]:
+ """Option keys configuring ``nested-sequence-field`` (a per-field
+ nested sequence ordering distinct from the top-level
+ ``sequence.field``). pypaimon implements top-level ``sequence.field``
+ but not nested sequence fields, so reject them on every PK engine
+ rather than silently ignoring them.
+ """
+ flagged: Set[str] = set()
+ raw = table.options.options.to_map()
+ for key in raw:
+ if key.startswith(_FIELDS_PREFIX) and key.endswith(
+ _FIELD_NESTED_SEQUENCE_SUFFIX):
+ flagged.add(key)
+ return flagged
+
+
+def _unsupported_sequence_fields(table) -> Set[str]:
+ """Configured ``sequence.field`` names whose type pypaimon cannot order.
+ Java's ``UserDefinedSeqComparator`` delegates to ``RecordComparator``
+ and supports ARRAY / VECTOR / MAP / MULTISET / ROW, but pypaimon's
+ ``builtin_seq_comparator`` only compares orderable atomic types. This
+ flags both complex (non-atomic) types and the atomic-but-unorderable
+ VARIANT, so a raw-convertible split (which skips the merge reader) can't
+ silently bypass the limitation.
+ """
+ from pypaimon.read.reader.sort_merge_reader import is_comparable_seq_field
+ flagged: Set[str] = set()
+ for field in table.options.sequence_field():
+ data_field = table.field_dict.get(field)
+ if data_field is not None and not is_comparable_seq_field(data_field):
+ flagged.add(field)
+ return flagged
+
+
+def check_sequence_field_valid(table) -> None:
+ """Reject ``sequence.field`` configurations Java forbids at schema
+ validation (``SchemaValidation.validateSequenceField``), raising
+ ``ValueError`` to mirror Java's ``IllegalArgumentException``.
+
+ These are invalid configurations, not deferred features, so they are
+ rejected on every merge engine regardless of pypaimon's read-path
+ coverage. Mirrors all of Java's checks:
+
+ 1. Every sequence field must exist in the table schema.
+ 2. No sequence field may be declared more than once.
+ 3. ``fields.<seq>.aggregate-function`` on a sequence column: Java
+ forbids aggregating the sequence column outright. pypaimon's
+ aggregation engine otherwise silently overrides it with
+ ``last_value``, hiding the misconfiguration.
+ 4. ``sequence.field`` together with ``merge-engine=first-row``:
+ first-row keeps the earliest-written row and never honors a
+ sequence ordering.
+ 5. ``sequence.field`` together with cross-partition update (the PK
+ does not include all partition fields).
+ """
+ sequence_fields = table.options.sequence_field()
+ if not sequence_fields:
+ return
+
+ field_names = set(table.field_names)
+ seen: Set[str] = set()
+ options_map = table.options.options.to_map()
+ for field in sequence_fields:
+ if field not in field_names:
+ raise ValueError(
+ "Sequence field: '{}' can not be found in table "
+ "schema.".format(field)
+ )
+ if field in seen:
+ raise ValueError(
+ "Sequence field '{}' is defined repeatedly.".format(field)
+ )
+ seen.add(field)
+ agg_key = "fields.{}.aggregate-function".format(field)
+ if options_map.get(agg_key) is not None:
+ raise ValueError(
+ "Should not define aggregation on sequence field: '{}' "
+ "({}).".format(field, agg_key)
+ )
+
+ if table.options.merge_engine() == MergeEngine.FIRST_ROW:
+ raise ValueError(
+ "Do not support use sequence.field on FIRST_ROW merge engine."
+ )
+
+ if table.cross_partition_update:
+ raise ValueError(
+ "You can not use sequence.field in cross partition update case "
+ "(primary keys {} do not include all partition fields "
+ "{}).".format(table.primary_keys, table.partition_keys)
+ )
+
+
def check_supported(table) -> None:
"""Raise ``NotImplementedError`` if the table's merge-engine
- configuration is outside what pypaimon's read path implements.
+ configuration is outside what pypaimon's read path implements, or
+ ``ValueError`` if it is an outright-invalid configuration that Java
+ rejects at schema validation.
Non-PK tables are always fine (no merge function involved).
"""
if not table.is_primary_key_table:
return
+ # ``nested-sequence-field`` is unimplemented on every engine; reject it
+ # before per-engine dispatch so it can't be silently ignored by the
+ # top-level ``sequence.field`` comparator.
+ nested_seq = _nested_sequence_field_options(table)
+ if nested_seq:
+ raise NotImplementedError(
+ "nested-sequence-field is not implemented in pypaimon yet: {}. "
+ "Top-level 'sequence.field' is supported; open an issue to track "
+ "nested sequence field support.".format(", ".join(sorted(nested_seq)))
+ )
+ # ``sequence.field`` validity is engine-independent in Java
+ # (SchemaValidation.validateSequenceField). pypaimon has no
+ # schema-creation validation, so enforce the same invariants here on
+ # the read path, before per-engine dispatch.
+ check_sequence_field_valid(table)
+ # ``sequence.field`` validity (above) is Java-aligned and engine
+ # independent. Some field *types* are valid in Java but unimplemented in
+ # pypaimon's orderable-atomic-only comparator (complex types, plus the
+ # atomic-but-unorderable VARIANT), so reject them as NotImplementedError
+ # here -- before per-engine dispatch, so a raw-convertible split can't
+ # bypass the merge reader and skip the check.
+ unsupported_seq = _unsupported_sequence_fields(table)
+ if unsupported_seq:
+ raise NotImplementedError(
+ "sequence.field with unsupported type is not implemented in "
+ "pypaimon yet: {}. pypaimon only supports orderable atomic "
+ "sequence-field types; complex types (ARRAY / MAP / ROW etc., "
+ "handled by Java via RecordComparator) and VARIANT are not "
+ "supported. Open an issue to track support.".format(
+ ", ".join(sorted(unsupported_seq))))
engine = table.options.merge_engine()
if engine == MergeEngine.DEDUPLICATE:
return
@@ -108,7 +233,7 @@ def check_supported(table) -> None:
"supported subset is per-key field aggregation with the "
"built-in aggregators ({}); retract opt-ins "
"(aggregation.remove-record-on-delete, "
- "fields.<f>.ignore-retract), sequence-field handling "
+ "fields.<f>.ignore-retract) "
"and other aggregators (product / listagg / collect / "
"merge_map* / nested_update* / theta_sketch / "
"hll_sketch / roaring_bitmap_*) are not yet supported. "
@@ -156,11 +281,9 @@ def aggregation_unsupported_options(table) -> Set[str]:
``fields.<f>.ignore-retract`` only make sense in conjunction
with DELETE / UPDATE_BEFORE handling, which the engine does not
implement.
- 2. Sequence-field configuration: ``sequence.field`` /
- ``fields.<f>.sequence-group`` are not supported; the merge
- function does not special-case sequence fields, so we refuse
- the table rather than silently merge them as ordinary value
- columns.
+ 2. Sequence-group configuration: ``fields.<f>.sequence-group`` is not
+ supported (top-level ``sequence.field`` is honored, see
+ ``builtin_seq_comparator``).
3. Out-of-scope aggregator selections: ``fields.<f>.aggregate-
function`` and ``fields.default-aggregate-function`` set to an
identifier this engine doesn't support yet (e.g. ``collect``,
@@ -172,8 +295,6 @@ def aggregation_unsupported_options(table) -> Set[str]:
if (key in _AGGREGATION_UNSUPPORTED_BOOLEAN_OPTIONS
and _option_is_truthy(value)):
flagged.add(key)
- elif key == _SEQUENCE_FIELD_KEY and value:
- flagged.add(key)
elif key == _DEFAULT_AGGREGATE_FUNCTION_KEY:
if value not in _AGGREGATION_SUPPORTED_AGG_FUNCS:
flagged.add(key)
diff --git a/paimon-python/pypaimon/read/reader/aggregation_merge_function.py b/paimon-python/pypaimon/read/reader/aggregation_merge_function.py
index 5e2c149738..ae66673403 100644
--- a/paimon-python/pypaimon/read/reader/aggregation_merge_function.py
+++ b/paimon-python/pypaimon/read/reader/aggregation_merge_function.py
@@ -42,6 +42,7 @@ from typing import Any, List, Optional
from pypaimon.read.reader.aggregate import create_field_aggregator
from pypaimon.read.reader.aggregate.aggregators import (
NAME_LAST_NON_NULL_VALUE,
+ NAME_LAST_VALUE,
NAME_PRIMARY_KEY,
)
from pypaimon.read.reader.aggregate.field_aggregator import FieldAggregator
@@ -56,22 +57,28 @@ from pypaimon.table.row.row_kind import RowKind
# ---------------------------------------------------------------------------
-def resolve_agg_func_name(field_name, primary_keys, options_map):
- """Pick the aggregator identifier for ``field_name`` using the
- following precedence:
+def resolve_agg_func_name(field_name, primary_keys, options_map,
+ sequence_fields=()):
+ """Pick the aggregator identifier for ``field_name`` using the same
+ precedence as Java ``AggregateMergeFunction.getAggFuncName``:
- 1. Primary-key columns use ``primary_key`` (identity).
- 2. Otherwise, field-level ``fields.<f>.aggregate-function``
+ 1. Sequence fields use ``last_value`` (no aggregation -- the
+ sequence column just carries the latest-by-sequence value).
+ 2. Primary-key columns use ``primary_key`` (identity).
+ 3. Otherwise, field-level ``fields.<f>.aggregate-function``
overrides everything.
- 3. Otherwise, the table-wide ``fields.default-aggregate-function``.
- 4. Otherwise, the system default ``last_non_null_value``.
-
- Sequence fields are intentionally **not** special-cased here: the
- merge-engine guard in :mod:`pypaimon.read.merge_engine_support`
- rejects any table that sets ``sequence.field`` on the
- ``aggregation`` engine, so by the time this function runs there is
- no sequence field to disambiguate.
+ 4. Otherwise, the table-wide ``fields.default-aggregate-function``.
+ 5. Otherwise, the system default ``last_non_null_value``.
+
+ Sequence fields take precedence over the table-wide
+ ``fields.default-aggregate-function``, matching Java: the value of a
+ ``sequence.field`` column must not be aggregated. An *explicit*
+ ``fields.<seq>.aggregate-function`` on a sequence column is rejected
+ up-front (see ``merge_engine_support.check_sequence_field_valid``), so
+ it never reaches this precedence.
"""
+ if field_name in sequence_fields:
+ return NAME_LAST_VALUE
if field_name in primary_keys:
return NAME_PRIMARY_KEY
return (
@@ -96,9 +103,11 @@ def build_field_aggregators(
"""
options_map = core_options.options.to_map()
pk_set = set(primary_keys)
+ sequence_fields = set(core_options.sequence_field())
aggregators = []
for field in value_fields:
- agg_name = resolve_agg_func_name(field.name, pk_set, options_map)
+ agg_name = resolve_agg_func_name(
+ field.name, pk_set, options_map, sequence_fields)
aggregators.append(
create_field_aggregator(
field.type, field.name, agg_name, core_options
diff --git a/paimon-python/pypaimon/read/reader/sort_merge_reader.py b/paimon-python/pypaimon/read/reader/sort_merge_reader.py
index 56f42b6f3c..1597745bbd 100644
--- a/paimon-python/pypaimon/read/reader/sort_merge_reader.py
+++ b/paimon-python/pypaimon/read/reader/sort_merge_reader.py
@@ -20,7 +20,7 @@ from typing import Any, Callable, List, Optional
from pypaimon.read.reader.iface.record_iterator import RecordIterator
from pypaimon.read.reader.iface.record_reader import RecordReader
-from pypaimon.schema.data_types import DataField, Keyword
+from pypaimon.schema.data_types import AtomicType, DataField, Keyword
from pypaimon.schema.table_schema import TableSchema
from pypaimon.table.row.internal_row import InternalRow
from pypaimon.table.row.key_value import KeyValue
@@ -30,7 +30,8 @@ class SortMergeReaderWithMinHeap(RecordReader):
"""SortMergeReader implemented with min-heap."""
def __init__(self, readers: List[RecordReader[KeyValue]], schema: TableSchema,
- merge_function: Optional[Any] = None):
+ merge_function: Optional[Any] = None,
+ seq_comparator: Optional[Callable[[Any, Any], int]] = None):
self.next_batch_readers = list(readers)
# Default to dedupe so callers that don't pass a merge_function
# keep their old behaviour. The merge engine dispatch lives in
@@ -38,6 +39,12 @@ class SortMergeReaderWithMinHeap(RecordReader):
# path; tests or other ad-hoc callers can pass a different
# implementation here.
self.merge_function = merge_function if merge_function is not None else DeduplicateMergeFunction()
+ # Optional user-defined sequence comparator (``sequence.field``).
+ # When set, it breaks key-ties on the value row before the
+ # file-level sequence number, mirroring Java's
+ # ``SortMergeReaderWithMinHeap`` + ``UserDefinedSeqComparator``.
+ # Built by the caller, which knows the value-side schema.
+ self.seq_comparator = seq_comparator
if schema.partition_keys:
trimmed_primary_keys = [pk for pk in schema.primary_keys if pk not in schema.partition_keys]
@@ -63,7 +70,8 @@ class SortMergeReaderWithMinHeap(RecordReader):
kv = iterator.next()
if kv is not None:
element = Element(kv, iterator, reader)
- entry = HeapEntry(kv.key, element, self.key_comparator)
+ entry = HeapEntry(kv.key, element, self.key_comparator,
+ self.seq_comparator)
heapq.heappush(self.min_heap, entry)
break
@@ -78,6 +86,7 @@ class SortMergeReaderWithMinHeap(RecordReader):
self.min_heap,
self.merge_function,
self.key_comparator,
+ self.seq_comparator,
)
def close(self):
@@ -93,12 +102,13 @@ class SortMergeReaderWithMinHeap(RecordReader):
class SortMergeIterator(RecordIterator):
def __init__(self, reader, polled: List['Element'], min_heap,
merge_function,
- key_comparator):
+ key_comparator, seq_comparator=None):
self.reader = reader
self.polled = polled
self.min_heap = min_heap
self.merge_function = merge_function
self.key_comparator = key_comparator
+ self.seq_comparator = seq_comparator
self.released = False
def next(self):
@@ -112,7 +122,8 @@ class SortMergeIterator(RecordIterator):
def _next_impl(self):
for element in self.polled:
if element.update():
- entry = HeapEntry(element.kv.key, element, self.key_comparator)
+ entry = HeapEntry(element.kv.key, element, self.key_comparator,
+ self.seq_comparator)
heapq.heappush(self.min_heap, entry)
self.polled.clear()
@@ -168,36 +179,82 @@ class Element:
class HeapEntry:
- def __init__(self, key: InternalRow, element: Element, key_comparator):
+ def __init__(self, key: InternalRow, element: Element, key_comparator,
+ seq_comparator=None):
self.key = key
self.element = element
self.key_comparator = key_comparator
+ self.seq_comparator = seq_comparator
def __lt__(self, other):
+ # Heap order mirrors Java ``SortMergeReaderWithMinHeap``: user key
+ # -> user-defined sequence comparator (``sequence.field``) on the
+ # value row -> file-level sequence number.
result = self.key_comparator(self.key, other.key)
- if result < 0:
- return True
- elif result > 0:
- return False
-
- return self.element.kv.sequence_number < other.element.kv.sequence_number
-
-
-def builtin_key_comparator(key_schema: List[DataField]) -> Callable[[Any, Any], int]:
- # Precompute comparability flags to avoid repeated type checks
- comparable_types = {member.value for member in Keyword if member is not
Keyword.VARIANT}
- comparable_flags = [field.type.type.split(' ')[0] in comparable_types for
field in key_schema]
-
- def comparator(key1: InternalRow, key2: InternalRow) -> int:
- if key1 is None and key2 is None:
+ if result == 0 and self.seq_comparator is not None:
+ result = self.seq_comparator(
+ self.element.kv.value, other.element.kv.value)
+ if result == 0:
+ result = self.element.kv.sequence_number - other.element.kv.sequence_number
+ return result < 0
+
+
+def _base_type_name(field: DataField) -> str:
+ """Base type keyword of a field, stripping any ``(precision[, scale])``
+ parameters and the ``NOT NULL`` suffix. E.g. ``DECIMAL(10, 2)`` and
+ ``TIMESTAMP(6)`` map to ``DECIMAL`` / ``TIMESTAMP``.
+ """
+ return field.type.type.split('(')[0].split(' ')[0]
+
+
+# Atomic type keywords pypaimon can order with Python's native comparison
+# operators. VARIANT is atomic but has no ordering, so it is excluded --
+# matching Java, which has no VARIANT sequence-field support.
+_COMPARABLE_TYPE_NAMES = frozenset(
+ member.value for member in Keyword if member is not Keyword.VARIANT)
+
+
+def is_comparable_seq_field(field: DataField) -> bool:
+ """Whether ``field`` can serve as a ``sequence.field`` for pypaimon's
+ atomic comparator: it must be an ``AtomicType`` whose base type name is
+ orderable. Complex types (ARRAY / MAP / ROW / ...) and the atomic-but-
+ unorderable VARIANT both return ``False``. Used by the read-builder
+ guard to reject unsupported sequence fields up front.
+ """
+ return (isinstance(field.type, AtomicType)
+ and _base_type_name(field) in _COMPARABLE_TYPE_NAMES)
+
+
+def _row_field_comparator(
+ fields: List[DataField],
+ indices: List[int],
+ ascending: bool = True) -> Callable[[Any, Any], int]:
+ """Build a comparator over two rows on the given ``indices`` (positions
+ in ``fields`` / the row's ``get_field``), compared left-to-right.
+
+ Shared by :func:`builtin_key_comparator` (all key fields, ascending) and
+ :func:`builtin_seq_comparator` (the configured sequence fields, with
+ sort-order). Comparability is precomputed once. ``None`` rows/values
+ always sort first, independent of ``ascending`` -- only the comparison
+ of two non-null values is reversed when ``ascending=False``. This
+ mirrors Java ``GenerateUtils.generateRowCompare`` built with
+ ``nullIsLast=false`` (see ``CodeGeneratorImpl#getSortSpec``), where
+ descending order flips only the non-null value comparison and leaves
+ nulls sorting first.
+ """
+ comparable_flags = [_base_type_name(fields[idx]) in _COMPARABLE_TYPE_NAMES
for idx in indices]
+ sign = 1 if ascending else -1
+
+ def comparator(row1: InternalRow, row2: InternalRow) -> int:
+ if row1 is None and row2 is None:
return 0
- if key1 is None:
+ if row1 is None:
return -1
- if key2 is None:
+ if row2 is None:
return 1
- for i, comparable in enumerate(comparable_flags):
- val1 = key1.get_field(i)
- val2 = key2.get_field(i)
+ for pos, idx in enumerate(indices):
+ val1 = row1.get_field(idx)
+ val2 = row2.get_field(idx)
if val1 is None and val2 is None:
continue
@@ -206,13 +263,73 @@ def builtin_key_comparator(key_schema: List[DataField]) -> Callable[[Any, Any],
if val2 is None:
return 1
- if not comparable:
- raise ValueError(f"Unsupported {key_schema[i].type} comparison")
+ if not comparable_flags[pos]:
+ raise ValueError(f"Unsupported {fields[idx].type} comparison")
if val1 < val2:
- return -1
+ return -sign
elif val1 > val2:
- return 1
+ return sign
return 0
return comparator
+
+
+def builtin_key_comparator(key_schema: List[DataField]) -> Callable[[Any, Any], int]:
+ return _row_field_comparator(key_schema, list(range(len(key_schema))))
+
+
+def builtin_seq_comparator(
+ value_fields: List[DataField],
+ sequence_field_names: List[str],
+ ascending: bool) -> Optional[Callable[[Any, Any], int]]:
+ """Build a comparator for the user-defined ``sequence.field`` option.
+
+ Compares two *value* rows (the value side of a ``KeyValue``) on the
+ configured sequence fields, in declaration order, returning a negative
+ / zero / positive int. Mirrors Java ``UserDefinedSeqComparator``:
+
+ - ``sequence_field_names`` empty -> ``None`` (no comparator; the caller
+ falls back to the file-level sequence number).
+ - field names resolve to indices within the value row
+ (``value_fields`` is the value-side schema, == ``read_type``);
+ ``get_field(idx)`` indexes the value ``OffsetRow``.
+ - multiple fields compared left-to-right.
+ - ``ascending=False`` reverses only the non-null value comparison for
+ each field; null ordering stays nulls-first regardless of sort order
+ (mirroring Java's ``nullIsLast=false``). The value rows here carry a
+ homogeneous sort order, so reversing the final non-null comparison is
+ equivalent to Java reversing each field.
+
+ A name that does not resolve raises ``ValueError`` -- the read path
+ injects missing sequence fields into the projection before this runs,
+ so a miss indicates a wiring bug rather than user error.
+
+ A sequence field whose type pypaimon cannot order raises
+ ``NotImplementedError``: complex types (ARRAY / VECTOR / MAP / MULTISET /
+ ROW), which Java handles via ``RecordComparator``, and the atomic-but-
+ unorderable VARIANT. pypaimon only implements atomic-type comparison
+ here, so reject these explicitly rather than failing later with an
+ obscure error.
+ """
+ if not sequence_field_names:
+ return None
+
+ name_to_index = {field.name: i for i, field in enumerate(value_fields)}
+ indices = []
+ for name in sequence_field_names:
+ if name not in name_to_index:
+ raise ValueError(
+ f"sequence.field '{name}' not found in value fields "
+ f"{[f.name for f in value_fields]}")
+ idx = name_to_index[name]
+ if not is_comparable_seq_field(value_fields[idx]):
+ raise NotImplementedError(
+ f"sequence.field '{name}' has unsupported type "
+ f"{value_fields[idx].type}; pypaimon only supports orderable "
+ f"atomic sequence-field types. Complex types (ARRAY / MAP / "
+ f"ROW etc., handled by Java via RecordComparator) and VARIANT "
+ f"are not supported -- open an issue to track support.")
+ indices.append(idx)
+
+ return _row_field_comparator(value_fields, indices, ascending)
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 9b46a377c6..dc5f5e6100 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -61,7 +61,8 @@ from pypaimon.read.reader.partial_update_merge_function import \
from pypaimon.read.reader.first_row_merge_function import \
FirstRowMergeFunction
from pypaimon.read.reader.sort_merge_reader import (DeduplicateMergeFunction,
- SortMergeReaderWithMinHeap)
+ SortMergeReaderWithMinHeap,
+ builtin_seq_comparator)
from pypaimon.read.push_down_utils import _get_all_fields
from pypaimon.read.split import Split
from pypaimon.read.sliced_split import SlicedSplit
@@ -651,6 +652,15 @@ class MergeFileSplitRead(SplitRead):
)
self.outer_extract_name_paths = outer_extract_name_paths
self.limit = limit
+ # Built once per split-read (value_fields and options are constant
+ # for the object's life), not per section. ``None`` when
+ # ``sequence.field`` is unset, in which case the heap falls back to
+ # the file-level sequence number.
+ self.seq_comparator = builtin_seq_comparator(
+ self.value_fields,
+ self.table.options.sequence_field(),
+ self.table.options.sequence_field_sort_order_is_ascending(),
+ )
def kv_reader_supplier(self, file: DataFileMeta, dv_factory: Optional[Callable] = None) -> RecordReader:
file_batch_reader = self.file_reader_supplier(file, True,
self._get_final_read_data_fields(), False)
@@ -672,7 +682,8 @@ class MergeFileSplitRead(SplitRead):
readers.append(ConcatRecordReader(data_readers))
merge_function = self._build_merge_function()
return SortMergeReaderWithMinHeap(
- readers, self.table.table_schema, merge_function=merge_function)
+ readers, self.table.table_schema, merge_function=merge_function,
+ seq_comparator=self.seq_comparator)
def _build_merge_function(self):
"""Pick the right MergeFunction implementation for the table's
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index 52a4eaaa7f..7c717df24d 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -532,6 +532,32 @@ class TableRead:
# the requested sub-paths back to the user's flat schema.
inner_read_type = self._widen_to_top_level_for_merge()
outer_extract_name_paths = self.nested_name_paths
+
+ # When the user's projection drops a ``sequence.field``, the merge
+ # heap can't compare it. Inject the missing sequence field(s) into
+ # the value row so the comparator resolves, then project them back
+ # out after merging (mirrors Java MergeFileSplitRead.withReadType +
+ # projectOuter). Reuses the OuterProjectionRecordReader machinery.
+ seq_fields = self.table.options.sequence_field()
+ if seq_fields:
+ present = {f.name for f in inner_read_type}
+ missing = [name for name in seq_fields if name not in present]
+ if missing:
+ table_fields_by_name = {f.name: f for f in
self.table.fields}
+ extra = []
+ for name in missing:
+ field = table_fields_by_name.get(name)
+ if field is None:
+ raise ValueError(
+ "sequence.field %r not found in table schema"
+ % (name,))
+ extra.append(field)
+ inner_read_type = list(inner_read_type) + extra
+ if outer_extract_name_paths is None:
+ # Drop the injected seq columns: project back to the
+ # user's requested (flat) columns in order.
+ outer_extract_name_paths = [
+ [f.name] for f in self.read_type]
return MergeFileSplitRead(
table=self.table,
predicate=self.predicate,
diff --git a/paimon-python/pypaimon/tests/test_aggregation_e2e.py b/paimon-python/pypaimon/tests/test_aggregation_e2e.py
index 8eed935371..0d557799e9 100644
--- a/paimon-python/pypaimon/tests/test_aggregation_e2e.py
+++ b/paimon-python/pypaimon/tests/test_aggregation_e2e.py
@@ -27,7 +27,7 @@ behaviour when no aggregator is configured is
``last_non_null_value``.
The second half of the file exercises the merge-engine-support guard:
tables that configure aggregation with options pypaimon does not yet
-implement (retract opt-ins, sequence fields, out-of-scope aggregator
+implement (retract opt-ins, sequence-group, out-of-scope aggregator
identifiers) must raise ``NotImplementedError`` at TableRead
construction rather than silently fall back to a wrong answer.
"""
@@ -227,17 +227,19 @@ class AggregationMergeEngineE2ETest(unittest.TestCase):
# construction, not silently produce wrong results.
def _create_and_expect_unsupported(self, table_name, extra_options,
- expected_substring):
+ expected_substring,
+ error_type=NotImplementedError):
table = self._create_pk_table(
table_name, extra_options=extra_options
)
# Writing is fine — the guard fires when a reader is built.
self._write(table, [{'id': 1, 'total': 1, 'max_score': 1, 'label': 'a'}])
rb = table.new_read_builder()
- with self.assertRaises(NotImplementedError) as cm:
+ with self.assertRaises(error_type) as cm:
rb.new_read()
msg = str(cm.exception)
- self.assertIn('aggregation', msg)
+ if error_type is NotImplementedError:
+ self.assertIn('aggregation', msg)
self.assertIn(expected_substring, msg)
def test_remove_record_on_delete_rejected(self):
@@ -254,11 +256,34 @@ class AggregationMergeEngineE2ETest(unittest.TestCase):
'fields.total.ignore-retract',
)
- def test_sequence_field_rejected(self):
+ def test_sequence_field_supported(self):
+ # Top-level sequence.field is honored by the aggregation engine:
+ # aggregators fold in sequence-field order, not file order. Here
+ # ``last_value`` must pick the value from the highest-``total`` row
+ # even though it was written first.
+ table = self._create_pk_table(
+ 'agg_sequence_field',
+ field_aggs={'max_score': 'last_value', 'label': 'last_value'},
+ extra_options={'sequence.field': 'total'},
+ )
+ self._write(table, [{'id': 1, 'total': 100, 'max_score': 9, 'label': 'hi'}])
+ self._write(table, [{'id': 1, 'total': 50, 'max_score': 1, 'label': 'lo'}])
+ self.assertEqual(
+ self._read(table),
+ [{'id': 1, 'total': 100, 'max_score': 9, 'label': 'hi'}],
+ )
+
+ def test_aggregate_function_on_sequence_field_rejected(self):
+ # An explicit aggregator on the sequence column is invalid: Java
+ # rejects fields.<seq>.aggregate-function in
+ # SchemaValidation.validateSequenceField. Rather than silently
+ # override 'sum' with last_value, the guard must reject it.
self._create_and_expect_unsupported(
- 'agg_reject_sequence_field',
- {'sequence.field': 'total'},
- 'sequence.field',
+ 'agg_reject_agg_on_seq',
+ {'sequence.field': 'total',
+ 'fields.total.aggregate-function': 'sum'},
+ 'fields.total.aggregate-function',
+ error_type=ValueError,
)
def test_field_sequence_group_rejected(self):
diff --git a/paimon-python/pypaimon/tests/test_sequence_field_read.py b/paimon-python/pypaimon/tests/test_sequence_field_read.py
new file mode 100644
index 0000000000..87d4cbe42c
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_sequence_field_read.py
@@ -0,0 +1,544 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""End-to-end tests for the ``sequence.field`` option on the read path.
+
+``sequence.field`` lets the user pick an explicit column (or columns)
+whose value -- not the file-level sequence number -- decides which record
+is the "latest" for a primary key. The tricky case is when the
+write/file order disagrees with the ``sequence.field`` order: a row
+written *later* (higher file sequence number) carrying a *lower*
+``sequence.field`` value must lose to the earlier-written row. The Java
+merge path applies a ``UserDefinedSeqComparator`` on the value row before
+falling back to the file sequence number; pypaimon mirrors that via
+``builtin_seq_comparator`` wired into ``SortMergeReaderWithMinHeap``.
+"""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+
+
+class SequenceFieldReadE2ETest(unittest.TestCase):
+    """End-to-end ``sequence.field`` read tests on single-bucket PK tables;
+    each test creates its own table in a class-wide temporary warehouse."""
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', True)
+
+        cls.pa_schema = pa.schema([
+            pa.field('id', pa.int64(), nullable=False),
+            ('ts', pa.int64()),
+            ('ts2', pa.int64()),
+            ('val', pa.string()),
+        ])
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def _create_pk_table(self, table_name, merge_engine='deduplicate',
+                         extra_options=None, partition_keys=None,
+                         pa_schema=None):
+        # bucket=1 forces all rows for a PK into one bucket so the read
+        # goes through SortMergeReader (where sequence ordering matters)
+        # instead of the raw-convertible fast path.
+        # ``pa_schema`` (default: the class-wide schema) swaps the columns.
+        options = {
+            'bucket': '1',
+            'merge-engine': merge_engine,
+        }
+        if extra_options:
+            options.update(extra_options)
+        if pa_schema is None:
+            pa_schema = self.pa_schema
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            primary_keys=['id'],
+            partition_keys=partition_keys or [],
+            options=options,
+        )
+        full = 'default.{}'.format(table_name)
+        self.catalog.create_table(full, schema, False)
+        return self.catalog.get_table(full)
+
+    def _write(self, table, rows, pa_schema=None):
+        """Write ``rows`` as one batch and commit it (one commit per call)."""
+        if pa_schema is None:
+            pa_schema = self.pa_schema
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        c = wb.new_commit()
+        try:
+            w.write_arrow(pa.Table.from_pylist(rows, schema=pa_schema))
+            c.commit(w.prepare_commit())
+        finally:
+            w.close()
+            c.close()
+
+    def _read(self, table, projection=None, predicate=None):
+        """Return the rows read from ``table``, sorted by ``id``."""
+        rb = table.new_read_builder()
+        if projection is not None:
+            rb = rb.with_projection(projection)
+        if predicate is not None:
+            rb = rb.with_filter(predicate)
+        # Build the reader before planning: new_read() runs the read-builder
+        # guards, which must fire even when the scan yields no splits.
+        read = rb.new_read()
+        splits = rb.new_scan().plan().splits()
+        if not splits:
+            return []
+        return sorted(read.to_arrow(splits).to_pylist(),
+                      key=lambda r: r['id'])
+
+    # -- basic ordering --------------------------------------------------
+
+    def test_later_write_with_lower_sequence_field_loses(self):
+        """The row written second has a higher file sequence number but a
+        lower ``sequence.field`` value, so the earlier (higher-ts) row
+        must win.
+        """
+        table = self._create_pk_table(
+            'seq_basic', extra_options={'sequence.field': 'ts'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
+        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}],
+        )
+
+    def test_later_write_with_higher_sequence_field_wins(self):
+        """Sanity check the non-inverted case still works."""
+        table = self._create_pk_table(
+            'seq_basic_fwd', extra_options={'sequence.field': 'ts'})
+        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}])
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}],
+        )
+
+    # -- multiple sequence fields ----------------------------------------
+
+    def test_multi_sequence_field_left_to_right(self):
+        """When the first sequence field ties, the second breaks it."""
+        table = self._create_pk_table(
+            'seq_multi', extra_options={'sequence.field': 'ts,ts2'})
+        # Same ts; ts2 decides. Write the ts2-winner first so file order
+        # disagrees with the sequence-field order.
+        self._write(table, [{'id': 1, 'ts': 10, 'ts2': 99, 'val': 'win'}])
+        self._write(table, [{'id': 1, 'ts': 10, 'ts2': 1, 'val': 'lose'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'ts': 10, 'ts2': 99, 'val': 'win'}],
+        )
+
+    # -- sort order ------------------------------------------------------
+
+    def test_descending_sort_order_lowest_wins(self):
+        """With descending sort order, the lowest ``sequence.field`` value
+        is considered the latest.
+        """
+        table = self._create_pk_table(
+            'seq_desc',
+            extra_options={'sequence.field': 'ts',
+                           'sequence.field.sort-order': 'descending'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
+        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}],
+        )
+
+    def test_descending_sort_order_null_sequence_sorts_first(self):
+        """Null ordering must stay independent of sort order: Java builds
+        the sequence comparator with ``nullIsLast=false`` and applies
+        descending only to non-null value comparisons, so a null
+        ``sequence.field`` value always sorts first (loses) -- even under
+        descending order. A non-null row must therefore beat a null-seq
+        row regardless of write order.
+        """
+        table = self._create_pk_table(
+            'seq_desc_null',
+            extra_options={'sequence.field': 'ts',
+                           'sequence.field.sort-order': 'descending'})
+        # null-seq row written second (higher file sequence number). With
+        # nulls-first ordering it still loses to the earlier non-null row.
+        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'real'}])
+        self._write(table, [{'id': 1, 'ts': None, 'ts2': 0, 'val': 'null'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'real'}],
+        )
+
+    def test_ascending_sort_order_null_sequence_sorts_first(self):
+        """Mirror of the descending case under the default ascending order:
+        a null ``sequence.field`` value sorts first (loses) to a non-null
+        row written earlier.
+        """
+        table = self._create_pk_table(
+            'seq_asc_null', extra_options={'sequence.field': 'ts'})
+        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'real'}])
+        self._write(table, [{'id': 1, 'ts': None, 'ts2': 0, 'val': 'null'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'real'}],
+        )
+
+    # -- projection drops the sequence field -----------------------------
+
+    def test_projection_dropping_sequence_field(self):
+        """Projecting columns that exclude the sequence field must still
+        return the sequence-field-correct row, and the output schema must
+        contain exactly the requested columns (no leaked ``ts``).
+        """
+        table = self._create_pk_table(
+            'seq_proj', extra_options={'sequence.field': 'ts'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
+        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}])
+
+        rows = self._read(table, projection=['id', 'val'])
+        self.assertEqual(rows, [{'id': 1, 'val': 'high'}])
+        # No injected sequence column leaks into the output.
+        self.assertEqual(set(rows[0].keys()), {'id', 'val'})
+
+    def test_projection_dropping_sequence_field_with_predicate(self):
+        """Projection drops the seq field AND a predicate filters on a
+        kept column -- predicate coordinates must stay correct against the
+        widened (seq-injected) read type.
+        """
+        table = self._create_pk_table(
+            'seq_proj_pred', extra_options={'sequence.field': 'ts'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'keep'}])
+        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'drop'}])
+        self._write(table, [{'id': 2, 'ts': 5, 'ts2': 0, 'val': 'other'}])
+
+        rb = table.new_read_builder().with_projection(['id', 'val'])
+        pb = rb.new_predicate_builder()
+        rows = self._read(table, projection=['id', 'val'],
+                          predicate=pb.equal('val', 'keep'))
+        self.assertEqual(rows, [{'id': 1, 'val': 'keep'}])
+
+    # -- per merge engine ------------------------------------------------
+
+    def test_partial_update_respects_sequence_field(self):
+        """partial-update folds non-null fields in sequence-field order, so
+        a later-written but lower-ts row must not overwrite a field set by
+        the higher-ts row.
+        """
+        table = self._create_pk_table(
+            'seq_pu', merge_engine='partial-update',
+            extra_options={'sequence.field': 'ts'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
+        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}],
+        )
+
+    def test_first_row_with_sequence_field_rejected(self):
+        """sequence.field on the first-row merge engine is an invalid
+        configuration that Java rejects at schema validation
+        (SchemaValidation.validateSequenceField). pypaimon has no
+        schema-creation validation, so the read-builder guard must reject
+        it rather than silently apply a sequence ordering first-row never
+        honors on write.
+        """
+        table = self._create_pk_table(
+            'seq_fr', merge_engine='first-row',
+            extra_options={'sequence.field': 'ts'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
+        with self.assertRaises(ValueError) as ctx:
+            table.new_read_builder().new_read()
+        self.assertIn('FIRST_ROW', str(ctx.exception))
+
+    def test_aggregation_last_value_respects_sequence_field(self):
+        """``last_value`` must pick the value from the highest-sequence-field
+        row, even when that row was written first.
+        """
+        table = self._create_pk_table(
+            'seq_agg', merge_engine='aggregation',
+            extra_options={
+                'sequence.field': 'ts',
+                'fields.val.aggregate-function': 'last_value',
+                'fields.ts2.aggregate-function': 'last_value',
+            })
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
+        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}],
+        )
+
+    # -- unsupported sub-features still rejected -------------------------
+
+    def test_missing_sequence_field_rejected(self):
+        """A sequence.field naming a column absent from the schema is
+        invalid (Java SchemaValidation). The guard must reject it with a
+        clear message before any read execution.
+        """
+        table = self._create_pk_table(
+            'seq_missing', extra_options={'sequence.field': 'nope'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
+        with self.assertRaises(ValueError) as ctx:
+            table.new_read_builder().new_read()
+        self.assertIn('nope', str(ctx.exception))
+
+    def test_duplicate_sequence_field_rejected(self):
+        """A sequence.field listing the same column twice is invalid
+        (Java SchemaValidation rejects repeated sequence fields).
+        """
+        table = self._create_pk_table(
+            'seq_dup', extra_options={'sequence.field': 'ts,ts'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
+        with self.assertRaises(ValueError) as ctx:
+            table.new_read_builder().new_read()
+        self.assertIn('ts', str(ctx.exception))
+
+    def test_empty_segment_sequence_field_rejected(self):
+        """A malformed ``sequence.field`` with an empty segment (e.g.
+        ``'ts,,ts2'``) leaves an empty field name after trimming -- matching
+        Java ``CoreOptions.sequenceField()``, which trims but does not drop
+        empty segments -- and must be rejected by validation rather than
+        silently accepted as ``['ts', 'ts2']``.
+        """
+        table = self._create_pk_table(
+            'seq_empty_seg', extra_options={'sequence.field': 'ts,,ts2'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
+        with self.assertRaises(ValueError) as ctx:
+            table.new_read_builder().new_read()
+        # The empty field name is the one that can't be found in the schema.
+        self.assertIn('can not be found', str(ctx.exception))
+
+    def test_cross_partition_update_with_sequence_field_rejected(self):
+        """sequence.field is invalid under cross-partition update (the PK
+        does not include all partition fields), matching Java
+        SchemaValidation.
+        """
+        table = self._create_pk_table(
+            'seq_xpart', extra_options={'sequence.field': 'ts'},
+            partition_keys=['ts2'])
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
+        with self.assertRaises(ValueError) as ctx:
+            table.new_read_builder().new_read()
+        self.assertIn('cross partition', str(ctx.exception).lower())
+
+    def test_aggregate_function_on_sequence_field_rejected(self):
+        """Defining an aggregator on the sequence column is invalid: Java
+        rejects fields.<seq>.aggregate-function outright in
+        SchemaValidation.validateSequenceField. The read-builder guard
+        must reject it rather than silently override the user's
+        aggregator with last_value.
+        """
+        table = self._create_pk_table(
+            'seq_agg_on_seq', merge_engine='aggregation',
+            extra_options={'sequence.field': 'ts',
+                           'fields.ts.aggregate-function': 'sum'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
+        with self.assertRaises(ValueError) as ctx:
+            table.new_read_builder().new_read()
+        self.assertIn('fields.ts.aggregate-function', str(ctx.exception))
+
+    def test_sequence_group_still_rejected(self):
+        """Top-level sequence.field is supported, but per-field
+        sequence-group is not -- the read must still raise.
+        """
+        table = self._create_pk_table(
+            'seq_group', merge_engine='partial-update',
+            extra_options={'sequence.field': 'ts',
+                           'fields.ts2.sequence-group': 'val'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
+        with self.assertRaises(NotImplementedError):
+            self._read(table)
+
+    def test_nested_sequence_field_rejected(self):
+        """nested-sequence-field is unimplemented and must be rejected
+        rather than silently ignored by the top-level comparator.
+        """
+        table = self._create_pk_table(
+            'seq_nested', merge_engine='deduplicate',
+            extra_options={'sequence.field': 'ts',
+                           'fields.val.nested-sequence-field': 'ts2'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
+        with self.assertRaises(NotImplementedError):
+            self._read(table)
+
+    def test_trailing_comma_sequence_field_tolerated(self):
+        """A trailing comma (``'ts,'``) must be tolerated, matching Java
+        ``String.split(',')`` which drops trailing empty segments. It
+        behaves exactly like ``'ts'`` -- not rejected as an empty field.
+        """
+        table = self._create_pk_table(
+            'seq_trailing', extra_options={'sequence.field': 'ts,'})
+        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}])
+        self._write(table, [{'id': 1, 'ts': 50, 'ts2': 0, 'val': 'low'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'high'}],
+        )
+
+    def test_complex_type_sequence_field_rejected(self):
+        """A complex (non-atomic) sequence field is valid in Java (handled
+        via RecordComparator) but unimplemented in pypaimon's atomic-only
+        comparator. It must be rejected with a clear NotImplementedError
+        rather than failing later with an obscure attribute error.
+        """
+        pa_schema = pa.schema([
+            pa.field('id', pa.int64(), nullable=False),
+            ('seq', pa.list_(pa.int64())),
+            ('val', pa.string()),
+        ])
+        table = self._create_pk_table(
+            'seq_complex', extra_options={'sequence.field': 'seq'},
+            pa_schema=pa_schema)
+        self._write(table, [{'id': 1, 'seq': [1, 2], 'val': 'x'}],
+                    pa_schema=pa_schema)
+        with self.assertRaises(NotImplementedError) as ctx:
+            table.new_read_builder().new_read()
+        self.assertIn('seq', str(ctx.exception))
+
+
+class SequenceFieldComparabilityUnitTest(unittest.TestCase):
+    """Unit tests for ``is_comparable_seq_field``, the predicate behind the
+    read-builder guard. VARIANT is an ``AtomicType`` with no ordering, so
+    it must be rejected like the complex types instead of slipping through
+    an ``isinstance(AtomicType)`` check.
+    """
+
+    @staticmethod
+    def _is_comparable(data_type):
+        """Wrap ``data_type`` in a ``seq`` DataField and check it."""
+        from pypaimon.read.reader.sort_merge_reader import (
+            is_comparable_seq_field)
+        from pypaimon.schema.data_types import DataField
+
+        return is_comparable_seq_field(DataField(0, 'seq', data_type))
+
+    def test_variant_sequence_field_not_comparable(self):
+        from pypaimon.schema.data_types import AtomicType
+
+        self.assertFalse(self._is_comparable(AtomicType('VARIANT')))
+
+    def test_atomic_types_are_comparable(self):
+        from pypaimon.schema.data_types import AtomicType
+
+        atomic_type_strings = ['BIGINT', 'INT', 'TIMESTAMP(6)',
+                               'DECIMAL(10, 2)', 'STRING', 'BIGINT NOT NULL']
+        for type_str in atomic_type_strings:
+            self.assertTrue(self._is_comparable(AtomicType(type_str)),
+                            '{} should be comparable'.format(type_str))
+
+    def test_complex_types_not_comparable(self):
+        from pypaimon.schema.data_types import ArrayType, AtomicType
+
+        array_type = ArrayType(True, AtomicType('INT'))
+        self.assertFalse(self._is_comparable(array_type))
+
+
+class SequenceFieldParameterizedTypeTest(unittest.TestCase):
+    """Parameterized atomic types (TIMESTAMP(p), DECIMAL(p, s), TIME(p))
+    must be accepted as sequence fields: the ``(...)`` in their type
+    string must not make the comparability check reject them.
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': warehouse})
+        cls.catalog.create_database('default', True)
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def _run(self, table_name, pa_schema, rows_first, rows_second, expected):
+        """Commit two batches to a ``seq``-sequenced table; check the read."""
+        options = {'bucket': '1', 'merge-engine': 'deduplicate',
+                   'sequence.field': 'seq'}
+        full_name = 'default.{}'.format(table_name)
+        schema = Schema.from_pyarrow_schema(
+            pa_schema, primary_keys=['id'], options=options)
+        self.catalog.create_table(full_name, schema, False)
+        table = self.catalog.get_table(full_name)
+        for rows in (rows_first, rows_second):
+            write_builder = table.new_batch_write_builder()
+            writer = write_builder.new_write()
+            committer = write_builder.new_commit()
+            try:
+                arrow_rows = pa.Table.from_pylist(rows, schema=pa_schema)
+                writer.write_arrow(arrow_rows)
+                committer.commit(writer.prepare_commit())
+            finally:
+                writer.close()
+                committer.close()
+        read_builder = table.new_read_builder()
+        plan_splits = read_builder.new_scan().plan().splits()
+        actual = read_builder.new_read().to_arrow(plan_splits).to_pylist()
+        self.assertEqual(actual, expected)
+
+    def test_timestamp_sequence_field(self):
+        import datetime
+        pa_schema = pa.schema([
+            pa.field('id', pa.int64(), nullable=False),
+            ('seq', pa.timestamp('us')),
+            ('val', pa.string()),
+        ])
+        hi = datetime.datetime(2020, 1, 2)
+        lo = datetime.datetime(2020, 1, 1)
+        # The later write carries the lower timestamp -> earlier row wins.
+        self._run('seq_ts', pa_schema,
+                  [{'id': 1, 'seq': hi, 'val': 'high'}],
+                  [{'id': 1, 'seq': lo, 'val': 'low'}],
+                  [{'id': 1, 'seq': hi, 'val': 'high'}])
+
+    def test_decimal_sequence_field(self):
+        from decimal import Decimal
+        pa_schema = pa.schema([
+            pa.field('id', pa.int64(), nullable=False),
+            ('seq', pa.decimal128(10, 2)),
+            ('val', pa.string()),
+        ])
+        high, low = Decimal('100.50'), Decimal('50.25')
+        self._run('seq_dec', pa_schema,
+                  [{'id': 1, 'seq': high, 'val': 'high'}],
+                  [{'id': 1, 'seq': low, 'val': 'low'}],
+                  [{'id': 1, 'seq': high, 'val': 'high'}])
+
+
+if __name__ == '__main__':
+    unittest.main()  # run every TestCase defined in this module