This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fbab26b109 [python] In-memory merge buffer for primary-key writer (#7759)
fbab26b109 is described below
commit fbab26b1090192d4e467855e90b85ffc576da03a
Author: chaoyang <[email protected]>
AuthorDate: Thu Jun 4 13:50:51 2026 +0800
[python] In-memory merge buffer for primary-key writer (#7759)
---
.../pypaimon/common/merge_engine_dispatch.py | 167 ++++++++++
...e_function.py => deduplicate_merge_function.py} | 40 +--
.../read/reader/first_row_merge_function.py | 7 +-
.../read/reader/partial_update_merge_function.py | 83 +++--
.../pypaimon/read/reader/sort_merge_reader.py | 18 +-
paimon-python/pypaimon/read/split_read.py | 53 ++-
paimon-python/pypaimon/table/row/key_value.py | 14 +
.../pypaimon/tests/reader_primary_key_test.py | 13 +-
paimon-python/pypaimon/tests/test_first_row_e2e.py | 46 +++
.../tests/test_first_row_merge_function.py | 16 +
.../pypaimon/tests/test_merge_engine_dispatch.py | 135 ++++++++
.../pypaimon/tests/test_partial_update_e2e.py | 137 +++++++-
.../tests/test_partial_update_merge_function.py | 44 ++-
.../pypaimon/tests/test_sequence_field_read.py | 7 +-
.../pypaimon/tests/test_write_merge_buffer.py | 366 +++++++++++++++++++++
paimon-python/pypaimon/write/file_store_write.py | 112 ++++++-
.../pypaimon/write/writer/key_value_data_writer.py | 223 ++++++++++++-
17 files changed, 1337 insertions(+), 144 deletions(-)
diff --git a/paimon-python/pypaimon/common/merge_engine_dispatch.py b/paimon-python/pypaimon/common/merge_engine_dispatch.py
new file mode 100644
index 0000000000..a9096fd492
--- /dev/null
+++ b/paimon-python/pypaimon/common/merge_engine_dispatch.py
@@ -0,0 +1,167 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Centralised merge-engine dispatch.
+
+Both the read path (``MergeFileSplitRead``) and the write path
+(``KeyValueDataWriter``'s in-memory merge buffer) need to pick a
+``MergeFunction`` based on the table's ``merge-engine`` option. This
+module is the single source of truth so the two sides cannot drift.
+"""
+
+from typing import List, Optional
+
+from pypaimon.common.options.core_options import MergeEngine
+from pypaimon.read.reader.deduplicate_merge_function import \
+ DeduplicateMergeFunction
+from pypaimon.read.reader.first_row_merge_function import \
+ FirstRowMergeFunction
+from pypaimon.read.reader.partial_update_merge_function import \
+ PartialUpdateMergeFunction
+
+
+# Boolean-valued options that, when truthy, opt the table into
+# behaviour the pypaimon PartialUpdateMergeFunction does not yet
+# implement. Setting any of these forces the dispatch to refuse the
+# write instead of running the simple last-non-null merge silently.
+_PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS = (
+ "ignore-delete",
+ "partial-update.ignore-delete",
+ "first-row.ignore-delete",
+ "deduplicate.ignore-delete",
+ "partial-update.remove-record-on-delete",
+ "partial-update.remove-record-on-sequence-group",
+)
+_FIELDS_PREFIX = "fields."
+_FIELD_SEQUENCE_GROUP_SUFFIX = ".sequence-group"
+_FIELD_AGGREGATE_FUNCTION_SUFFIX = ".aggregate-function"
+_DEFAULT_AGGREGATE_FUNCTION_KEY = "fields.default-aggregate-function"
+
+# Mirror ``CoreOptions.ignore_delete()``: any of these keys, if set to
+# ``"true"``, opts the engine into silently dropping
+# DELETE/UPDATE_BEFORE records. Kept as a raw-option lookup here so the
+# dispatch stays table-agnostic.
+_IGNORE_DELETE_KEYS = (
+ "ignore-delete",
+ "first-row.ignore-delete",
+ "deduplicate.ignore-delete",
+ "partial-update.ignore-delete",
+)
+
+
+def build_merge_function(
+ *,
+ engine: MergeEngine,
+ raw_options: dict,
+ key_arity: int,
+ value_arity: int,
+ value_field_nullables: List[bool],
+ value_field_names: Optional[List[str]] = None,
+):
+ """Pick the MergeFunction for the table's ``merge-engine`` option.
+
+ ``engine`` and ``raw_options`` come from the table's ``CoreOptions``
+ (typically ``table.options.merge_engine()`` and
+ ``table.options.options.to_map()``). ``key_arity`` / ``value_arity``
+ / ``value_field_nullables`` describe the value-side schema the
+ caller wants the merge function to operate on -- for the read path
+ this is the projected read schema, for the write path it's the full
+ table schema (minus primary keys).
+
+ ``value_field_names`` is optional and only used by
+ ``PartialUpdateMergeFunction`` to surface the offending field name
+ when a NOT NULL constraint is violated; pass ``None`` if the caller
+ doesn't have names handy.
+ """
+ if engine == MergeEngine.DEDUPLICATE:
+ return DeduplicateMergeFunction()
+ if engine == MergeEngine.PARTIAL_UPDATE:
+ unsupported = partial_update_unsupported_options(raw_options)
+ if unsupported:
+ raise NotImplementedError(
+ "merge-engine 'partial-update' is enabled together with "
+ "options that pypaimon does not yet implement: {}. The "
+ "supported subset is per-key last-non-null merge with "
+ "no sequence-group, no per-field aggregator override, "
+ "no ignore-delete and no partial-update.remove-record-on-* "
+ "flags. Open an issue to track Python support.".format(
+ ", ".join(sorted(unsupported))
+ )
+ )
+ return PartialUpdateMergeFunction(
+ key_arity=key_arity,
+ value_arity=value_arity,
+ nullables=list(value_field_nullables),
+ value_field_names=(
+ list(value_field_names)
+ if value_field_names is not None else None),
+ )
+ if engine == MergeEngine.FIRST_ROW:
+ return FirstRowMergeFunction(
+ ignore_delete=_ignore_delete_from_options(raw_options),
+ )
+ raise NotImplementedError(
+ "merge-engine '{}' is not implemented in pypaimon yet "
+ "(supported: deduplicate, first-row, partial-update). Open an "
+ "issue to track support.".format(engine.value)
+ )
+
+
+def _ignore_delete_from_options(raw_options: dict) -> bool:
+ for key in _IGNORE_DELETE_KEYS:
+ val = raw_options.get(key)
+ if val is not None:
+ return _option_is_truthy(val)
+ return False
+
+
+def partial_update_unsupported_options(raw_options: dict):
+ """Return the set of option keys this table sets that
+ ``PartialUpdateMergeFunction`` does not yet support. Empty set
+ means we can safely run the simple last-non-null merge.
+ """
+ flagged = set()
+ for key, value in raw_options.items():
+ if (key in _PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS
+ and _option_is_truthy(value)):
+ flagged.add(key)
+ elif key == _DEFAULT_AGGREGATE_FUNCTION_KEY:
+ flagged.add(key)
+ elif key.startswith(_FIELDS_PREFIX) and (
+ key.endswith(_FIELD_SEQUENCE_GROUP_SUFFIX)
+ or key.endswith(_FIELD_AGGREGATE_FUNCTION_SUFFIX)):
+ flagged.add(key)
+ return flagged
+
+
+def _option_is_truthy(raw):
+ """Strict ``"true"`` boolean parsing for table-option strings.
+
+ A string is truthy iff it equals ``"true"`` (case-insensitive).
+ ``"yes"``, ``"on"``, ``"1"`` and similar Python-truthy strings are
+ treated as falsey, matching the table-option parser used elsewhere
+ in Paimon so an option string the rest of the toolchain treats as
+ ``false`` is not silently elevated to ``true`` here.
+ """
+ if raw is None:
+ return False
+ if isinstance(raw, bool):
+ return raw
+ if isinstance(raw, str):
+ return raw.strip().lower() == "true"
+ return False
diff --git a/paimon-python/pypaimon/read/reader/first_row_merge_function.py b/paimon-python/pypaimon/read/reader/deduplicate_merge_function.py
similarity index 60%
copy from paimon-python/pypaimon/read/reader/first_row_merge_function.py
copy to paimon-python/pypaimon/read/reader/deduplicate_merge_function.py
index 162d608a64..5b669aae55 100644
--- a/paimon-python/pypaimon/read/reader/first_row_merge_function.py
+++ b/paimon-python/pypaimon/read/reader/deduplicate_merge_function.py
@@ -16,35 +16,35 @@
# limitations under the License.
################################################################################
+"""Default merge function for primary-key tables.
+
+Mirrors Java ``DeduplicateMergeFunction`` -- for a run of KVs sharing
+the same primary key, keep only the one with the highest sequence
+number (by virtue of ``add`` being called in sequence-number order).
+"""
+
from typing import Optional
from pypaimon.table.row.key_value import KeyValue
-class FirstRowMergeFunction:
- """A MergeFunction where key is primary key (unique) and value is the
- full record, only keep the first one."""
+class DeduplicateMergeFunction:
+ """Keep only the latest KV per primary key.
+
+ Used by both the read path (``SortMergeReaderWithMinHeap``) and the
+ write path (``KeyValueDataWriter`` in-memory merge buffer) -- the
+ latter is what enforces the LSM "PK unique within a file"
+ invariant on flush.
+ """
- def __init__(self, ignore_delete: bool = False):
- self.ignore_delete = ignore_delete
- self.first: Optional[KeyValue] = None
+ def __init__(self):
+ self.latest_kv: Optional[KeyValue] = None
def reset(self) -> None:
- self.first = None
+ self.latest_kv = None
def add(self, kv: KeyValue) -> None:
- if not kv.is_add():
- if self.ignore_delete:
- return
- raise ValueError(
- "By default, First row merge engine can not accept "
- "DELETE/UPDATE_BEFORE records.\n"
- "You can config 'ignore-delete' to ignore the "
- "DELETE/UPDATE_BEFORE records."
- )
-
- if self.first is None:
- self.first = kv
+ self.latest_kv = kv
def get_result(self) -> Optional[KeyValue]:
- return self.first
+ return self.latest_kv
diff --git a/paimon-python/pypaimon/read/reader/first_row_merge_function.py b/paimon-python/pypaimon/read/reader/first_row_merge_function.py
index 162d608a64..f3a91aadcf 100644
--- a/paimon-python/pypaimon/read/reader/first_row_merge_function.py
+++ b/paimon-python/pypaimon/read/reader/first_row_merge_function.py
@@ -44,7 +44,12 @@ class FirstRowMergeFunction:
)
if self.first is None:
- self.first = kv
+ # Snapshot, don't keep the reference: the caller may pool/reuse
+ # a single KeyValue and replace() it for the next row (the write
+ # path's fold does exactly this). Holding the live reference
+ # would make get_result return the LAST row instead of the
+ # first, silently turning first-row into last-row.
+ self.first = kv.copy()
def get_result(self) -> Optional[KeyValue]:
return self.first
diff --git a/paimon-python/pypaimon/read/reader/partial_update_merge_function.py b/paimon-python/pypaimon/read/reader/partial_update_merge_function.py
index 978b48011c..6fc2d45311 100644
--- a/paimon-python/pypaimon/read/reader/partial_update_merge_function.py
+++ b/paimon-python/pypaimon/read/reader/partial_update_merge_function.py
@@ -16,26 +16,22 @@
# limitations under the License.
################################################################################
-"""
-Python port of Java's ``PartialUpdateMergeFunction``
-(``paimon-core/src/main/java/org/apache/paimon/mergetree/compact/
-PartialUpdateMergeFunction.java``).
+"""Merge function for the ``partial-update`` merge engine on PK tables.
-The merge function used by the ``partial-update`` merge engine on PK
-tables: rows sharing a primary key are merged left-to-right, taking the
-latest non-null value per non-PK field. ``DeduplicateMergeFunction``
-keeps only the latest row; ``PartialUpdateMergeFunction`` instead lets
-later writes "fill in" fields the earlier writes left null, so users
-can write the same logical record across multiple commits with
-different sets of non-null columns.
+Rows sharing a primary key are merged left-to-right, taking the latest
+non-null value per non-PK field. ``DeduplicateMergeFunction`` keeps
+only the latest row; ``PartialUpdateMergeFunction`` instead lets later
+writes "fill in" fields the earlier writes left null, so users can
+write the same logical record across multiple commits with different
+sets of non-null columns.
-This is the **core merge semantics only**. The Java implementation also
+This is the **core merge semantics only**. The upstream engine also
supports per-field aggregator overrides (``fields.<name>.aggregate-
function``), sequence groups (``fields.<name>.sequence-group``),
``ignore-delete``, and ``partial-update.remove-record-on-*`` options.
-None of those are implemented yet; non-INSERT row kinds raise
-``NotImplementedError`` at ``add`` time so we never silently corrupt
-data with a half-implemented contract.
+None of those are implemented in pypaimon yet; non-INSERT row kinds
+raise ``NotImplementedError`` at ``add`` time so we never silently
+corrupt data with a half-implemented contract.
"""
from typing import Any, List, Optional
@@ -56,19 +52,31 @@ class PartialUpdateMergeFunction:
"""
def __init__(self, key_arity: int, value_arity: int,
- nullables: Optional[List[bool]] = None):
+ nullables: Optional[List[bool]] = None,
+ value_field_names: Optional[List[str]] = None):
self._key_arity = key_arity
self._value_arity = value_arity
# Per-value-field nullable flags, parallel to value indices. When
# ``None``, no nullability check runs (preserves the contract for
# direct callers that don't have schema info handy). When given,
- # mirrors Java's ``updateNonNullFields`` check: a null input on a
- # NOT NULL field raises rather than being silently absorbed.
+ # the schema's NOT NULL declaration is enforced on every add():
+ # a null input on a NOT NULL field raises rather than being
+ # silently absorbed.
if nullables is not None and len(nullables) != value_arity:
raise ValueError(
"nullables length {} does not match value_arity {}".format(
len(nullables), value_arity))
self._nullables = nullables
+ # Optional value-field names, parallel to value indices. When
+ # given, the NOT-NULL error message uses the field name instead
+ # of a bare position to make the failure actionable.
+ if value_field_names is not None \
+ and len(value_field_names) != value_arity:
+ raise ValueError(
+ "value_field_names length {} does not match "
+ "value_arity {}".format(
+ len(value_field_names), value_arity))
+ self._value_field_names = value_field_names
# Lazily allocated on first add(); ``None`` means "no rows yet".
self._accumulator: Optional[List[Any]] = None
# Reference to the most recently added kv. We use it only to
@@ -84,23 +92,24 @@ class PartialUpdateMergeFunction:
def add(self, kv: KeyValue) -> None:
row_kind_byte = kv.value_row_kind_byte
if not RowKind.is_add_byte(row_kind_byte):
- # DELETE / UPDATE_BEFORE need ignore-delete or
- # partial-update.remove-record-on-delete to be set in Java;
- # neither option is wired up in pypaimon yet, so refuse the
- # row rather than silently swallow it.
+ # DELETE / UPDATE_BEFORE require ignore-delete or
+ # partial-update.remove-record-on-delete to be enabled,
+ # and neither option is implemented in pypaimon yet, so
+ # refuse the row rather than silently swallow it.
raise NotImplementedError(
- "PartialUpdateMergeFunction received a {} row; this "
- "Python port does not yet implement the ignore-delete / "
- "partial-update.remove-record-on-delete options. Use the "
- "Java client for tables that produce DELETE / "
- "UPDATE_BEFORE rows.".format(RowKind(row_kind_byte).to_string())
+ "PartialUpdateMergeFunction received a {} row; the "
+ "ignore-delete / partial-update.remove-record-on-delete "
+ "options needed to handle it are not yet implemented in "
+ "pypaimon. Tables that produce DELETE / UPDATE_BEFORE "
+ "rows are not supported here.".format(
+ RowKind(row_kind_byte).to_string())
)
- # Mirror Java's reset() + updateNonNullFields(): the accumulator
- # starts as all-null (equivalent to ``new GenericRow(arity)``) and
- # each add() writes non-null inputs; null inputs are absorbed —
- # except when the schema marks the field NOT NULL, in which case
- # we raise to match Java's IllegalArgumentException check.
+ # The accumulator starts as all-null and each add() writes
+ # non-null inputs; null inputs are absorbed -- except when the
+ # schema marks the field NOT NULL, in which case we raise so
+ # the violation surfaces at write time instead of producing a
+ # row that breaks the schema invariant.
if self._accumulator is None:
self._accumulator = [None] * self._value_arity
for i in range(self._value_arity):
@@ -108,7 +117,15 @@ class PartialUpdateMergeFunction:
if v is not None:
self._accumulator[i] = v
elif self._nullables is not None and not self._nullables[i]:
- raise ValueError("Field {} can not be null".format(i))
+ if self._value_field_names is not None:
+ field_ref = "'{}'".format(self._value_field_names[i])
+ else:
+ field_ref = "at index {}".format(i)
+ raise ValueError(
+ "Partial-update received NULL for non-nullable field "
+ "{}. Declare the field nullable in the table schema "
+ "if writes can leave it unset, or supply a value."
+ .format(field_ref))
self._latest_kv = kv
def get_result(self) -> Optional[KeyValue]:
diff --git a/paimon-python/pypaimon/read/reader/sort_merge_reader.py b/paimon-python/pypaimon/read/reader/sort_merge_reader.py
index 1597745bbd..c525a7592c 100644
--- a/paimon-python/pypaimon/read/reader/sort_merge_reader.py
+++ b/paimon-python/pypaimon/read/reader/sort_merge_reader.py
@@ -18,6 +18,8 @@
import heapq
from typing import Any, Callable, List, Optional
+from pypaimon.read.reader.deduplicate_merge_function import \
+ DeduplicateMergeFunction
from pypaimon.read.reader.iface.record_iterator import RecordIterator
from pypaimon.read.reader.iface.record_reader import RecordReader
from pypaimon.schema.data_types import AtomicType, DataField, Keyword
@@ -140,22 +142,6 @@ class SortMergeIterator(RecordIterator):
return True
-class DeduplicateMergeFunction:
- """A MergeFunction where key is primary key (unique) and value is the full record, only keep the latest one."""
-
- def __init__(self):
- self.latest_kv = None
-
- def reset(self) -> None:
- self.latest_kv = None
-
- def add(self, kv: KeyValue):
- self.latest_kv = kv
-
- def get_result(self) -> Optional[KeyValue]:
- return self.latest_kv
-
-
class Element:
def __init__(self, kv: KeyValue, iterator: RecordIterator[KeyValue], reader: RecordReader[KeyValue]):
self.kv = kv
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 7d20359dfc..5a78cb07be 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -20,6 +20,7 @@ from abc import ABC, abstractmethod
from functools import partial
from typing import Callable, Dict, List, Optional, Tuple
+from pypaimon.common.merge_engine_dispatch import build_merge_function
from pypaimon.common.options.core_options import CoreOptions, MergeEngine
from pypaimon.common.predicate import Predicate
from pypaimon.deletionvectors import ApplyDeletionVectorReader
@@ -59,12 +60,7 @@ from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
from pypaimon.read.reader.shard_batch_reader import ShardBatchReader
from pypaimon.read.reader.aggregation_merge_function import (
AggregateMergeFunction, build_field_aggregators)
-from pypaimon.read.reader.partial_update_merge_function import \
- PartialUpdateMergeFunction
-from pypaimon.read.reader.first_row_merge_function import \
- FirstRowMergeFunction
-from pypaimon.read.reader.sort_merge_reader import (DeduplicateMergeFunction,
- SortMergeReaderWithMinHeap,
+from pypaimon.read.reader.sort_merge_reader import (SortMergeReaderWithMinHeap,
builtin_seq_comparator)
from pypaimon.read.push_down_utils import _get_all_fields
from pypaimon.read.split import Split
@@ -125,7 +121,7 @@ class SplitRead(ABC):
self.limit = limit
# Snapshot the raw value-side schema before _create_key_value_fields
# wraps it, so MergeFileSplitRead can hand per-value-field nullable
- # flags to merge functions that mirror Java's NOT-NULL check.
+ # flags to merge functions that enforce NOT-NULL on every add().
self.value_fields = list(read_type)
self.trimmed_primary_key = self.table.trimmed_primary_keys
@@ -703,28 +699,20 @@ class MergeFileSplitRead(SplitRead):
seq_comparator=self.seq_comparator)
def _build_merge_function(self):
- """Pick the right MergeFunction implementation for the table's
- ``merge-engine`` option.
-
- The pre-flight checks that reject unsupported engines or option
- combinations live in
- :func:`pypaimon.read.merge_engine_support.check_supported` and
- run at ``TableRead.__init__`` time, so by the point this method
- executes only the supported engines are reachable.
+ """Pick the MergeFunction for the table's ``merge-engine`` option.
+
+ Delegates to the shared dispatch in
+ ``pypaimon.common.merge_engine_dispatch`` so the read path and
+ the in-memory merge buffer on the write path cannot drift.
+ ``AGGREGATE`` is special-cased here because building the per-
+ field aggregators needs the full ``DataField`` objects, the
+ full primary-key list and the parsed ``CoreOptions`` -- which
+ sit outside the dispatch's raw-options contract. The writer-
+ side merge buffer falls back to dedupe for aggregation anyway
+ (see :meth:`FileStoreWrite._build_pk_merge_function`), so the
+ two sides only need to share the simple engines.
"""
engine = self.table.options.merge_engine()
- if engine == MergeEngine.DEDUPLICATE:
- return DeduplicateMergeFunction()
- if engine == MergeEngine.PARTIAL_UPDATE:
- return PartialUpdateMergeFunction(
- key_arity=len(self.trimmed_primary_key),
- value_arity=self.value_arity,
- nullables=[f.type.nullable for f in self.value_fields],
- )
- if engine == MergeEngine.FIRST_ROW:
- return FirstRowMergeFunction(
- ignore_delete=self.table.options.ignore_delete(),
- )
if engine == MergeEngine.AGGREGATE:
# Use the full primary-key list, not ``trimmed_primary_key``:
# ``value_fields`` still carries partition columns, so any PK
@@ -742,10 +730,13 @@ class MergeFileSplitRead(SplitRead):
value_arity=self.value_arity,
field_aggregators=field_aggregators,
)
- # check_supported() rejects everything else at TableRead.__init__.
- raise AssertionError(
- "unreachable: merge-engine '{}' should have been rejected by "
- "merge_engine_support.check_supported".format(engine.value)
+ return build_merge_function(
+ engine=engine,
+ raw_options=self.table.options.options.to_map(),
+ key_arity=len(self.trimmed_primary_key),
+ value_arity=self.value_arity,
+ value_field_nullables=[f.type.nullable for f in self.value_fields],
+ value_field_names=[f.name for f in self.value_fields],
)
def create_reader(self) -> RecordReader:
diff --git a/paimon-python/pypaimon/table/row/key_value.py b/paimon-python/pypaimon/table/row/key_value.py
index 845f77fadc..8fc89ea8b8 100644
--- a/paimon-python/pypaimon/table/row/key_value.py
+++ b/paimon-python/pypaimon/table/row/key_value.py
@@ -36,6 +36,20 @@ class KeyValue:
self._reused_value.replace(row_tuple)
return self
+ def copy(self) -> 'KeyValue':
+ """Return an independent KeyValue carrying the current row tuple.
+
+ ``replace`` swaps the tuple reference rather than mutating it, so a
+ copy stays valid even if a pooled/reused source KeyValue is later
+ replaced again. Callers that need to hold onto a kv past the next
+ ``replace`` (e.g. FirstRowMergeFunction keeping the first row while
+ the writer folds with a single pooled KeyValue) use this.
+ """
+ new = KeyValue(self.key_arity, self.value_arity)
+ if self._row_tuple is not None:
+ new.replace(self._row_tuple)
+ return new
+
def is_add(self) -> bool:
return RowKind.is_add_byte(self.value_row_kind_byte)
diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py
index 89a9e8a6a1..7cae0a77c7 100644
--- a/paimon-python/pypaimon/tests/reader_primary_key_test.py
+++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -224,12 +224,15 @@ class PkReaderTest(unittest.TestCase):
read_builder = table.new_read_builder()
actual = self._read_test_table(read_builder).sort_by('user_id')
- # TODO support pk merge feature when multiple write
+ # The in-memory merge buffer in KeyValueDataWriter folds the
+ # two writes for user_id=2 down to the latest row before flush
+ # (default merge engine is deduplicate), so the PK appears once
+ # with the second batch's value.
expected = pa.Table.from_pydict({
- 'user_id': [1, 2, 2, 3, 4, 5, 7, 8],
- 'item_id': [1001, 1002, 1002, 1003, 1004, 1005, 1007, 1008],
- 'behavior': ['a', 'b', 'b-new', 'c', None, 'e', 'g', 'h'],
- 'dt': ['p1', 'p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2'],
+ 'user_id': [1, 2, 3, 4, 5, 7, 8],
+ 'item_id': [1001, 1002, 1003, 1004, 1005, 1007, 1008],
+ 'behavior': ['a', 'b-new', 'c', None, 'e', 'g', 'h'],
+ 'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2'],
}, schema=self.pa_schema)
self.assertEqual(actual, expected)
diff --git a/paimon-python/pypaimon/tests/test_first_row_e2e.py b/paimon-python/pypaimon/tests/test_first_row_e2e.py
index 17a13c9aae..58776848b8 100644
--- a/paimon-python/pypaimon/tests/test_first_row_e2e.py
+++ b/paimon-python/pypaimon/tests/test_first_row_e2e.py
@@ -149,6 +149,52 @@ class FirstRowMergeEngineE2ETest(unittest.TestCase):
],
)
+ def test_first_row_intra_batch_duplicate(self):
+ """A single write whose batch already contains duplicate PKs.
+
+ The whole batch is folded in one flush, so this exercises the
+ write-side fold rather than the cross-commit read merge. first-row
+ must keep the first occurrence of each PK.
+ """
+ table = self._create_pk_table('first_row_intra_batch')
+ self._write(table, [
+ {'id': 1, 'a': 'first', 'b': 'B1'},
+ {'id': 1, 'a': 'second', 'b': 'B2'},
+ {'id': 1, 'a': 'third', 'b': 'B3'},
+ {'id': 2, 'a': 'only', 'b': 'B'},
+ ])
+
+ self.assertEqual(
+ self._read(table),
+ [
+ {'id': 1, 'a': 'first', 'b': 'B1'},
+ {'id': 2, 'a': 'only', 'b': 'B'},
+ ],
+ )
+
+ def test_first_row_multiple_writes_one_commit(self):
+ """Several write_arrow calls committed once: the same PK across
+ those writes folds in a single flush. first-row keeps the first.
+ """
+ table = self._create_pk_table('first_row_multi_write_one_commit')
+ wb = table.new_batch_write_builder()
+ w = wb.new_write()
+ c = wb.new_commit()
+ try:
+ w.write_arrow(pa.Table.from_pylist(
+ [{'id': 1, 'a': 'first', 'b': 'B1'}], schema=self.pa_schema))
+ w.write_arrow(pa.Table.from_pylist(
+ [{'id': 1, 'a': 'second', 'b': 'B2'}], schema=self.pa_schema))
+ c.commit(w.prepare_commit())
+ finally:
+ w.close()
+ c.close()
+
+ self.assertEqual(
+ self._read(table),
+ [{'id': 1, 'a': 'first', 'b': 'B1'}],
+ )
+
if __name__ == '__main__':
unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_first_row_merge_function.py b/paimon-python/pypaimon/tests/test_first_row_merge_function.py
index 7f411f85ff..5e56561984 100644
--- a/paimon-python/pypaimon/tests/test_first_row_merge_function.py
+++ b/paimon-python/pypaimon/tests/test_first_row_merge_function.py
@@ -64,6 +64,22 @@ class FirstRowMergeFunctionTest(unittest.TestCase):
result = mf.get_result()
self.assertEqual(_result_value(result), (10, "first"))
+ def test_keeps_first_row_when_kv_is_pooled(self):
+ # The writer's fold (KeyValueDataWriter._merge_pending_by_pk) reuses
+ # a single KeyValue and replace()s it per row. add() must snapshot
+ # the first row; otherwise get_result tracks the pooled kv's last
+ # replace() and returns the LAST row -- silently turning first-row
+ # into last-row. This is the case the per-row _kv() tests miss.
+ mf = FirstRowMergeFunction()
+ mf.reset()
+ pooled = KeyValue(key_arity=1, value_arity=2)
+ pooled.replace((1, 1, RowKind.INSERT.value, 10, "first"))
+ mf.add(pooled)
+ pooled.replace((1, 2, RowKind.INSERT.value, 20, "second"))
+ mf.add(pooled)
+ result = mf.get_result()
+ self.assertEqual(_result_value(result), (10, "first"))
+
def test_reset_clears_state(self):
mf = FirstRowMergeFunction()
mf.reset()
diff --git a/paimon-python/pypaimon/tests/test_merge_engine_dispatch.py b/paimon-python/pypaimon/tests/test_merge_engine_dispatch.py
new file mode 100644
index 0000000000..f8e2d717a5
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_merge_engine_dispatch.py
@@ -0,0 +1,135 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Unit tests for ``pypaimon.common.merge_engine_dispatch``.
+
+Pins down the table-option parsing the dispatch uses to decide whether
+``partial-update`` should run or be rejected. The key contract: strict
+``"true"``-only boolean parsing aligned with the table-option parser
+used elsewhere in Paimon, so an option string the rest of the
+toolchain treats as ``false`` is not silently elevated to ``true``
+here.
+"""
+
+import unittest
+
+from pypaimon.common.merge_engine_dispatch import (
+ _option_is_truthy,
+ build_merge_function,
+ partial_update_unsupported_options,
+)
+from pypaimon.common.options.core_options import MergeEngine
+from pypaimon.read.reader.partial_update_merge_function import \
+ PartialUpdateMergeFunction
+
+
+class OptionIsTruthyTest(unittest.TestCase):
+ """``_option_is_truthy`` accepts only ``"true"`` (case-insensitive)
+ as truthy; every other string -- including ``"1"``, ``"yes"``,
+ ``"on"`` -- is falsey. Matches the table-option parser used
+ elsewhere in Paimon.
+ """
+
+ def test_true_string_is_truthy(self):
+ self.assertTrue(_option_is_truthy("true"))
+
+ def test_true_string_is_case_insensitive(self):
+ for v in ("TRUE", "True", "tRuE"):
+ self.assertTrue(_option_is_truthy(v), v)
+
+ def test_true_string_tolerates_surrounding_whitespace(self):
+ self.assertTrue(_option_is_truthy(" true "))
+
+ def test_python_bool_true_is_truthy(self):
+ self.assertTrue(_option_is_truthy(True))
+
+ def test_python_bool_false_is_falsey(self):
+ self.assertFalse(_option_is_truthy(False))
+
+ def test_none_is_falsey(self):
+ self.assertFalse(_option_is_truthy(None))
+
+ def test_non_true_strings_are_falsey(self):
+ # The table-option parser elsewhere in Paimon returns false
+ # for every one of these. pypaimon must do the same so a
+ # user-set "yes" is not silently elevated to true here while
+ # the rest of the toolchain treats it as false.
+ for v in ("1", "yes", "on", "Yes", "ON", "y", "t", "0", "no", "off",
+ "false", "FALSE", ""):
+ self.assertFalse(_option_is_truthy(v), v)
+
+
+class PartialUpdateUnsupportedOptionsTest(unittest.TestCase):
+
+ def test_ignore_delete_yes_is_not_flagged(self):
+ # ``yes`` is falsey under the upstream table-option parser,
+ # so partial-update must NOT be blocked here. Pre-fix pypaimon
+ # rejected this; the fix aligns the dispatch with the parser.
+ unsupported = partial_update_unsupported_options(
+ {"partial-update.ignore-delete": "yes"})
+ self.assertEqual(unsupported, set())
+
+ def test_ignore_delete_true_is_flagged(self):
+ unsupported = partial_update_unsupported_options(
+ {"partial-update.ignore-delete": "true"})
+ self.assertEqual(unsupported, {"partial-update.ignore-delete"})
+
+ def test_sequence_group_is_flagged(self):
+ unsupported = partial_update_unsupported_options(
+ {"fields.a.sequence-group": "b"})
+ self.assertEqual(unsupported, {"fields.a.sequence-group"})
+
+ def test_unrelated_options_are_not_flagged(self):
+ unsupported = partial_update_unsupported_options(
+ {"bucket": "1", "merge-engine": "partial-update"})
+ self.assertEqual(unsupported, set())
+
+
+class BuildMergeFunctionTest(unittest.TestCase):
+ """``build_merge_function`` forwards ``value_field_names`` to
+ ``PartialUpdateMergeFunction`` so the NOT-NULL error message can
+ surface the offending column name. This is the only behavioural
+ contract the dispatch adds on top of routing.
+ """
+
+ def test_partial_update_forwards_field_names(self):
+ mf = build_merge_function(
+ engine=MergeEngine.PARTIAL_UPDATE,
+ raw_options={},
+ key_arity=1,
+ value_arity=2,
+ value_field_nullables=[True, True],
+ value_field_names=['col_a', 'col_b'],
+ )
+ self.assertIsInstance(mf, PartialUpdateMergeFunction)
+ self.assertEqual(mf._value_field_names, ['col_a', 'col_b'])
+
+ def test_partial_update_without_field_names_keeps_none(self):
+ mf = build_merge_function(
+ engine=MergeEngine.PARTIAL_UPDATE,
+ raw_options={},
+ key_arity=1,
+ value_arity=2,
+ value_field_nullables=[True, True],
+ )
+ self.assertIsInstance(mf, PartialUpdateMergeFunction)
+ self.assertIsNone(mf._value_field_names)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_partial_update_e2e.py b/paimon-python/pypaimon/tests/test_partial_update_e2e.py
index 8606d6c239..90fa5f711b 100644
--- a/paimon-python/pypaimon/tests/test_partial_update_e2e.py
+++ b/paimon-python/pypaimon/tests/test_partial_update_e2e.py
@@ -88,6 +88,24 @@ class PartialUpdateMergeEngineE2ETest(unittest.TestCase):
w.close()
c.close()
+    def _write_many(self, table, batches):
+        """Write several batches through one writer, then commit once.
+
+        Each element of ``batches`` is a list of row dicts that goes
+        through its own ``write_arrow`` call; a single
+        ``prepare_commit`` follows, so every row belongs to the same
+        commit. Rows that land in the same underlying data file must
+        still go through the merge-engine dispatch; in-writer merging
+        cannot silently degrade to dedupe.
+        """
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        c = wb.new_commit()
+        try:
+            for rows in batches:
+                w.write_arrow(
+                    pa.Table.from_pylist(rows, schema=self.pa_schema))
+            c.commit(w.prepare_commit())
+        finally:
+            # Close the commit even when closing the writer raises.
+            try:
+                w.close()
+            finally:
+                c.close()
+
def _read(self, table):
rb = table.new_read_builder()
splits = rb.new_scan().plan().splits()
@@ -173,6 +191,46 @@ class PartialUpdateMergeEngineE2ETest(unittest.TestCase):
[{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}],
)
+ # -- single-commit, multiple write_arrow calls -----------------------
+ #
+ # The in-memory merge buffer added to ``KeyValueDataWriter`` runs
+ # the merge function on flush, so rows from multiple ``write_arrow``
+ # calls that share a primary key are folded into a single row before
+ # the data file is written. The flushed file therefore satisfies the
+ # LSM "PK unique within a file" invariant the read-side
+ # ``raw_convertible`` fast path relies on.
+
+    def test_partial_update_two_write_arrows_single_commit(self):
+        """Two ``write_arrow`` calls followed by one ``prepare_commit``.
+
+        Each batch fills a different column of the same key; the row
+        read back must be the per-field merge of both.
+        """
+        table = self._create_pk_table('two_writes_single_commit')
+        first = [{'id': 1, 'a': 'A', 'b': None, 'c': None}]
+        second = [{'id': 1, 'a': None, 'b': 'B', 'c': None}]
+        self._write_many(table, [first, second])
+
+        expected = [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}]
+        self.assertEqual(self._read(table), expected)
+
+    def test_partial_update_three_write_arrows_single_commit(self):
+        """Three ``write_arrow`` calls in one commit, one column each;
+        the row read back is the union of the non-null fields.
+        """
+        table = self._create_pk_table('three_writes_single_commit')
+        blank = {'id': 1, 'a': None, 'b': None, 'c': None}
+        batches = [
+            [dict(blank, a='A')],
+            [dict(blank, b='B')],
+            [dict(blank, c='C')],
+        ]
+        self._write_many(table, batches)
+
+        expected = [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}]
+        self.assertEqual(self._read(table), expected)
+
# -- deduplicate (regression) ----------------------------------------
def test_deduplicate_engine_unchanged(self):
@@ -188,10 +246,37 @@ class PartialUpdateMergeEngineE2ETest(unittest.TestCase):
[{'id': 1, 'a': 'new', 'b': None, 'c': None}],
)
+    def test_deduplicate_two_write_arrows_single_commit(self):
+        """``deduplicate`` keeps only the later of two same-PK rows
+        written by separate ``write_arrow`` calls inside one commit.
+
+        Pre-PR master silently returned both rows because the flushed
+        file held two records sharing a primary key. With the in-memory
+        merge buffer in place the LSM "PK unique within a file"
+        invariant is restored.
+        """
+        table = self._create_pk_table(
+            'dedupe_two_writes_single_commit',
+            merge_engine='deduplicate',
+        )
+        older = [{'id': 1, 'a': 'first', 'b': 'old', 'c': None}]
+        newer = [{'id': 1, 'a': 'second', 'b': 'new', 'c': None}]
+        self._write_many(table, [older, newer])
+
+        self.assertEqual(self._read(table), newer)
+
# -- other supported engines (smoke) ---------------------------------
def test_first_row_engine_keeps_first(self):
- """The ``first-row`` engine must keep the earliest row per PK."""
+ """The ``first-row`` engine must keep the earliest row per PK.
+
+ Both the writer-side merge buffer and the reader-side merge
+ function go through ``merge_engine_dispatch``, so first-row is
+ a real supported engine (no dedupe fallback / no NotImplemented
+ raise) on both sides.
+ """
table = self._create_pk_table('first_row_supported',
merge_engine='first-row')
self._write(table, [{'id': 1, 'a': 'first', 'b': None, 'c': None}])
@@ -202,6 +287,23 @@ class PartialUpdateMergeEngineE2ETest(unittest.TestCase):
[{'id': 1, 'a': 'first', 'b': None, 'c': None}],
)
+    def test_aggregation_engine_write_logs_fallback_warning(self):
+        """Writing to an ``aggregation`` table logs a WARNING naming both
+        the declared engine and the ``deduplicate`` fallback.
+
+        The write-side fallback is silent in terms of return value, so
+        the log record is the only signal that file contents will not
+        match the table's declared semantics. That matters when the
+        table is read back by a reader that honours the declared
+        engine; the pypaimon read-side raise wouldn't fire there.
+        """
+        table = self._create_pk_table('agg_warning',
+                                      merge_engine='aggregation')
+        row = {'id': 1, 'a': 'x', 'b': None, 'c': None}
+        logger_name = 'pypaimon.write.file_store_write'
+        with self.assertLogs(logger_name, level='WARNING') as captured:
+            self._write(table, [row])
+        log_text = '\n'.join(captured.output)
+        for fragment in ('aggregation', 'deduplicate'):
+            self.assertIn(fragment, log_text)
+
# -- partial-update + out-of-scope option combinations ---------------
#
# When a user pairs ``merge-engine: partial-update`` with any option
@@ -213,15 +315,14 @@ class PartialUpdateMergeEngineE2ETest(unittest.TestCase):
def _assert_partial_update_unsupported(self, table_name, extra_options,
expected_keys):
+ # Shared dispatch runs at write time too, so the unsupported-
+ # option error surfaces inside the first ``write_arrow`` call
+ # (when ``FileStoreWrite._create_data_writer`` first runs)
+ # rather than waiting for read.
table = self._create_pk_table(
table_name, extra_options=extra_options)
- self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}])
- self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}])
-
- rb = table.new_read_builder()
- splits = rb.new_scan().plan().splits()
with self.assertRaises(NotImplementedError) as cm:
- rb.new_read().to_arrow(splits)
+ self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}])
msg = str(cm.exception)
self.assertIn("partial-update", msg)
for key in expected_keys:
@@ -271,25 +372,25 @@ class PartialUpdateMergeEngineE2ETest(unittest.TestCase):
)
def test_partial_update_unsupported_options_guard_covers_raw_convertible(self):
- """The unsupported-options guard must fire even when the scan
- would dispatch every split through ``RawFileSplitRead`` (i.e. a
- single-snapshot table where rows don't overlap).
+ """The read-side guard at ``TableRead.__init__`` must fire even
+ when the scan would dispatch every split through
+ ``RawFileSplitRead`` (single-snapshot, non-overlapping rows).
Before the guard moved to ``TableRead.__init__`` this case
silently bypassed validation because raw-convertible splits skip
- ``MergeFileSplitRead`` entirely — and an option like
- ``partial-update.remove-record-on-delete`` would be ignored on
- the read path while the user assumed it was honoured.
+ ``MergeFileSplitRead`` entirely -- the read path's
+ ``_build_merge_function`` never ran, so an option like
+ ``partial-update.remove-record-on-delete`` was ignored on read.
+
+ The shared dispatch now also fires on the write path's first
+ flush (see ``_assert_partial_update_unsupported``), so we skip
+ ``_write`` here: the read-side guard runs at ``new_read()``
+ construction time regardless of whether data exists.
"""
table = self._create_pk_table(
'pu_rrod_raw_convertible',
extra_options={'partial-update.remove-record-on-delete': 'true'},
)
- # Single write -> single snapshot -> splits are raw-convertible.
- self._write(table, [
- {'id': 1, 'a': 'A', 'b': None, 'c': None},
- {'id': 2, 'a': 'B', 'b': None, 'c': None},
- ])
rb = table.new_read_builder()
with self.assertRaises(NotImplementedError) as cm:
rb.new_read()
diff --git a/paimon-python/pypaimon/tests/test_partial_update_merge_function.py b/paimon-python/pypaimon/tests/test_partial_update_merge_function.py
index 60dfc7198d..b412fdd388 100644
--- a/paimon-python/pypaimon/tests/test_partial_update_merge_function.py
+++ b/paimon-python/pypaimon/tests/test_partial_update_merge_function.py
@@ -127,8 +127,8 @@ class PartialUpdateMergeFunctionTest(unittest.TestCase):
self.assertIsNone(mf.get_result())
def test_update_after_is_treated_as_insert(self):
- # Java's PartialUpdate accepts UPDATE_AFTER alongside INSERT in
- # non-sequence-group mode (both are "add" kinds). Mirror that.
+ # UPDATE_AFTER is treated as an "add" alongside INSERT in
+ # non-sequence-group mode, matching the upstream contract.
mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
mf.reset()
mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None)))
@@ -172,28 +172,54 @@ class PartialUpdateMergeFunctionTest(unittest.TestCase):
self.assertEqual(_result_key(result), (1,))
self.assertEqual(_result_value(result), ('a', 'x'))
- # -- NOT-NULL input validation (mirrors Java's updateNonNullFields) ----
+ # -- NOT-NULL input validation ----
def test_first_insert_with_null_for_not_null_field_raises(self):
- """If the very first row writes null to a NOT NULL field, raise —
- same input-validation Java does in updateNonNullFields()."""
+ """If the very first row writes null to a NOT NULL field, raise --
+ the schema's NOT NULL declaration is enforced on every add()."""
mf = PartialUpdateMergeFunction(
key_arity=1, value_arity=2, nullables=[True, False])
mf.reset()
with self.assertRaises(ValueError) as cm:
mf.add(_kv((1,), 1, RowKind.INSERT, ('a', None)))
- self.assertIn("Field 1", str(cm.exception))
+ msg = str(cm.exception)
+ # Without field names we fall back to the index, but the
+ # actionable hint must still be there.
+ self.assertIn("at index 1", msg)
+ self.assertIn("Declare the field nullable", msg)
def test_subsequent_insert_with_null_for_not_null_field_raises(self):
- """A later null on a NOT NULL field must also raise — Java checks
- on every add(), not just the first one."""
+ """A later null on a NOT NULL field must also raise -- the
+ NOT NULL check fires on every add(), not just the first one."""
mf = PartialUpdateMergeFunction(
key_arity=1, value_arity=2, nullables=[True, False])
mf.reset()
mf.add(_kv((1,), 1, RowKind.INSERT, ('a', 'x')))
with self.assertRaises(ValueError) as cm:
mf.add(_kv((1,), 2, RowKind.INSERT, (None, None)))
- self.assertIn("Field 1", str(cm.exception))
+ self.assertIn("at index 1", str(cm.exception))
+
+    def test_not_null_error_message_uses_field_name_when_given(self):
+        """With ``value_field_names`` supplied, the NOT-NULL error quotes
+        the offending field's name, so the message is directly
+        actionable instead of citing a bare positional index."""
+        merge_fn = PartialUpdateMergeFunction(
+            key_arity=1,
+            value_arity=2,
+            nullables=[True, False],
+            value_field_names=['a', 'b'],
+        )
+        merge_fn.reset()
+        with self.assertRaises(ValueError) as ctx:
+            merge_fn.add(_kv((1,), 1, RowKind.INSERT, ('a', None)))
+        message = str(ctx.exception)
+        for fragment in ("'b'", "Declare the field nullable"):
+            self.assertIn(fragment, message)
+
+    def test_value_field_names_length_mismatch_raises(self):
+        """One field name for two value fields is rejected by the
+        constructor with ``ValueError``."""
+        kwargs = dict(
+            key_arity=1,
+            value_arity=2,
+            nullables=[True, True],
+            value_field_names=['only_one'],
+        )
+        with self.assertRaises(ValueError):
+            PartialUpdateMergeFunction(**kwargs)
def test_null_for_nullable_field_is_absorbed(self):
"""A null input on a nullable field is silently absorbed (existing
diff --git a/paimon-python/pypaimon/tests/test_sequence_field_read.py b/paimon-python/pypaimon/tests/test_sequence_field_read.py
index 87d4cbe42c..ed32768c2e 100644
--- a/paimon-python/pypaimon/tests/test_sequence_field_read.py
+++ b/paimon-python/pypaimon/tests/test_sequence_field_read.py
@@ -363,15 +363,16 @@ class SequenceFieldReadE2ETest(unittest.TestCase):
def test_sequence_group_still_rejected(self):
"""Top-level sequence.field is supported, but per-field
- sequence-group is not -- the read must still raise.
+ sequence-group is not -- it must still be rejected. The shared
+ merge-engine dispatch now rejects this combination fail-fast on
+ the write path, so the write (not the read) is what raises.
"""
table = self._create_pk_table(
'seq_group', merge_engine='partial-update',
extra_options={'sequence.field': 'ts',
'fields.ts2.sequence-group': 'val'})
- self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
with self.assertRaises(NotImplementedError):
- self._read(table)
+ self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
def test_nested_sequence_field_rejected(self):
"""nested-sequence-field is unimplemented and must be rejected
diff --git a/paimon-python/pypaimon/tests/test_write_merge_buffer.py b/paimon-python/pypaimon/tests/test_write_merge_buffer.py
new file mode 100644
index 0000000000..0b86b59138
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_write_merge_buffer.py
@@ -0,0 +1,366 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Unit tests for ``KeyValueDataWriter`` buffer behaviour.
+
+Covers the fold algorithm (`_merge_pending_by_pk`), the flush lifecycle
+(`_flush_all` empties the buffer + clears pending_data), and the
+roll-write helper (`_roll_write` splits oversized buffers across
+multiple files). Drives a thin harness that bypasses
+``DataWriter.__init__`` so tests can exercise these paths without
+spinning up the real catalog/write stack.
+"""
+
+import unittest
+from unittest.mock import Mock
+
+import pyarrow as pa
+
+from pypaimon.read.reader.deduplicate_merge_function import \
+ DeduplicateMergeFunction
+from pypaimon.read.reader.partial_update_merge_function import \
+ PartialUpdateMergeFunction
+from pypaimon.write.writer.key_value_data_writer import KeyValueDataWriter
+
+
+# Layout matches what ``KeyValueDataWriter._add_system_fields`` emits:
+# ``[_KEY_id, _SEQUENCE_NUMBER, _VALUE_KIND, id, a, b]``. ``id`` is
+# duplicated on the value side because the value layout in Paimon's
+# row tuple includes every original column.
+_SCHEMA = pa.schema([
+ pa.field('_KEY_id', pa.int64(), nullable=False),
+ pa.field('_SEQUENCE_NUMBER', pa.int64(), nullable=False),
+ pa.field('_VALUE_KIND', pa.int8(), nullable=False),
+ pa.field('id', pa.int64(), nullable=False),
+ pa.field('a', pa.string()),
+ pa.field('b', pa.string()),
+])
+
+
+def _row(pk, seq, a, b):
+    """Build one buffer row as a dict in ``_SCHEMA`` column order.
+
+    ``pk`` fills both ``_KEY_id`` and ``id``; ``_VALUE_KIND`` is always 0.
+    """
+    columns = ('_KEY_id', '_SEQUENCE_NUMBER', '_VALUE_KIND', 'id', 'a', 'b')
+    return dict(zip(columns, (pk, seq, 0, pk, a, b)))
+
+
+class _Harness(KeyValueDataWriter):
+ """Bypass ``DataWriter.__init__`` to keep tests focused.
+
+ Provides just the attributes ``_merge_pending_by_pk`` / ``_flush_all``
+ / ``_roll_write`` read, plus a recording stub for
+ ``_write_data_to_file`` so the roll-write path can be exercised
+ without touching the filesystem.
+ """
+
+ def __init__(self, merge_function, target_file_size: int = 10 ** 12):
+ # ``super().__init__`` is deliberately not called; only the
+ # attributes assigned below exist on the instance.
+ self.trimmed_primary_keys = ['id']
+ self._merge_function = merge_function
+ # Large enough that ``_check_and_roll_if_needed`` does not
+ # trigger on its own in tests that don't care about rolling.
+ self.target_file_size = target_file_size
+ self.pending_data = None
+ self.committed_files = []
+ self.written_chunks = []
+
+ def _write_data_to_file(self, data):
+ # Record each chunk in ``written_chunks`` instead of writing to
+ # disk. This stub does not touch ``committed_files``.
+ self.written_chunks.append(data)
+
+
+class WriteMergeBufferTest(unittest.TestCase):
+    """Drives ``_Harness`` through ``_merge_pending_by_pk``,
+    ``_process_data``, ``_flush_all`` and ``_roll_write``.
+    """
+
+    class _DropAll:
+        """Merge-function stand-in whose ``get_result`` is always ``None``.
+
+        Shared by the tests that check a ``None`` result drops the whole
+        PK group.
+        """
+
+        def reset(self):
+            pass
+
+        def add(self, _):
+            pass
+
+        def get_result(self):
+            return None
+
+    @staticmethod
+    def _table(rows):
+        # Arrow table in the ``_SCHEMA`` layout, as the buffer holds it.
+        return pa.Table.from_pylist(rows, schema=_SCHEMA)
+
+    @staticmethod
+    def _by_id(table):
+        # Row dicts ordered by ``id`` so comparisons ignore output order.
+        return sorted(table.to_pylist(), key=lambda r: r['id'])
+
+    def _partial_update(self):
+        # Value-side carries 3 columns (id, a, b). The PK column ``id``
+        # is duplicated into the value side so partial-update can apply
+        # last-non-null semantics uniformly across every original
+        # user column.
+        return PartialUpdateMergeFunction(
+            key_arity=1, value_arity=3, nullables=[True, True, True])
+
+    # -- deduplicate ------------------------------------------------------
+
+    def test_dedupe_collapses_same_pk_run_to_latest(self):
+        writer = _Harness(DeduplicateMergeFunction())
+        data = self._table(
+            [_row(1, 1, 'old', None), _row(1, 2, 'new', None)])
+        out = writer._merge_pending_by_pk(data)
+        self.assertEqual(out.num_rows, 1)
+        self.assertEqual(out.to_pylist(), [_row(1, 2, 'new', None)])
+
+    def test_dedupe_keeps_disjoint_keys(self):
+        writer = _Harness(DeduplicateMergeFunction())
+        rows = [_row(1, 1, 'A', None),
+                _row(2, 2, 'B', None),
+                _row(3, 3, 'C', None)]
+        out = writer._merge_pending_by_pk(self._table(rows))
+        self.assertEqual(out.num_rows, 3)
+        self.assertEqual(
+            self._by_id(out),
+            [_row(1, 1, 'A', None),
+             _row(2, 2, 'B', None),
+             _row(3, 3, 'C', None)],
+        )
+
+    # -- partial-update ---------------------------------------------------
+
+    def test_partial_update_merges_non_null_per_field(self):
+        writer = _Harness(self._partial_update())
+        data = self._table(
+            [_row(1, 1, 'A', None), _row(1, 2, None, 'B')])
+        out = writer._merge_pending_by_pk(data)
+        self.assertEqual(out.num_rows, 1)
+        self.assertEqual(out.to_pylist(), [_row(1, 2, 'A', 'B')])
+
+    def test_partial_update_three_writes_compose(self):
+        writer = _Harness(self._partial_update())
+        data = self._table(
+            [_row(1, 1, 'A', None),
+             _row(1, 2, None, 'B'),
+             _row(1, 3, None, None)])
+        out = writer._merge_pending_by_pk(data)
+        self.assertEqual(out.to_pylist(), [_row(1, 3, 'A', 'B')])
+
+    def test_partial_update_later_null_does_not_clobber_earlier_value(self):
+        writer = _Harness(self._partial_update())
+        data = self._table(
+            [_row(1, 1, 'KEEP', 'B'), _row(1, 2, None, None)])
+        out = writer._merge_pending_by_pk(data)
+        self.assertEqual(out.to_pylist(), [_row(1, 2, 'KEEP', 'B')])
+
+    # -- edge cases -------------------------------------------------------
+
+    def test_empty_buffer_returns_empty(self):
+        writer = _Harness(DeduplicateMergeFunction())
+        out = writer._merge_pending_by_pk(self._table([]))
+        self.assertEqual(out.num_rows, 0)
+
+    def test_single_row_buffer_skips_merge(self):
+        # Mock to confirm the merge function isn't invoked: a single
+        # row cannot have duplicates, so we sidestep the to_pylist
+        # round-trip.
+        mock_mf = Mock()
+        writer = _Harness(mock_mf)
+        out = writer._merge_pending_by_pk(
+            self._table([_row(1, 1, 'X', None)]))
+        self.assertEqual(out.num_rows, 1)
+        mock_mf.reset.assert_not_called()
+        mock_mf.add.assert_not_called()
+        mock_mf.get_result.assert_not_called()
+
+    def test_get_result_none_drops_pk_run(self):
+        # Future-proof: contract says ``get_result`` returning ``None``
+        # means the entire PK group should be dropped.
+        writer = _Harness(self._DropAll())
+        data = self._table(
+            [_row(1, 1, 'A', None), _row(1, 2, 'B', None)])
+        out = writer._merge_pending_by_pk(data)
+        self.assertEqual(out.num_rows, 0)
+
+    # -- KeyValue pooling -------------------------------------------------
+
+    def test_keyvalue_pool_does_not_alias_results_across_runs(self):
+        # Pooling reuses one KeyValue across the whole fold. If the
+        # PartialUpdateMergeFunction's get_result snapshotting were
+        # broken, run 1's result would mutate when run 2's data is
+        # written into the pooled instance. This test would catch that
+        # regression: build a buffer with two distinct PK runs and
+        # verify both results stand on their own.
+        writer = _Harness(self._partial_update())
+        data = self._table(
+            [_row(1, 1, 'A1', None),
+             _row(1, 2, None, 'B1'),
+             _row(2, 3, 'A2', None),
+             _row(2, 4, None, 'B2')])
+        out = writer._merge_pending_by_pk(data)
+        self.assertEqual(
+            self._by_id(out),
+            [_row(1, 2, 'A1', 'B1'), _row(2, 4, 'A2', 'B2')],
+        )
+
+    # -- _process_data (no longer sorts) ----------------------------------
+
+    def test_process_data_adds_system_fields_without_sorting(self):
+        # Deferred-sort design: ``_process_data`` must not pre-sort the
+        # incoming batch. The global sort happens once inside
+        # ``_flush_all`` over the concatenated buffer.
+        writer = _Harness(DeduplicateMergeFunction())
+        writer.sequence_generator = _StubSeqGen()
+        user_schema = pa.schema([
+            pa.field('id', pa.int64(), nullable=False),
+            pa.field('a', pa.string()),
+            pa.field('b', pa.string()),
+        ])
+        # Intentionally pass rows in descending PK order; if _process_data
+        # were still sorting, the output would come back ascending.
+        batch = pa.RecordBatch.from_pylist(
+            [{'id': 3, 'a': 'C', 'b': None},
+             {'id': 1, 'a': 'A', 'b': None},
+             {'id': 2, 'a': 'B', 'b': None}],
+            schema=user_schema,
+        )
+        out = writer._process_data(batch)
+        self.assertEqual([r['id'] for r in out.to_pylist()], [3, 1, 2])
+
+    # -- _flush_all -------------------------------------------------------
+
+    def test_flush_all_sorts_folds_and_writes_one_file(self):
+        # Buffer with duplicate PKs in arbitrary order. _flush_all is
+        # responsible for sorting before folding, so unsorted input is
+        # the right stress case.
+        writer = _Harness(DeduplicateMergeFunction())
+        writer.pending_data = self._table(
+            [_row(2, 5, 'B2-new', None),
+             _row(1, 2, 'A1-mid', None),
+             _row(1, 1, 'A1-old', None),
+             _row(2, 4, 'B2-old', None),
+             _row(1, 3, 'A1-new', None)])
+        writer._flush_all()
+
+        # Buffer cleared.
+        self.assertIsNone(writer.pending_data)
+        # Exactly one file written (size well under target).
+        self.assertEqual(len(writer.written_chunks), 1)
+        # Dedup -> 1 row per PK, with the highest seq value retained.
+        self.assertEqual(
+            self._by_id(writer.written_chunks[0]),
+            [_row(1, 3, 'A1-new', None), _row(2, 5, 'B2-new', None)],
+        )
+
+    def test_flush_all_on_empty_buffer_is_noop(self):
+        writer = _Harness(DeduplicateMergeFunction())
+        writer.pending_data = None
+        writer._flush_all()
+        self.assertIsNone(writer.pending_data)
+        self.assertEqual(writer.written_chunks, [])
+
+    def test_flush_all_clears_buffer_even_when_fold_drops_everything(self):
+        # MergeFunction that returns None for every group; verifies
+        # ``_flush_all`` still resets ``pending_data`` so a subsequent
+        # write starts from a clean slate.
+        writer = _Harness(self._DropAll())
+        writer.pending_data = self._table(
+            [_row(1, 1, 'A', None), _row(1, 2, 'B', None)])
+        writer._flush_all()
+        self.assertIsNone(writer.pending_data)
+        self.assertEqual(writer.written_chunks, [])
+
+    # -- _roll_write ------------------------------------------------------
+
+    def test_roll_write_single_chunk_when_under_target(self):
+        writer = _Harness(DeduplicateMergeFunction(),
+                          target_file_size=10 ** 9)
+        data = self._table(
+            [_row(1, 1, 'A', None), _row(2, 2, 'B', None)])
+        writer._roll_write(data)
+        self.assertEqual(len(writer.written_chunks), 1)
+        self.assertEqual(writer.written_chunks[0].num_rows, 2)
+
+    def test_roll_write_splits_oversized_buffer_into_multiple_files(self):
+        # Build a buffer whose nbytes comfortably exceeds the chosen
+        # target. With a small target_file_size the writer should hand
+        # back at least two files. Use long strings so nbytes scales
+        # predictably with row count.
+        data = self._table(
+            [_row(i, i, 'x' * 64, 'y' * 64) for i in range(1, 401)])
+        # Target small enough that 400 rows will not fit in one file.
+        target = data.nbytes // 4
+        writer = _Harness(DeduplicateMergeFunction(),
+                          target_file_size=target)
+        writer._roll_write(data)
+
+        self.assertGreaterEqual(len(writer.written_chunks), 2)
+        total_rows = sum(c.num_rows for c in writer.written_chunks)
+        self.assertEqual(total_rows, data.num_rows)
+        # Each chunk except possibly the last should respect the target.
+        for chunk in writer.written_chunks[:-1]:
+            self.assertLessEqual(chunk.nbytes, target)
+
+
+class _StubSeqGen:
+    """Minimal replacement for ``SequenceGenerator``: ``next()`` returns
+    1, 2, 3, ... so the harness can call ``_process_data`` without going
+    through the real ``DataWriter.__init__``.
+    """
+
+    def __init__(self):
+        self._n = 0
+
+    def next(self) -> int:
+        upcoming = self._n + 1
+        self._n = upcoming
+        return upcoming
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py
index f1374797e1..c77f88e907 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -15,11 +15,15 @@
# specific language governing permissions and limitations
# under the License.
+import logging
import random
from typing import Dict, List, Tuple
import pyarrow as pa
+
+logger = logging.getLogger(__name__)
+
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.write.commit_message import CommitMessage
from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
@@ -90,7 +94,8 @@ class FileStoreWrite:
partition=partition,
bucket=bucket,
max_seq_number=max_seq_number(),
- options=options)
+ options=options,
+ merge_function=self._build_pk_merge_function())
else:
seq_number = 0 if self.table.bucket_mode() == BucketMode.BUCKET_UNAWARE else max_seq_number()
return AppendOnlyDataWriter(
@@ -102,6 +107,111 @@ class FileStoreWrite:
write_cols=self.write_cols
)
+    def _build_pk_merge_function(self):
+        """Build the merge function for the in-memory write buffer.
+
+        Shares ``merge_engine_dispatch.build_merge_function`` with the
+        read path so the supported engines (deduplicate, first-row,
+        partial-update with no out-of-scope options) cannot drift
+        between sides.
+
+        For wholly unsupported engines (``aggregation``) the writer
+        logs a warning and falls back to ``DeduplicateMergeFunction``
+        so the flushed file still maintains the LSM "PK unique within
+        a file" invariant. The fallback only narrows the damage to
+        "file is deduped, not aggregated" rather than the silent
+        multi-row-per-PK corruption that existed pre-PR.
+
+        Partial-update with out-of-scope options (sequence-group,
+        per-field aggregator, ignore-delete, remove-record-on-*) does
+        **not** fall back. The call goes straight to
+        ``build_merge_function``, which is expected to raise
+        ``NotImplementedError`` for such options (the e2e tests in this
+        change assert that the write raises), so the first
+        ``write_arrow`` call (where ``_create_data_writer`` first runs)
+        surfaces the error. Silently degrading to dedupe there is the
+        same live corruption pattern this PR exists to close.
+
+        ``with_write_type`` (column-subset writes) on a PK table is
+        also rejected here. The buffer layout ``_add_system_fields``
+        produces would carry only the subset on the value side, while
+        a ``MergeFunction`` such as ``PartialUpdateMergeFunction`` is
+        built against the full table arity -- the two sides would
+        mismatch on ``KeyValue.value.get_field`` and raise
+        ``IndexError`` at flush time. Refusing it explicitly avoids
+        that obscure failure and keeps the supported surface narrow.
+
+        The value-side schema must match the layout
+        ``KeyValueDataWriter`` flushes -- ``_add_system_fields`` keeps
+        every original user column on the value side (the primary keys
+        are duplicated as ``_KEY_<pk>`` columns to the left of the
+        value side). So ``value_arity`` here is ``len(table.fields)``,
+        not ``len(table.fields) - len(primary_keys)``.
+
+        Returns:
+            The merge function to hand to ``KeyValueDataWriter``.
+
+        Raises:
+            NotImplementedError: when ``write_cols`` is set, or when
+                ``build_merge_function`` rejects the engine/options.
+        """
+        from pypaimon.common.merge_engine_dispatch import \
+            build_merge_function
+        from pypaimon.common.options.core_options import MergeEngine
+        from pypaimon.read.reader.deduplicate_merge_function import \
+            DeduplicateMergeFunction
+
+        engine = self.options.merge_engine()
+        raw_options = self.options.options.to_map()
+
+        if self.write_cols is not None:
+            raise NotImplementedError(
+                "with_write_type is not yet supported on primary-key "
+                "tables: the writer-side merge buffer assumes the "
+                "input batch carries the full table schema. Drop the "
+                "with_write_type call or write the missing columns as "
+                "nulls in the input batch."
+            )
+
+        # Only the engine known to be out of scope today is diverted to
+        # the deduplicate fallback. Nothing is caught here: any
+        # NotImplementedError from the dispatch below propagates.
+        if engine == MergeEngine.AGGREGATE:
+            # Surface the silent semantic mismatch in logs: the file
+            # will be PK-unique (better than the pre-PR multi-row
+            # corruption), but any reader that honours the declared
+            # engine will see wrong values. Users sharing tables
+            # across writers especially need to see this.
+            logger.warning(
+                "merge-engine '%s' is not implemented on the pypaimon "
+                "write path; falling back to deduplicate so the flushed "
+                "file stays PK-unique. The file contents reflect "
+                "deduplicate semantics (latest writer wins), not %s "
+                "semantics. Any reader that interprets the file under "
+                "the declared engine will return incorrect results. "
+                "Avoid the pypaimon writer for tables on this engine.",
+                engine.value, engine.value)
+            return DeduplicateMergeFunction()
+
+        # Every other engine -- including partial-update paired with an
+        # out-of-scope option -- takes the same dispatch call, so a
+        # separate pre-check branch with identical arguments is not
+        # needed.
+        value_fields = self.table.table_schema.fields
+        return build_merge_function(
+            engine=engine, raw_options=raw_options,
+            key_arity=len(self.table.trimmed_primary_keys),
+            value_arity=len(value_fields),
+            value_field_nullables=[
+                f.type.nullable for f in value_fields],
+            value_field_names=[f.name for f in value_fields],
+        )
+
def _has_blob_columns(self) -> bool:
"""Check if the table schema contains blob columns."""
for field in self.table.table_schema.fields:
diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
index 6c6f292f57..64f6003a06 100644
--- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py
+++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
@@ -15,22 +15,227 @@
# specific language governing permissions and limitations
# under the License.
+from typing import List, Union
+
import pyarrow as pa
import pyarrow.compute as pc
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.read.reader.deduplicate_merge_function import \
+ DeduplicateMergeFunction
+from pypaimon.table.row.key_value import KeyValue
from pypaimon.write.writer.data_writer import DataWriter
class KeyValueDataWriter(DataWriter):
- """Data writer for primary key tables with system fields and sorting."""
+ """Data writer for primary key tables with system fields and sorting.
+
+ Accumulates incoming batches in ``pending_data`` without sorting or
+ folding on the write path. Sort and ``MergeFunction``-based fold
+ are deferred to flush time (``_flush_all``), where the result is
+ roll-written into one or more data files. This enforces the LSM
+ "PK unique within a file" invariant the read-side
+ ``raw_convertible`` fast path relies on, while keeping per-write
+ cost bounded.
+ """
+
+    def __init__(self, table, partition, bucket, max_seq_number,
+                 options=None, write_cols=None, merge_function=None):
+        """Create a primary-key writer with an optional merge function.
+
+        ``table``, ``partition``, ``bucket``, ``max_seq_number``,
+        ``options`` and ``write_cols`` are forwarded unchanged to the
+        base ``DataWriter`` constructor.
+
+        Args:
+            merge_function: Object used at flush time to fold rows that
+                share a primary key. When ``None`` (the default) a
+                ``DeduplicateMergeFunction`` is installed, so direct
+                callers (tests / code paths that don't go through
+                FileStoreWrite) don't accidentally skip the merge step.
+        """
+        super().__init__(table, partition, bucket, max_seq_number,
+                         options, write_cols)
+        # Compare against ``None`` explicitly instead of relying on
+        # truthiness: a caller-supplied merge function that happens to
+        # evaluate falsy (e.g. it defines ``__len__`` or ``__bool__``)
+        # must be honoured rather than silently swapped for the default.
+        if merge_function is None:
+            merge_function = DeduplicateMergeFunction()
+        self._merge_function = merge_function
def _process_data(self, data: pa.RecordBatch) -> pa.Table:
+ """Attach system fields to ``data`` and wrap it in a single-batch
+ ``pa.Table``; the rows keep their incoming order.
+ """
+ # No sort here: sorting once at flush is strictly cheaper than
+ # per-batch sort + a final global sort. ``pending_data`` ends
+ # up as a concat of unsorted batches; ``_flush_all`` sorts it
+ # exactly once before folding.
enhanced_data = self._add_system_fields(data)
- return
pa.Table.from_batches([self._sort_by_primary_key(enhanced_data)])
+ return pa.Table.from_batches([enhanced_data])
def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) ->
pa.Table:
- combined = pa.concat_tables([existing_data, new_data])
- return self._sort_by_primary_key(combined)
+ """Return ``existing_data`` and ``new_data`` concatenated into one
+ table; no sorting or folding happens at this point.
+ """
+ # Plain concat. Sort + fold both run inside ``_flush_all`` so
+ # N writes incur 1 sort instead of N sorts.
+ return pa.concat_tables([existing_data, new_data])
+
+    def prepare_commit(self) -> List[DataFileMeta]:
+        """Drain buffered rows to files, then delegate to the base class.
+
+        Returns:
+            Whatever the parent ``prepare_commit`` returns.
+        """
+        buffered = self.pending_data
+        has_rows = buffered is not None and buffered.num_rows > 0
+        if has_rows:
+            # ``_flush_all`` sorts, folds and writes the buffer and
+            # leaves ``pending_data`` as ``None`` for the parent call.
+            self._flush_all()
+        return super().prepare_commit()
+
+    def _check_and_roll_if_needed(self):
+        """Flush the whole buffer once it outgrows ``target_file_size``.
+
+        The buffer is handed to ``_flush_all`` in one go (sort + fold +
+        roll-write), so no slice remainder is kept in ``pending_data``.
+        """
+        buffered = self.pending_data
+        if buffered is None or buffered.num_rows <= 0:
+            # Nothing buffered, nothing to roll.
+            return
+        if buffered.nbytes > self.target_file_size:
+            self._flush_all()
+
+    def close(self):
+        """Flush buffered rows through ``_flush_all`` and drop the buffer.
+
+        Overrides the base ``close`` so the final buffer is sorted and
+        folded before it lands on disk, keeping the file-internal
+        PK-unique invariant on the close-without-prepare_commit path.
+
+        Raises:
+            Exception: An error raised while flushing is logged, the
+                writer is aborted via ``abort()``, and the same error
+                is re-raised.
+        """
+        try:
+            buffered = self.pending_data
+            if buffered is not None and buffered.num_rows > 0:
+                self._flush_all()
+        except Exception as err:
+            import logging
+            logging.getLogger(__name__).warning(
+                "Exception occurs when closing writer. Cleaning up.",
+                exc_info=err)
+            self.abort()
+            raise err
+        finally:
+            # The buffer is discarded on success and on failure alike.
+            self.pending_data = None
+
+    def _flush_all(self) -> None:
+        """Sort, fold and write out everything currently buffered.
+
+        After a successful call ``pending_data`` is ``None``: the
+        buffer is always drained completely and no slice remainder is
+        carried over. A missing or empty buffer is just reset to
+        ``None`` without writing anything.
+        """
+        buffered = self.pending_data
+        if buffered is not None and buffered.num_rows != 0:
+            ordered = self._sort_by_primary_key(buffered)
+            folded = self._merge_pending_by_pk(ordered)
+            # Clear the buffer only after sort + fold have succeeded.
+            self.pending_data = None
+            if folded.num_rows != 0:
+                self._roll_write(folded)
+            return
+        self.pending_data = None
+
+    def _roll_write(self, data: pa.Table) -> None:
+        """Write ``data`` as one or more files sized by ``target_file_size``.
+
+        ``data`` is expected to be PK-unique already (the fold step
+        provides that), so every slice of it is PK-unique too and
+        splitting by size does not break the file-internal invariant.
+        Split points come from the inherited
+        ``_find_optimal_split_point``; files are written with the
+        inherited ``_write_data_to_file``.
+        """
+        remaining = data
+        while remaining.num_rows > 0:
+            split_row = 0
+            if remaining.nbytes > self.target_file_size:
+                split_row = self._find_optimal_split_point(
+                    remaining, self.target_file_size)
+            if split_row <= 0:
+                # Either the rest already fits, or the helper found no
+                # usable split point (e.g. one row alone exceeds the
+                # target): write what is left as a single file.
+                self._write_data_to_file(remaining)
+                break
+            self._write_data_to_file(remaining.slice(0, split_row))
+            remaining = remaining.slice(split_row)
+
+ def _merge_pending_by_pk(self, data: pa.Table) -> pa.Table:
+ """Fold same-PK runs in ``data`` using ``self._merge_function``.
+
+ ``data`` is required to already be sorted by
+ ``(primary_key, _SEQUENCE_NUMBER)``. ``_flush_all`` is the
+ only caller and runs ``_sort_by_primary_key`` immediately
+ before this method, so the precondition holds.
+
+ NOTE(follow-up): the merge runs row-by-row over
+ ``data.to_pydict()`` / ``pa.Table.from_pydict``. Arrow types
+ with non-trivial Python representations (Decimal128 with
+ specific precision/scale, timestamps with timezone or
+ sub-millisecond units, durations, deeply nested structs) can
+ drift across this round-trip. A columnar merge implementation
+ would close the gap and is tracked as a follow-up; until
+ then, partial-update on those types should be avoided in
+ pypaimon.
+
+ Args:
+ data: Sorted buffer. The first ``len(self.trimmed_primary_keys)``
+ columns are read as the key, the next two as sequence
+ number and value kind, and the remaining ones as the value.
+
+ Returns:
+ A table with ``data``'s schema holding at most one row per
+ run of equal keys, or ``data`` itself when it has fewer than
+ two rows.
+ """
+ n = data.num_rows
+ if n < 2:
+ # Single-row buffer cannot have duplicates; sidestep the
+ # row-by-row pyarrow round-trip in the common streaming case.
+ return data
+
+ col_names = data.schema.names
+ # ``to_pydict`` works on pyarrow >= 6 (Python 3.6 CI ships 6.0.1),
+ # unlike ``to_pylist`` which only landed in pyarrow 7.
+ col_dict = data.to_pydict()
+ # Pivot the column-oriented dict into one dict per row.
+ rows = [{name: col_dict[name][i] for name in col_names}
+ for i in range(n)]
+ key_arity = len(self.trimmed_primary_keys)
+ # System fields sit at indices [key_arity, key_arity + 1] (the
+ # _SEQUENCE_NUMBER and _VALUE_KIND columns added by
+ # _add_system_fields). Everything to the right is the value side.
+ value_arity = len(col_names) - key_arity - 2
+
+ # Pool one ``KeyValue`` for the whole fold. Safe because:
+ # - ``DeduplicateMergeFunction.add`` stores the kv reference; the
+ # reused instance always carries the most recent ``replace``,
+ # which is exactly the "latest wins" the engine wants.
+ # - ``PartialUpdateMergeFunction.add`` also stores a reference,
+ # but ``get_result`` snapshots key + sequence into a fresh
+ # tuple before returning, so the consumed result is decoupled
+ # from any later ``replace`` on the pooled kv.
+ # - ``FirstRowMergeFunction.add`` ``copy()``s the first kv, so it
+ # keeps the first row rather than tracking later ``replace``s on
+ # the pooled kv (which would otherwise yield the last row).
+ # This drops per-row ``KeyValue``/``OffsetRow`` allocations and
+ # the resulting GC churn on large buffers.
+ pooled_kv = KeyValue(key_arity, value_arity)
+
+ merged_rows: List[dict] = []
+ # Walk maximal runs ``rows[i:j]`` of consecutive rows whose key
+ # tuples compare equal; each run is folded independently.
+ i = 0
+ while i < n:
+ j = i
+ first_key = self._key_tuple(rows[i], col_names, key_arity)
+ while j < n and \
+ self._key_tuple(rows[j], col_names, key_arity) ==
first_key:
+ j += 1
+ run = rows[i:j]
+ # Start from a clean merge state for every key run.
+ self._merge_function.reset()
+ for r in run:
+ pooled_kv.replace(self._row_to_tuple(r, col_names))
+ self._merge_function.add(pooled_kv)
+ result_kv = self._merge_function.get_result()
+ # A ``None`` result contributes no output row for this key.
+ if result_kv is not None:
+ merged_rows.append(
+ self._kv_to_row(result_kv, col_names,
+ key_arity, value_arity))
+ i = j
+
+ if not merged_rows:
+ # Zero-length slice: an empty table that keeps the input schema.
+ return data.slice(0, 0)
+ result_dict = {name: [r[name] for r in merged_rows]
+ for name in data.schema.names}
+ return pa.Table.from_pydict(result_dict, schema=data.schema)
+
+    @staticmethod
+    def _key_tuple(row: dict, col_names: List[str], key_arity: int) -> tuple:
+        """Return the values of the first ``key_arity`` columns of ``row``.
+
+        Columns are taken in ``col_names`` order, so the result can be
+        compared directly against the key tuple of another row.
+        """
+        leading_names = (col_names[pos] for pos in range(key_arity))
+        return tuple(row[name] for name in leading_names)
+
+    @staticmethod
+    def _row_to_tuple(row: dict, col_names: List[str]) -> tuple:
+        """Flatten ``row`` into a tuple ordered like ``col_names``."""
+        ordered_values = [row[name] for name in col_names]
+        return tuple(ordered_values)
+
+    @staticmethod
+    def _kv_to_row(kv: KeyValue, col_names: List[str],
+                   key_arity: int, value_arity: int) -> dict:
+        """Convert a ``KeyValue`` back into a column-name -> value dict.
+
+        Column layout follows ``col_names``: ``key_arity`` key columns,
+        then the sequence number, then the row-kind byte, then
+        ``value_arity`` value columns.
+        """
+        row = {col_names[pos]: kv.key.get_field(pos)
+               for pos in range(key_arity)}
+        row[col_names[key_arity]] = kv.sequence_number
+        row[col_names[key_arity + 1]] = kv.value_row_kind_byte
+        value_start = key_arity + 2
+        row.update(
+            (col_names[value_start + pos], kv.value.get_field(pos))
+            for pos in range(value_arity))
+        return row
def _add_system_fields(self, data: pa.RecordBatch) -> pa.RecordBatch:
"""Add system fields: _KEY_{pk_key}, _SEQUENCE_NUMBER, _VALUE_KIND."""
@@ -61,11 +266,15 @@ class KeyValueDataWriter(DataWriter):
return pa.RecordBatch.from_arrays(new_arrays,
schema=pa.schema(new_fields))
- def _sort_by_primary_key(self, data: pa.RecordBatch) -> pa.RecordBatch:
+ def _sort_by_primary_key(
+ self, data: Union[pa.RecordBatch, pa.Table]
+ ) -> Union[pa.RecordBatch, pa.Table]:
+ """Return ``data`` reordered ascending by each column in
+ ``self.trimmed_primary_keys`` and then, when that column is
+ present, by ``_SEQUENCE_NUMBER``.
+ """
+ # pc.sort_indices + .take work uniformly over RecordBatch and
+ # Table, so this serves both the per-batch entry path (legacy)
+ # and the buffer-wide sort path (used by ``_flush_all``).
sort_keys = [(key, 'ascending') for key in self.trimmed_primary_keys]
if '_SEQUENCE_NUMBER' in data.schema.names:
sort_keys.append(('_SEQUENCE_NUMBER', 'ascending'))
sorted_indices = pc.sort_indices(data, sort_keys=sort_keys)
- sorted_batch = data.take(sorted_indices)
- return sorted_batch
+ return data.take(sorted_indices)