This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f58ede9a6c [python] Implement partial-update merge engine in pypaimon
(#7745)
f58ede9a6c is described below
commit f58ede9a6c00dca569db30323e9511b14c0a30d3
Author: chaoyang <[email protected]>
AuthorDate: Sat May 23 22:58:01 2026 +0800
[python] Implement partial-update merge engine in pypaimon (#7745)
---
.../pypaimon/read/merge_engine_support.py | 114 +++++++
.../read/reader/partial_update_merge_function.py | 134 +++++++++
.../pypaimon/read/reader/sort_merge_reader.py | 10 +-
paimon-python/pypaimon/read/split_read.py | 40 ++-
paimon-python/pypaimon/read/table_read.py | 8 +
.../pypaimon/tests/test_partial_update_e2e.py | 333 +++++++++++++++++++++
.../tests/test_partial_update_merge_function.py | 226 ++++++++++++++
7 files changed, 860 insertions(+), 5 deletions(-)
diff --git a/paimon-python/pypaimon/read/merge_engine_support.py
b/paimon-python/pypaimon/read/merge_engine_support.py
new file mode 100644
index 0000000000..d54cd8b0e4
--- /dev/null
+++ b/paimon-python/pypaimon/read/merge_engine_support.py
@@ -0,0 +1,114 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Table-level validation for the configured ``merge-engine`` option.
+
+Lives outside any specific ``SplitRead`` because the table's
+merge-engine configuration is a property of the whole read, not of one
+split. ``TableRead`` may dispatch the same logical scan across multiple
+``SplitRead`` implementations (e.g. ``MergeFileSplitRead`` for splits
+that need k-way merge, ``RawFileSplitRead`` for raw-convertible splits
+where keys don't overlap). The unsupported-engine and
+unsupported-options checks need to fire regardless of which dispatch
+branch is picked — otherwise a single fresh snapshot whose splits are
+all raw-convertible would silently bypass the guard and produce wrong
+results when the table configures, e.g.,
+``partial-update.remove-record-on-delete=true``.
+"""
+
+from typing import Set
+
+from pypaimon.common.options.core_options import MergeEngine
+
+# Per-field options are spelled ``fields.<name>.<suffix>``; a key that
+# starts with the prefix and ends in one of these suffixes configures a
+# column-level feature the Python merge function does not provide.
+_FIELDS_PREFIX = "fields."
+_FIELD_SEQUENCE_GROUP_SUFFIX = ".sequence-group"
+_FIELD_AGGREGATE_FUNCTION_SUFFIX = ".aggregate-function"
+# Table-wide aggregator override; flagged whatever value it carries.
+_DEFAULT_AGGREGATE_FUNCTION_KEY = "fields.default-aggregate-function"
+
+# Options that only matter when switched on: a truthy value opts the
+# table into behaviour ``PartialUpdateMergeFunction`` lacks. The plain
+# ``ignore-delete`` key is listed together with its engine-prefixed
+# spellings.
+_PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS = (
+    "ignore-delete",
+    "partial-update.ignore-delete",
+    "first-row.ignore-delete",
+    "deduplicate.ignore-delete",
+    "partial-update.remove-record-on-delete",
+    "partial-update.remove-record-on-sequence-group",
+)
+
+
+def check_supported(table) -> None:
+    """Fail fast when pypaimon cannot faithfully read ``table``.
+
+    Tables without a primary key involve no merge function and always
+    pass. Primary-key tables pass with the ``deduplicate`` engine, or
+    with ``partial-update`` when
+    :func:`partial_update_unsupported_options` reports no keys.
+
+    Raises:
+        NotImplementedError: for any other merge engine, or for
+            ``partial-update`` combined with options this port does not
+            implement (the offending keys are listed, sorted, in the
+            message).
+    """
+    if not table.is_primary_key_table:
+        return
+
+    merge_engine = table.options.merge_engine()
+    if merge_engine == MergeEngine.DEDUPLICATE:
+        return
+    if merge_engine != MergeEngine.PARTIAL_UPDATE:
+        raise NotImplementedError(
+            "merge-engine '{}' is not implemented in pypaimon yet "
+            "(supported: deduplicate, partial-update). Use the Java "
+            "client or open an issue to track support.".format(
+                merge_engine.value))
+
+    blocked_keys = partial_update_unsupported_options(table)
+    if not blocked_keys:
+        return
+    listed = ", ".join(sorted(blocked_keys))
+    raise NotImplementedError(
+        "merge-engine 'partial-update' is enabled together with "
+        "options that pypaimon does not yet implement: {}. The "
+        "supported subset is per-key last-non-null merge with "
+        "no sequence-group, no per-field aggregator override, "
+        "no ignore-delete and no partial-update.remove-record-"
+        "on-* flags. Use the Java client for the full feature "
+        "set, or open an issue to track Python support.".format(listed)
+    )
+
+
+def partial_update_unsupported_options(table) -> Set[str]:
+    """Collect the option keys on ``table`` that
+    ``PartialUpdateMergeFunction`` cannot honour.
+
+    A key is reported when it is
+
+    * listed in ``_PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS`` and set
+      to a truthy value,
+    * ``fields.default-aggregate-function`` (any value), or
+    * a per-field ``fields.<name>.sequence-group`` or
+      ``fields.<name>.aggregate-function`` override (any value).
+
+    An empty result means the plain last-non-null merge is safe to run.
+    """
+    def _is_unsupported(key, value) -> bool:
+        if (key in _PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS
+                and _option_is_truthy(value)):
+            return True
+        if key == _DEFAULT_AGGREGATE_FUNCTION_KEY:
+            return True
+        return key.startswith(_FIELDS_PREFIX) and key.endswith(
+            (_FIELD_SEQUENCE_GROUP_SUFFIX, _FIELD_AGGREGATE_FUNCTION_SUFFIX))
+
+    options_map = table.options.options.to_map()
+    return {key for key, value in options_map.items()
+            if _is_unsupported(key, value)}
+
+
+def _option_is_truthy(raw) -> bool:
+    """Interpret a raw option value as an on/off flag.
+
+    Strings are ``True`` when, ignoring surrounding whitespace and case,
+    they equal "true", "1", "yes" or "on". ``None`` counts as unset
+    (``False``); real booleans pass through unchanged; any other value
+    falls back to Python truthiness.
+    """
+    if isinstance(raw, str):
+        return raw.strip().lower() in ("true", "1", "yes", "on")
+    # bool(None) is False and bool(b) is b for booleans, so one call
+    # covers the None / bool / everything-else cases alike.
+    return bool(raw)
diff --git
a/paimon-python/pypaimon/read/reader/partial_update_merge_function.py
b/paimon-python/pypaimon/read/reader/partial_update_merge_function.py
new file mode 100644
index 0000000000..978b48011c
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/partial_update_merge_function.py
@@ -0,0 +1,134 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""
+Python port of Java's ``PartialUpdateMergeFunction``
+(``paimon-core/src/main/java/org/apache/paimon/mergetree/compact/
+PartialUpdateMergeFunction.java``).
+
+The merge function used by the ``partial-update`` merge engine on PK
+tables: rows sharing a primary key are merged left-to-right, taking the
+latest non-null value per non-PK field. ``DeduplicateMergeFunction``
+keeps only the latest row; ``PartialUpdateMergeFunction`` instead lets
+later writes "fill in" fields the earlier writes left null, so users
+can write the same logical record across multiple commits with
+different sets of non-null columns.
+
+This is the **core merge semantics only**. The Java implementation also
+supports per-field aggregator overrides (``fields.<name>.aggregate-
+function``), sequence groups (``fields.<name>.sequence-group``),
+``ignore-delete``, and ``partial-update.remove-record-on-*`` options.
+None of those are implemented yet; retract row kinds (DELETE and
+UPDATE_BEFORE) raise ``NotImplementedError`` at ``add`` time so we
+never silently corrupt data with a half-implemented contract.
+"""
+
+from typing import Any, List, Optional
+
+from pypaimon.table.row.key_value import KeyValue
+from pypaimon.table.row.row_kind import RowKind
+
+
+class PartialUpdateMergeFunction:
+    """A MergeFunction where the key is the primary key (unique) and the
+    value is merged across all rows for that key by taking the latest
+    non-null value per non-PK field.
+
+    Mirrors the ``MergeFunction`` protocol used by ``SortMergeReader``:
+    ``reset`` (between groups of same-key rows), ``add`` (one row at a
+    time, oldest to newest), ``get_result`` (after the group is
+    exhausted).
+
+    Args:
+        key_arity: number of key fields copied into the result row.
+        value_arity: number of value fields merged per row.
+        nullables: optional per-value-field nullable flags, parallel to
+            the value indices. ``None`` disables the NOT NULL check.
+
+    Raises:
+        ValueError: if ``nullables`` is given and its length differs
+            from ``value_arity``.
+    """
+
+    def __init__(self, key_arity: int, value_arity: int,
+                 nullables: Optional[List[bool]] = None):
+        self._key_arity = key_arity
+        self._value_arity = value_arity
+        # When ``nullables`` is given, mirror Java's
+        # ``updateNonNullFields`` check: a null input on a NOT NULL field
+        # raises rather than being silently absorbed. ``None`` skips the
+        # check for direct callers that have no schema info at hand.
+        if nullables is not None and len(nullables) != value_arity:
+            raise ValueError(
+                "nullables length {} does not match value_arity {}".format(
+                    len(nullables), value_arity))
+        self._nullables = nullables
+        # Lazily allocated on first add(); ``None`` means "no rows yet".
+        self._accumulator: Optional[List[Any]] = None
+        # Key fields and sequence number of the most recently added row,
+        # copied out inside add(). Upstream readers (e.g.
+        # KeyValueWrapReader) may reuse a single KeyValue instance and
+        # mutate its underlying row tuple, so no reference to the
+        # incoming ``kv`` is retained between calls.
+        self._latest_key: Optional[tuple] = None
+        self._latest_sequence_number: Optional[int] = None
+
+    def reset(self) -> None:
+        """Discard the current group so the next add() starts afresh."""
+        self._accumulator = None
+        self._latest_key = None
+        self._latest_sequence_number = None
+
+    def add(self, kv: KeyValue) -> None:
+        """Merge one row (oldest to newest) into the current group.
+
+        Non-null value fields overwrite the accumulator; null fields
+        leave it untouched.
+
+        Raises:
+            NotImplementedError: if ``kv`` is not an add row (a DELETE /
+                UPDATE_BEFORE row).
+            ValueError: if a field marked non-nullable in ``nullables``
+                is null in ``kv``.
+        """
+        row_kind_byte = kv.value_row_kind_byte
+        if not RowKind.is_add_byte(row_kind_byte):
+            # DELETE / UPDATE_BEFORE need ignore-delete or
+            # partial-update.remove-record-on-delete to be set in Java;
+            # neither option is wired up in pypaimon yet, so refuse the
+            # row rather than silently swallow it.
+            raise NotImplementedError(
+                "PartialUpdateMergeFunction received a {} row; this "
+                "Python port does not yet implement the ignore-delete / "
+                "partial-update.remove-record-on-delete options. Use the "
+                "Java client for tables that produce DELETE / "
+                "UPDATE_BEFORE rows.".format(
+                    RowKind(row_kind_byte).to_string())
+            )
+
+        # Mirror Java's reset() + updateNonNullFields(): the accumulator
+        # starts as all-null (equivalent to ``new GenericRow(arity)``) and
+        # each add() writes non-null inputs; null inputs are absorbed --
+        # except when the schema marks the field NOT NULL, in which case
+        # we raise to match Java's IllegalArgumentException check.
+        if self._accumulator is None:
+            self._accumulator = [None] * self._value_arity
+        accumulator = self._accumulator
+        value = kv.value
+        for i in range(self._value_arity):
+            v = value.get_field(i)
+            if v is not None:
+                accumulator[i] = v
+            elif self._nullables is not None and not self._nullables[i]:
+                raise ValueError("Field {} can not be null".format(i))
+
+        # Snapshot key + sequence number now rather than in get_result(),
+        # so a later mutation of a reused ``kv`` cannot leak into the
+        # merged result.
+        self._latest_key = tuple(
+            kv.key.get_field(i) for i in range(self._key_arity))
+        self._latest_sequence_number = kv.sequence_number
+
+    def get_result(self) -> Optional[KeyValue]:
+        """Return the merged row for the current group.
+
+        The result is a fresh ``KeyValue`` built from the key and
+        sequence number of the most recently added row, row kind INSERT,
+        and the merged value fields. Returns ``None`` when no row has
+        been added since the last reset().
+        """
+        if self._accumulator is None or self._latest_key is None:
+            return None
+
+        result_row = self._latest_key + (
+            self._latest_sequence_number,
+            RowKind.INSERT.value,
+        ) + tuple(self._accumulator)
+
+        result = KeyValue(self._key_arity, self._value_arity)
+        result.replace(result_row)
+        return result
diff --git a/paimon-python/pypaimon/read/reader/sort_merge_reader.py
b/paimon-python/pypaimon/read/reader/sort_merge_reader.py
index 764b722830..56f42b6f3c 100644
--- a/paimon-python/pypaimon/read/reader/sort_merge_reader.py
+++ b/paimon-python/pypaimon/read/reader/sort_merge_reader.py
@@ -29,9 +29,15 @@ from pypaimon.table.row.key_value import KeyValue
class SortMergeReaderWithMinHeap(RecordReader):
"""SortMergeReader implemented with min-heap."""
- def __init__(self, readers: List[RecordReader[KeyValue]], schema:
TableSchema):
+ def __init__(self, readers: List[RecordReader[KeyValue]], schema:
TableSchema,
+ merge_function: Optional[Any] = None):
self.next_batch_readers = list(readers)
- self.merge_function = DeduplicateMergeFunction()
+ # Default to dedupe so callers that don't pass a merge_function
+ # keep their old behaviour. The merge engine dispatch lives in
+ # ``MergeFileSplitRead.section_reader_supplier`` for the read
+ # path; tests or other ad-hoc callers can pass a different
+ # implementation here.
+ self.merge_function = merge_function if merge_function is not None
else DeduplicateMergeFunction()
if schema.partition_keys:
trimmed_primary_keys = [pk for pk in schema.primary_keys if pk not
in schema.partition_keys]
diff --git a/paimon-python/pypaimon/read/split_read.py
b/paimon-python/pypaimon/read/split_read.py
index 8a203c9f4c..2f233af743 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -20,7 +20,7 @@ from abc import ABC, abstractmethod
from functools import partial
from typing import Callable, Dict, List, Optional, Tuple
-from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.common.options.core_options import CoreOptions, MergeEngine
from pypaimon.common.predicate import Predicate
from pypaimon.deletionvectors import ApplyDeletionVectorReader
from pypaimon.deletionvectors.deletion_vector import DeletionVector
@@ -53,7 +53,10 @@ from pypaimon.read.reader.key_value_unwrap_reader import \
KeyValueUnwrapRecordReader
from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
from pypaimon.read.reader.shard_batch_reader import ShardBatchReader
-from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap
+from pypaimon.read.reader.partial_update_merge_function import \
+ PartialUpdateMergeFunction
+from pypaimon.read.reader.sort_merge_reader import (DeduplicateMergeFunction,
+ SortMergeReaderWithMinHeap)
from pypaimon.read.push_down_utils import _get_all_fields
from pypaimon.read.split import Split
from pypaimon.read.sliced_split import SlicedSplit
@@ -99,6 +102,10 @@ class SplitRead(ABC):
self.row_tracking_enabled = row_tracking_enabled
self.value_arity = len(read_type)
self.nested_name_paths = nested_name_paths
+ # Snapshot the raw value-side schema before _create_key_value_fields
+ # wraps it, so MergeFileSplitRead can hand per-value-field nullable
+ # flags to merge functions that mirror Java's NOT-NULL check.
+ self.value_fields = list(read_type)
self.trimmed_primary_key = self.table.trimmed_primary_keys
self.read_fields = read_type
@@ -611,7 +618,34 @@ class MergeFileSplitRead(SplitRead):
supplier = partial(self.kv_reader_supplier, file,
self.deletion_file_readers.get(file.file_name, None))
data_readers.append(supplier)
readers.append(ConcatRecordReader(data_readers))
- return SortMergeReaderWithMinHeap(readers, self.table.table_schema)
+ merge_function = self._build_merge_function()
+ return SortMergeReaderWithMinHeap(
+ readers, self.table.table_schema, merge_function=merge_function)
+
+    def _build_merge_function(self):
+        """Return the MergeFunction matching the table's ``merge-engine``.
+
+        ``partial-update`` gets a ``PartialUpdateMergeFunction`` sized to
+        the trimmed primary key and the read value fields, with their
+        nullable flags; ``deduplicate`` gets a ``DeduplicateMergeFunction``.
+        Unsupported engines and option combinations are rejected earlier
+        by :func:`pypaimon.read.merge_engine_support.check_supported` in
+        ``TableRead.__init__``, so any other engine here is a bug.
+        """
+        engine = self.table.options.merge_engine()
+        if engine == MergeEngine.PARTIAL_UPDATE:
+            value_nullables = [field.type.nullable
+                               for field in self.value_fields]
+            return PartialUpdateMergeFunction(
+                key_arity=len(self.trimmed_primary_key),
+                value_arity=self.value_arity,
+                nullables=value_nullables,
+            )
+        if engine == MergeEngine.DEDUPLICATE:
+            return DeduplicateMergeFunction()
+        raise AssertionError(
+            "unreachable: merge-engine '{}' should have been rejected by "
+            "merge_engine_support.check_supported".format(engine.value)
+        )
def create_reader(self) -> RecordReader:
# Create a dict mapping data file name to deletion file reader method
diff --git a/paimon-python/pypaimon/read/table_read.py
b/paimon-python/pypaimon/read/table_read.py
index b3a8edaf63..52a4eaaa7f 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -81,8 +81,16 @@ class TableRead:
nested_name_paths: Optional[List[List[str]]] = None,
limit: Optional[int] = None,
):
+ from pypaimon.read.merge_engine_support import check_supported
from pypaimon.table.file_store_table import FileStoreTable
+ # Validate merge-engine support before any split-level dispatch.
+ # Raw-convertible splits skip MergeFileSplitRead, so this guard
+ # has to live at the read-builder level — otherwise unsupported
+ # options (e.g. partial-update.remove-record-on-delete) get
+ # silently ignored on fresh single-snapshot tables.
+ check_supported(table)
+
self.table: FileStoreTable = table
self.predicate = predicate
self.read_type = read_type
diff --git a/paimon-python/pypaimon/tests/test_partial_update_e2e.py
b/paimon-python/pypaimon/tests/test_partial_update_e2e.py
new file mode 100644
index 0000000000..af5f61e98f
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_partial_update_e2e.py
@@ -0,0 +1,333 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""End-to-end tests for the ``partial-update`` merge engine.
+
+Each test creates a PK table with ``merge-engine`` set to a particular
+value, writes one or more batches, and reads back. Partial-update reads
+must merge non-null fields across batches; ``deduplicate`` must keep
+the latest row only; ``aggregation`` and ``first-row`` must raise
+``NotImplementedError`` (until they are ported), since silently
+treating them as deduplicate would corrupt the user's data.
+"""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+
+
+class PartialUpdateMergeEngineE2ETest(unittest.TestCase):
+    """Read-path tests for ``merge-engine`` handling on PK tables.
+
+    All tables live in one temporary warehouse created in
+    ``setUpClass``, so every test must use a distinct table name.
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', True)
+
+        cls.pa_schema = pa.schema([
+            pa.field('id', pa.int64(), nullable=False),
+            ('a', pa.string()),
+            ('b', pa.string()),
+            ('c', pa.string()),
+        ])
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def _create_pk_table(self, table_name, merge_engine='partial-update',
+                         extra_options=None):
+        """Create ``default.<table_name>`` (PK ``id``, one bucket) with the
+        given merge engine plus ``extra_options``, and return it."""
+        # bucket=1 keeps every row for a PK in the same bucket. On its
+        # own that does NOT force the merge path: a single write still
+        # yields raw-convertible splits (see
+        # test_partial_update_unsupported_options_guard_covers_raw_convertible).
+        # Tests that need partial-update merging -- which only happens
+        # inside SortMergeReader -- write the same PK in several commits.
+        options = {
+            'bucket': '1',
+            'merge-engine': merge_engine,
+        }
+        if extra_options:
+            options.update(extra_options)
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            primary_keys=['id'],
+            options=options,
+        )
+        full = 'default.{}'.format(table_name)
+        self.catalog.create_table(full, schema, False)
+        return self.catalog.get_table(full)
+
+    def _write(self, table, rows):
+        """Write ``rows`` to ``table`` as one batch commit."""
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        c = wb.new_commit()
+        try:
+            w.write_arrow(pa.Table.from_pylist(rows, schema=self.pa_schema))
+            c.commit(w.prepare_commit())
+        finally:
+            w.close()
+            c.close()
+
+    def _read(self, table):
+        """Read the whole table back as a list of dicts sorted by ``id``."""
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        if not splits:
+            return []
+        return sorted(
+            rb.new_read().to_arrow(splits).to_pylist(),
+            key=lambda r: r['id'],
+        )
+
+    # -- partial-update happy path ---------------------------------------
+
+    def test_partial_update_two_writes_merges_non_null(self):
+        """Two writes against the same PK with disjoint non-null columns
+        must merge into a single row that has both columns populated.
+        """
+        table = self._create_pk_table('two_writes')
+        self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}])
+        self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}],
+        )
+
+    def test_partial_update_three_writes_merges_left_to_right(self):
+        """Three overlapping writes -- each filling in a different column --
+        compose into the union of non-null fields.
+        """
+        table = self._create_pk_table('three_writes')
+        self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}])
+        self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}])
+        self._write(table, [{'id': 1, 'a': None, 'b': None, 'c': 'C'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}],
+        )
+
+    def test_partial_update_disjoint_keys_unaffected(self):
+        """Three rows with disjoint PKs must all appear unchanged in the
+        output -- partial-update only merges rows that share a PK.
+        """
+        table = self._create_pk_table('disjoint_keys')
+        self._write(table, [
+            {'id': 1, 'a': 'A1', 'b': None, 'c': None},
+            {'id': 2, 'a': None, 'b': 'B2', 'c': None},
+            {'id': 3, 'a': None, 'b': None, 'c': 'C3'},
+        ])
+
+        self.assertEqual(
+            self._read(table),
+            [
+                {'id': 1, 'a': 'A1', 'b': None, 'c': None},
+                {'id': 2, 'a': None, 'b': 'B2', 'c': None},
+                {'id': 3, 'a': None, 'b': None, 'c': 'C3'},
+            ],
+        )
+
+    def test_partial_update_later_value_wins_over_earlier_non_null(self):
+        """When two writes both supply a non-null value for the same
+        column, the later value wins (latest non-null per field).
+        """
+        table = self._create_pk_table('later_wins')
+        self._write(table, [{'id': 1, 'a': 'old', 'b': 'keep', 'c': None}])
+        self._write(table, [{'id': 1, 'a': 'new', 'b': None, 'c': 'fill'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'new', 'b': 'keep', 'c': 'fill'}],
+        )
+
+    def test_partial_update_later_null_does_not_clobber_earlier_value(self):
+        """A later write with NULL for a column does NOT overwrite an
+        earlier non-null value for that column.
+        """
+        table = self._create_pk_table('null_no_clobber')
+        self._write(table, [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}])
+        self._write(table, [{'id': 1, 'a': None, 'b': None, 'c': None}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}],
+        )
+
+    # -- deduplicate (regression) ----------------------------------------
+
+    def test_deduplicate_engine_unchanged(self):
+        """The default ``deduplicate`` engine must keep the latest row
+        intact, including its NULLs -- exactly the pre-PR behaviour.
+        """
+        table = self._create_pk_table('dedupe', merge_engine='deduplicate')
+        self._write(table, [{'id': 1, 'a': 'old', 'b': 'old-b', 'c': 'old-c'}])
+        self._write(table, [{'id': 1, 'a': 'new', 'b': None, 'c': None}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'new', 'b': None, 'c': None}],
+        )
+
+    # -- engines we don't support yet ------------------------------------
+
+    def test_aggregation_engine_raises_not_implemented(self):
+        """Until ``aggregation`` is ported, reading an aggregation table
+        must raise rather than silently produce dedupe results."""
+        table = self._create_pk_table('agg_unsupported',
+                                      merge_engine='aggregation')
+        self._write(table, [{'id': 1, 'a': 'x', 'b': None, 'c': None}])
+        self._write(table, [{'id': 1, 'a': 'y', 'b': None, 'c': None}])
+
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        with self.assertRaises(NotImplementedError) as cm:
+            rb.new_read().to_arrow(splits)
+        self.assertIn('aggregation', str(cm.exception))
+
+    def test_first_row_engine_raises_not_implemented(self):
+        """Until ``first-row`` is ported, reading a first-row table must
+        raise rather than silently produce dedupe results."""
+        table = self._create_pk_table('first_row_unsupported',
+                                      merge_engine='first-row')
+        self._write(table, [{'id': 1, 'a': 'x', 'b': None, 'c': None}])
+        self._write(table, [{'id': 1, 'a': 'y', 'b': None, 'c': None}])
+
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        with self.assertRaises(NotImplementedError) as cm:
+            rb.new_read().to_arrow(splits)
+        self.assertIn('first-row', str(cm.exception))
+
+    # -- partial-update + out-of-scope option combinations ---------------
+    #
+    # When a user pairs ``merge-engine: partial-update`` with any option
+    # this port doesn't implement (sequence-group, per-field aggregator
+    # override, ignore-delete, partial-update.remove-record-on-*), we
+    # must raise rather than silently run the simple last-non-null merge
+    # -- otherwise we'd reproduce the same silent-corruption pattern this
+    # PR exists to close.
+
+    def _assert_partial_update_unsupported(self, table_name, extra_options,
+                                           expected_keys):
+        """Create a partial-update table with ``extra_options``, write two
+        overlapping batches and assert the read raises
+        ``NotImplementedError`` naming every key in ``expected_keys``."""
+        table = self._create_pk_table(
+            table_name, extra_options=extra_options)
+        self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}])
+        self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}])
+
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        with self.assertRaises(NotImplementedError) as cm:
+            rb.new_read().to_arrow(splits)
+        msg = str(cm.exception)
+        self.assertIn("partial-update", msg)
+        for key in expected_keys:
+            self.assertIn(
+                key, msg,
+                "expected option key '{}' in error: {}".format(key, msg))
+
+    def test_partial_update_with_sequence_group_raises(self):
+        self._assert_partial_update_unsupported(
+            'pu_seq_group',
+            {'fields.b.sequence-group': 'a'},
+            ['fields.b.sequence-group'],
+        )
+
+    def test_partial_update_with_field_aggregate_function_raises(self):
+        self._assert_partial_update_unsupported(
+            'pu_field_agg',
+            {'fields.a.aggregate-function': 'last_non_null_value'},
+            ['fields.a.aggregate-function'],
+        )
+
+    def test_partial_update_with_default_aggregate_function_raises(self):
+        self._assert_partial_update_unsupported(
+            'pu_default_agg',
+            {'fields.default-aggregate-function': 'last_non_null_value'},
+            ['fields.default-aggregate-function'],
+        )
+
+    def test_partial_update_with_ignore_delete_raises(self):
+        self._assert_partial_update_unsupported(
+            'pu_ignore_delete',
+            {'ignore-delete': 'true'},
+            ['ignore-delete'],
+        )
+
+    def test_partial_update_with_prefixed_ignore_delete_keys_raise(self):
+        """The guard also flags the engine-prefixed ignore-delete keys,
+        not just the bare ``ignore-delete`` option."""
+        prefixed_keys = (
+            'partial-update.ignore-delete',
+            'first-row.ignore-delete',
+            'deduplicate.ignore-delete',
+        )
+        for i, key in enumerate(prefixed_keys):
+            with self.subTest(option=key):
+                self._assert_partial_update_unsupported(
+                    'pu_ignore_delete_prefixed_{}'.format(i),
+                    {key: 'true'},
+                    [key],
+                )
+
+    def test_partial_update_with_remove_record_on_delete_raises(self):
+        self._assert_partial_update_unsupported(
+            'pu_rrod',
+            {'partial-update.remove-record-on-delete': 'true'},
+            ['partial-update.remove-record-on-delete'],
+        )
+
+    def test_partial_update_with_remove_record_on_sequence_group_raises(self):
+        self._assert_partial_update_unsupported(
+            'pu_rrosg',
+            {'partial-update.remove-record-on-sequence-group': 'true'},
+            ['partial-update.remove-record-on-sequence-group'],
+        )
+
+    def test_partial_update_unsupported_options_guard_covers_raw_convertible(self):  # noqa: E501
+        """The unsupported-options guard must fire even when the scan
+        would dispatch every split through ``RawFileSplitRead`` (i.e. a
+        single-snapshot table where rows don't overlap).
+
+        Before the guard moved to ``TableRead.__init__`` this case
+        silently bypassed validation because raw-convertible splits skip
+        ``MergeFileSplitRead`` entirely -- and an option like
+        ``partial-update.remove-record-on-delete`` would be ignored on
+        the read path while the user assumed it was honoured.
+        """
+        table = self._create_pk_table(
+            'pu_rrod_raw_convertible',
+            extra_options={'partial-update.remove-record-on-delete': 'true'},
+        )
+        # Single write -> single snapshot -> splits are raw-convertible.
+        self._write(table, [
+            {'id': 1, 'a': 'A', 'b': None, 'c': None},
+            {'id': 2, 'a': 'B', 'b': None, 'c': None},
+        ])
+        rb = table.new_read_builder()
+        with self.assertRaises(NotImplementedError) as cm:
+            rb.new_read()
+        msg = str(cm.exception)
+        self.assertIn('partial-update', msg)
+        self.assertIn('partial-update.remove-record-on-delete', msg)
+
+    def test_partial_update_with_explicit_ignore_delete_false_does_not_raise(self):  # noqa: E501
+        """Explicitly setting ignore-delete=false is equivalent to leaving
+        it unset and must not trip the guard."""
+        table = self._create_pk_table(
+            'pu_ignore_delete_false',
+            extra_options={'ignore-delete': 'false'},
+        )
+        self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}])
+        self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}],
+        )
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_partial_update_merge_function.py
b/paimon-python/pypaimon/tests/test_partial_update_merge_function.py
new file mode 100644
index 0000000000..60dfc7198d
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_partial_update_merge_function.py
@@ -0,0 +1,226 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Direct unit tests for ``PartialUpdateMergeFunction``.
+
+Drives the merge function with synthetic ``KeyValue`` instances so the
+contract is pinned down without going through the full read pipeline.
+The end-to-end behaviour on real PK tables is exercised separately in
+``test_partial_update_e2e.py``.
+"""
+
+import unittest
+
+from pypaimon.read.reader.partial_update_merge_function import \
+ PartialUpdateMergeFunction
+from pypaimon.table.row.key_value import KeyValue
+from pypaimon.table.row.row_kind import RowKind
+
+
+def _kv(key, seq, row_kind, value):
+    """Return a new ``KeyValue`` holding one synthetic input row.
+
+    ``key`` and ``value`` are tuples of primitives. The helper owns the
+    physical field layout ``(key..., seq, row_kind byte, value...)`` so each
+    test only has to spell out the logical row it wants to merge.
+    """
+    kv = KeyValue(key_arity=len(key), value_arity=len(value))
+    kv.replace((*key, seq, row_kind.value, *value))
+    return kv
+
+
+def _result_value(kv):
+    """Return the value fields of a ``get_result()`` KeyValue as a tuple."""
+    return tuple(map(kv.value.get_field, range(kv.value_arity)))
+
+
+def _result_key(kv):
+    """Return the key fields of a ``get_result()`` KeyValue as a tuple."""
+    return tuple(map(kv.key.get_field, range(kv.key_arity)))
+
+
+class PartialUpdateMergeFunctionTest(unittest.TestCase):
+    """Unit tests pinning the ``PartialUpdateMergeFunction`` contract.
+
+    Most tests build a merge function with ``key_arity=1``, ``reset()`` it,
+    feed synthetic rows built by ``_kv`` through ``add()`` and inspect
+    ``get_result()`` via ``_result_key`` / ``_result_value``.
+    """
+
+    def test_single_insert_returns_value(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x')))
+        result = mf.get_result()
+
+        self.assertIsNotNone(result)
+        self.assertEqual(_result_key(result), (1,))
+        self.assertEqual(_result_value(result), ('a', 'x'))
+        self.assertEqual(result.sequence_number, 100)
+        self.assertEqual(result.value_row_kind_byte, RowKind.INSERT.value)
+
+    def test_second_insert_overwrites_non_null(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None)))
+        mf.add(_kv((1,), 101, RowKind.INSERT, ('b', None)))
+
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), ('b', None))
+        # Sequence number tracks the latest add().
+        self.assertEqual(result.sequence_number, 101)
+
+    def test_second_insert_fills_in_null(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None)))
+        mf.add(_kv((1,), 101, RowKind.INSERT, (None, 'x')))
+
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), ('a', 'x'))
+
+    def test_third_insert_continues_merge(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=3)
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None, None)))
+        mf.add(_kv((1,), 101, RowKind.INSERT, (None, 'b', None)))
+        mf.add(_kv((1,), 102, RowKind.INSERT, (None, None, 'c')))
+
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), ('a', 'b', 'c'))
+
+    def test_later_null_does_not_clobber_earlier_value(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x')))
+        mf.add(_kv((1,), 101, RowKind.INSERT, (None, None)))
+
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), ('a', 'x'))
+
+    def test_reset_between_keys(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x')))
+        first = mf.get_result()
+        self.assertEqual(_result_key(first), (1,))
+        self.assertEqual(_result_value(first), ('a', 'x'))
+
+        mf.reset()
+        mf.add(_kv((2,), 200, RowKind.INSERT, ('b', 'y')))
+        second = mf.get_result()
+        self.assertEqual(_result_key(second), (2,))
+        self.assertEqual(_result_value(second), ('b', 'y'))
+
+    def test_get_result_before_any_add_returns_none(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        self.assertIsNone(mf.get_result())
+
+    def test_update_after_is_treated_as_insert(self):
+        # Java's PartialUpdate accepts UPDATE_AFTER alongside INSERT in
+        # non-sequence-group mode (both are "add" kinds). Mirror that.
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None)))
+        mf.add(_kv((1,), 101, RowKind.UPDATE_AFTER, (None, 'x')))
+
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), ('a', 'x'))
+
+    def test_delete_row_raises_not_implemented(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x')))
+        with self.assertRaises(NotImplementedError) as cm:
+            mf.add(_kv((1,), 101, RowKind.DELETE, (None, None)))
+        self.assertIn('DELETE', str(cm.exception))
+        self.assertIn('ignore-delete', str(cm.exception))
+
+    def test_update_before_row_raises_not_implemented(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        with self.assertRaises(NotImplementedError) as cm:
+            mf.add(_kv((1,), 100, RowKind.UPDATE_BEFORE, (None, None)))
+        self.assertIn('UPDATE_BEFORE', str(cm.exception))
+
+    def test_result_is_decoupled_from_input_kv(self):
+        """The merge function must build a fresh result tuple — upstream
+        readers reuse a single KeyValue instance and call ``replace`` on
+        each iteration, so holding a reference to the input is unsafe.
+        """
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        kv = _kv((1,), 100, RowKind.INSERT, ('a', 'x'))
+        mf.add(kv)
+        result = mf.get_result()
+
+        # Mutate the input's underlying tuple to simulate a reused
+        # KeyValue being rebound to a different row.
+        kv.replace((999, 999, RowKind.INSERT.value, 'evil', 'evil'))
+
+        # The previously-returned result must NOT be affected. The rebinding
+        # tuple also overwrites the sequence slot (999), so pin that too:
+        # checking key/value alone would miss a result that still reads its
+        # sequence number from the input KeyValue.
+        self.assertEqual(_result_key(result), (1,))
+        self.assertEqual(_result_value(result), ('a', 'x'))
+        self.assertEqual(result.sequence_number, 100)
+
+    # -- NOT-NULL input validation (mirrors Java's updateNonNullFields) ----
+
+    def test_first_insert_with_null_for_not_null_field_raises(self):
+        """If the very first row writes null to a NOT NULL field, raise —
+        same input-validation Java does in updateNonNullFields()."""
+        mf = PartialUpdateMergeFunction(
+            key_arity=1, value_arity=2, nullables=[True, False])
+        mf.reset()
+        with self.assertRaises(ValueError) as cm:
+            mf.add(_kv((1,), 1, RowKind.INSERT, ('a', None)))
+        self.assertIn("Field 1", str(cm.exception))
+
+    def test_subsequent_insert_with_null_for_not_null_field_raises(self):
+        """A later null on a NOT NULL field must also raise — Java checks
+        on every add(), not just the first one."""
+        mf = PartialUpdateMergeFunction(
+            key_arity=1, value_arity=2, nullables=[True, False])
+        mf.reset()
+        mf.add(_kv((1,), 1, RowKind.INSERT, ('a', 'x')))
+        with self.assertRaises(ValueError) as cm:
+            mf.add(_kv((1,), 2, RowKind.INSERT, (None, None)))
+        self.assertIn("Field 1", str(cm.exception))
+
+    def test_null_for_nullable_field_is_absorbed(self):
+        """A null input on a nullable field is silently absorbed (existing
+        accumulator value wins) — the standard partial-update semantic."""
+        mf = PartialUpdateMergeFunction(
+            key_arity=1, value_arity=2, nullables=[True, True])
+        mf.reset()
+        mf.add(_kv((1,), 1, RowKind.INSERT, ('a', 'x')))
+        mf.add(_kv((1,), 2, RowKind.INSERT, (None, 'y')))
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), ('a', 'y'))
+
+    def test_nullables_length_mismatch_raises(self):
+        with self.assertRaises(ValueError):
+            PartialUpdateMergeFunction(
+                key_arity=1, value_arity=2, nullables=[True])
+
+    def test_no_nullables_arg_skips_check(self):
+        """Backward-compat: callers that don't pass ``nullables`` get the
+        previous behaviour (no NOT-NULL validation)."""
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        # Would have raised had we declared the second field NOT NULL.
+        mf.add(_kv((1,), 1, RowKind.INSERT, ('a', None)))
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), ('a', None))
+
+
+if __name__ == '__main__':
+    # Run this module's test cases when it is executed directly as a script.
+    unittest.main()