This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f58ede9a6c [python] Implement partial-update merge engine in pypaimon 
(#7745)
f58ede9a6c is described below

commit f58ede9a6c00dca569db30323e9511b14c0a30d3
Author: chaoyang <[email protected]>
AuthorDate: Sat May 23 22:58:01 2026 +0800

    [python] Implement partial-update merge engine in pypaimon (#7745)
---
 .../pypaimon/read/merge_engine_support.py          | 114 +++++++
 .../read/reader/partial_update_merge_function.py   | 134 +++++++++
 .../pypaimon/read/reader/sort_merge_reader.py      |  10 +-
 paimon-python/pypaimon/read/split_read.py          |  40 ++-
 paimon-python/pypaimon/read/table_read.py          |   8 +
 .../pypaimon/tests/test_partial_update_e2e.py      | 333 +++++++++++++++++++++
 .../tests/test_partial_update_merge_function.py    | 226 ++++++++++++++
 7 files changed, 860 insertions(+), 5 deletions(-)

diff --git a/paimon-python/pypaimon/read/merge_engine_support.py 
b/paimon-python/pypaimon/read/merge_engine_support.py
new file mode 100644
index 0000000000..d54cd8b0e4
--- /dev/null
+++ b/paimon-python/pypaimon/read/merge_engine_support.py
@@ -0,0 +1,114 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Table-level validation for the configured ``merge-engine`` option.
+
+Lives outside any specific ``SplitRead`` because the table's
+merge-engine configuration is a property of the whole read, not of one
+split. ``TableRead`` may dispatch the same logical scan across multiple
+``SplitRead`` implementations (e.g. ``MergeFileSplitRead`` for splits
+that need k-way merge, ``RawFileSplitRead`` for raw-convertible splits
+where keys don't overlap). The unsupported-engine and
+unsupported-options checks need to fire regardless of which dispatch
+branch is picked — otherwise a single fresh snapshot whose splits are
+all raw-convertible would silently bypass the guard and produce wrong
+results when the table configures, e.g.,
+``partial-update.remove-record-on-delete=true``.
+"""
+
+from typing import Set
+
+from pypaimon.common.options.core_options import MergeEngine
+
+# Boolean-valued options that, when truthy, opt the table into behaviour
+# the Python ``PartialUpdateMergeFunction`` does not implement.
+_PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS = (
+    "ignore-delete",
+    "partial-update.ignore-delete",
+    "first-row.ignore-delete",
+    "deduplicate.ignore-delete",
+    "partial-update.remove-record-on-delete",
+    "partial-update.remove-record-on-sequence-group",
+)
+_FIELDS_PREFIX = "fields."
+_FIELD_SEQUENCE_GROUP_SUFFIX = ".sequence-group"
+_FIELD_AGGREGATE_FUNCTION_SUFFIX = ".aggregate-function"
+_DEFAULT_AGGREGATE_FUNCTION_KEY = "fields.default-aggregate-function"
+
+
+def check_supported(table) -> None:
+    """Raise ``NotImplementedError`` if the table's merge-engine
+    configuration is outside what pypaimon's read path implements.
+
+    Non-PK tables are always fine (no merge function involved).
+    """
+    if not table.is_primary_key_table:
+        return
+    engine = table.options.merge_engine()
+    if engine == MergeEngine.DEDUPLICATE:
+        return
+    if engine == MergeEngine.PARTIAL_UPDATE:
+        unsupported = partial_update_unsupported_options(table)
+        if unsupported:
+            raise NotImplementedError(
+                "merge-engine 'partial-update' is enabled together with "
+                "options that pypaimon does not yet implement: {}. The "
+                "supported subset is per-key last-non-null merge with "
+                "no sequence-group, no per-field aggregator override, "
+                "no ignore-delete and no partial-update.remove-record-"
+                "on-* flags. Use the Java client for the full feature "
+                "set, or open an issue to track Python support.".format(
+                    ", ".join(sorted(unsupported))
+                )
+            )
+        return
+    raise NotImplementedError(
+        "merge-engine '{}' is not implemented in pypaimon yet "
+        "(supported: deduplicate, partial-update). Use the Java "
+        "client or open an issue to track support.".format(engine.value)
+    )
+
+
+def partial_update_unsupported_options(table) -> Set[str]:
+    """Return the set of option keys configured on this table that
+    ``PartialUpdateMergeFunction`` does not yet support. Empty set
+    means the simple last-non-null merge is safe to run.
+    """
+    flagged: Set[str] = set()
+    raw = table.options.options.to_map()
+    for key, value in raw.items():
+        if (key in _PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS
+                and _option_is_truthy(value)):
+            flagged.add(key)
+        elif key == _DEFAULT_AGGREGATE_FUNCTION_KEY:
+            flagged.add(key)
+        elif key.startswith(_FIELDS_PREFIX) and (
+                key.endswith(_FIELD_SEQUENCE_GROUP_SUFFIX)
+                or key.endswith(_FIELD_AGGREGATE_FUNCTION_SUFFIX)):
+            flagged.add(key)
+    return flagged
+
+
+def _option_is_truthy(raw) -> bool:
+    if raw is None:
+        return False
+    if isinstance(raw, bool):
+        return raw
+    if isinstance(raw, str):
+        return raw.strip().lower() in ("true", "1", "yes", "on")
+    return bool(raw)
diff --git 
a/paimon-python/pypaimon/read/reader/partial_update_merge_function.py 
b/paimon-python/pypaimon/read/reader/partial_update_merge_function.py
new file mode 100644
index 0000000000..978b48011c
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/partial_update_merge_function.py
@@ -0,0 +1,134 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""
+Python port of Java's ``PartialUpdateMergeFunction``
+(``paimon-core/src/main/java/org/apache/paimon/mergetree/compact/
+PartialUpdateMergeFunction.java``).
+
+The merge function used by the ``partial-update`` merge engine on PK
+tables: rows sharing a primary key are merged left-to-right, taking the
+latest non-null value per non-PK field. ``DeduplicateMergeFunction``
+keeps only the latest row; ``PartialUpdateMergeFunction`` instead lets
+later writes "fill in" fields the earlier writes left null, so users
+can write the same logical record across multiple commits with
+different sets of non-null columns.
+
+This is the **core merge semantics only**. The Java implementation also
+supports per-field aggregator overrides (``fields.<name>.aggregate-
+function``), sequence groups (``fields.<name>.sequence-group``),
+``ignore-delete``, and ``partial-update.remove-record-on-*`` options.
+None of those are implemented yet; non-INSERT row kinds raise
+``NotImplementedError`` at ``add`` time so we never silently corrupt
+data with a half-implemented contract.
+"""
+
+from typing import Any, List, Optional
+
+from pypaimon.table.row.key_value import KeyValue
+from pypaimon.table.row.row_kind import RowKind
+
+
+class PartialUpdateMergeFunction:
+    """A MergeFunction where the key is the primary key (unique) and the
+    value is merged across all rows for that key by taking the latest
+    non-null value per non-PK field.
+
+    Mirrors the ``MergeFunction`` protocol used by ``SortMergeReader``:
+    ``reset`` (between groups of same-key rows), ``add`` (one row at a
+    time, oldest to newest), ``get_result`` (after the group is
+    exhausted).
+    """
+
+    def __init__(self, key_arity: int, value_arity: int,
+                 nullables: Optional[List[bool]] = None):
+        self._key_arity = key_arity
+        self._value_arity = value_arity
+        # Per-value-field nullable flags, parallel to value indices. When
+        # ``None``, no nullability check runs (preserves the contract for
+        # direct callers that don't have schema info handy). When given,
+        # mirrors Java's ``updateNonNullFields`` check: a null input on a
+        # NOT NULL field raises rather than being silently absorbed.
+        if nullables is not None and len(nullables) != value_arity:
+            raise ValueError(
+                "nullables length {} does not match value_arity {}".format(
+                    len(nullables), value_arity))
+        self._nullables = nullables
+        # Lazily allocated on first add(); ``None`` means "no rows yet".
+        self._accumulator: Optional[List[Any]] = None
+        # Reference to the most recently added kv. We use it only to
+        # propagate the key + sequence_number into the result row, and we
+        # snapshot those two values into a fresh tuple in ``get_result()``
+        # so the result is not aliased to upstream's reused KeyValue.
+        self._latest_kv: Optional[KeyValue] = None
+
+    def reset(self) -> None:
+        self._accumulator = None
+        self._latest_kv = None
+
+    def add(self, kv: KeyValue) -> None:
+        row_kind_byte = kv.value_row_kind_byte
+        if not RowKind.is_add_byte(row_kind_byte):
+            # DELETE / UPDATE_BEFORE need ignore-delete or
+            # partial-update.remove-record-on-delete to be set in Java;
+            # neither option is wired up in pypaimon yet, so refuse the
+            # row rather than silently swallow it.
+            raise NotImplementedError(
+                "PartialUpdateMergeFunction received a {} row; this "
+                "Python port does not yet implement the ignore-delete / "
+                "partial-update.remove-record-on-delete options. Use the "
+                "Java client for tables that produce DELETE / "
+                "UPDATE_BEFORE 
rows.".format(RowKind(row_kind_byte).to_string())
+            )
+
+        # Mirror Java's reset() + updateNonNullFields(): the accumulator
+        # starts as all-null (equivalent to ``new GenericRow(arity)``) and
+        # each add() writes non-null inputs; null inputs are absorbed —
+        # except when the schema marks the field NOT NULL, in which case
+        # we raise to match Java's IllegalArgumentException check.
+        if self._accumulator is None:
+            self._accumulator = [None] * self._value_arity
+        for i in range(self._value_arity):
+            v = kv.value.get_field(i)
+            if v is not None:
+                self._accumulator[i] = v
+            elif self._nullables is not None and not self._nullables[i]:
+                raise ValueError("Field {} can not be null".format(i))
+        self._latest_kv = kv
+
+    def get_result(self) -> Optional[KeyValue]:
+        if self._accumulator is None or self._latest_kv is None:
+            return None
+
+        kv = self._latest_kv
+        # Snapshot the key as a fresh tuple — we cannot keep a reference
+        # to ``kv`` because upstream readers (e.g. KeyValueWrapReader)
+        # reuse a single KeyValue instance and mutate its underlying
+        # row_tuple between calls. Building a fresh tuple here means the
+        # result we return is decoupled from any subsequent iteration.
+        key_values = tuple(
+            kv.key.get_field(i) for i in range(self._key_arity)
+        )
+        result_row = key_values + (
+            kv.sequence_number,
+            RowKind.INSERT.value,
+        ) + tuple(self._accumulator)
+
+        result = KeyValue(self._key_arity, self._value_arity)
+        result.replace(result_row)
+        return result
diff --git a/paimon-python/pypaimon/read/reader/sort_merge_reader.py 
b/paimon-python/pypaimon/read/reader/sort_merge_reader.py
index 764b722830..56f42b6f3c 100644
--- a/paimon-python/pypaimon/read/reader/sort_merge_reader.py
+++ b/paimon-python/pypaimon/read/reader/sort_merge_reader.py
@@ -29,9 +29,15 @@ from pypaimon.table.row.key_value import KeyValue
 class SortMergeReaderWithMinHeap(RecordReader):
     """SortMergeReader implemented with min-heap."""
 
-    def __init__(self, readers: List[RecordReader[KeyValue]], schema: 
TableSchema):
+    def __init__(self, readers: List[RecordReader[KeyValue]], schema: 
TableSchema,
+                 merge_function: Optional[Any] = None):
         self.next_batch_readers = list(readers)
-        self.merge_function = DeduplicateMergeFunction()
+        # Default to dedupe so callers that don't pass a merge_function
+        # keep their old behaviour. The merge engine dispatch lives in
+        # ``MergeFileSplitRead.section_reader_supplier`` for the read
+        # path; tests or other ad-hoc callers can pass a different
+        # implementation here.
+        self.merge_function = merge_function if merge_function is not None 
else DeduplicateMergeFunction()
 
         if schema.partition_keys:
             trimmed_primary_keys = [pk for pk in schema.primary_keys if pk not 
in schema.partition_keys]
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index 8a203c9f4c..2f233af743 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -20,7 +20,7 @@ from abc import ABC, abstractmethod
 from functools import partial
 from typing import Callable, Dict, List, Optional, Tuple
 
-from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.common.options.core_options import CoreOptions, MergeEngine
 from pypaimon.common.predicate import Predicate
 from pypaimon.deletionvectors import ApplyDeletionVectorReader
 from pypaimon.deletionvectors.deletion_vector import DeletionVector
@@ -53,7 +53,10 @@ from pypaimon.read.reader.key_value_unwrap_reader import \
     KeyValueUnwrapRecordReader
 from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
 from pypaimon.read.reader.shard_batch_reader import ShardBatchReader
-from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap
+from pypaimon.read.reader.partial_update_merge_function import \
+    PartialUpdateMergeFunction
+from pypaimon.read.reader.sort_merge_reader import (DeduplicateMergeFunction,
+                                                    SortMergeReaderWithMinHeap)
 from pypaimon.read.push_down_utils import _get_all_fields
 from pypaimon.read.split import Split
 from pypaimon.read.sliced_split import SlicedSplit
@@ -99,6 +102,10 @@ class SplitRead(ABC):
         self.row_tracking_enabled = row_tracking_enabled
         self.value_arity = len(read_type)
         self.nested_name_paths = nested_name_paths
+        # Snapshot the raw value-side schema before _create_key_value_fields
+        # wraps it, so MergeFileSplitRead can hand per-value-field nullable
+        # flags to merge functions that mirror Java's NOT-NULL check.
+        self.value_fields = list(read_type)
 
         self.trimmed_primary_key = self.table.trimmed_primary_keys
         self.read_fields = read_type
@@ -611,7 +618,34 @@ class MergeFileSplitRead(SplitRead):
                 supplier = partial(self.kv_reader_supplier, file, 
self.deletion_file_readers.get(file.file_name, None))
                 data_readers.append(supplier)
             readers.append(ConcatRecordReader(data_readers))
-        return SortMergeReaderWithMinHeap(readers, self.table.table_schema)
+        merge_function = self._build_merge_function()
+        return SortMergeReaderWithMinHeap(
+            readers, self.table.table_schema, merge_function=merge_function)
+
+    def _build_merge_function(self):
+        """Pick the right MergeFunction implementation for the table's
+        ``merge-engine`` option.
+
+        The pre-flight checks that reject unsupported engines or option
+        combinations live in
+        :func:`pypaimon.read.merge_engine_support.check_supported` and
+        run at ``TableRead.__init__`` time, so by the point this method
+        executes only the supported engines are reachable.
+        """
+        engine = self.table.options.merge_engine()
+        if engine == MergeEngine.DEDUPLICATE:
+            return DeduplicateMergeFunction()
+        if engine == MergeEngine.PARTIAL_UPDATE:
+            return PartialUpdateMergeFunction(
+                key_arity=len(self.trimmed_primary_key),
+                value_arity=self.value_arity,
+                nullables=[f.type.nullable for f in self.value_fields],
+            )
+        # check_supported() rejects everything else at TableRead.__init__.
+        raise AssertionError(
+            "unreachable: merge-engine '{}' should have been rejected by "
+            "merge_engine_support.check_supported".format(engine.value)
+        )
 
     def create_reader(self) -> RecordReader:
         # Create a dict mapping data file name to deletion file reader method
diff --git a/paimon-python/pypaimon/read/table_read.py 
b/paimon-python/pypaimon/read/table_read.py
index b3a8edaf63..52a4eaaa7f 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -81,8 +81,16 @@ class TableRead:
         nested_name_paths: Optional[List[List[str]]] = None,
         limit: Optional[int] = None,
     ):
+        from pypaimon.read.merge_engine_support import check_supported
         from pypaimon.table.file_store_table import FileStoreTable
 
+        # Validate merge-engine support before any split-level dispatch.
+        # Raw-convertible splits skip MergeFileSplitRead, so this guard
+        # has to live at the read-builder level — otherwise unsupported
+        # options (e.g. partial-update.remove-record-on-delete) get
+        # silently ignored on fresh single-snapshot tables.
+        check_supported(table)
+
         self.table: FileStoreTable = table
         self.predicate = predicate
         self.read_type = read_type
diff --git a/paimon-python/pypaimon/tests/test_partial_update_e2e.py 
b/paimon-python/pypaimon/tests/test_partial_update_e2e.py
new file mode 100644
index 0000000000..af5f61e98f
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_partial_update_e2e.py
@@ -0,0 +1,333 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""End-to-end tests for the ``partial-update`` merge engine.
+
+Each test creates a PK table with ``merge-engine`` set to a particular
+value, writes one or more batches, and reads back. Partial-update reads
+must merge non-null fields across batches; ``deduplicate`` must keep
+the latest row only; ``aggregation`` and ``first-row`` must raise
+``NotImplementedError`` (until they are ported), since silently
+treating them as deduplicate would corrupt the user's data.
+"""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+
+
+class PartialUpdateMergeEngineE2ETest(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', True)
+
+        cls.pa_schema = pa.schema([
+            pa.field('id', pa.int64(), nullable=False),
+            ('a', pa.string()),
+            ('b', pa.string()),
+            ('c', pa.string()),
+        ])
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def _create_pk_table(self, table_name, merge_engine='partial-update',
+                         extra_options=None):
+        # bucket=1 so all rows for any PK land in the same bucket; this is
+        # what forces the read path through SortMergeReader instead of the
+        # raw_convertible / single-file fast path. partial-update merging
+        # only happens inside SortMergeReader.
+        options = {
+            'bucket': '1',
+            'merge-engine': merge_engine,
+        }
+        if extra_options:
+            options.update(extra_options)
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            primary_keys=['id'],
+            options=options,
+        )
+        full = 'default.{}'.format(table_name)
+        self.catalog.create_table(full, schema, False)
+        return self.catalog.get_table(full)
+
+    def _write(self, table, rows):
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        c = wb.new_commit()
+        try:
+            w.write_arrow(pa.Table.from_pylist(rows, schema=self.pa_schema))
+            c.commit(w.prepare_commit())
+        finally:
+            w.close()
+            c.close()
+
+    def _read(self, table):
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        if not splits:
+            return []
+        return sorted(
+            rb.new_read().to_arrow(splits).to_pylist(),
+            key=lambda r: r['id'],
+        )
+
+    # -- partial-update happy path ---------------------------------------
+
+    def test_partial_update_two_writes_merges_non_null(self):
+        """Two writes against the same PK with disjoint non-null columns
+        must merge into a single row that has both columns populated.
+        """
+        table = self._create_pk_table('two_writes')
+        self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}])
+        self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}],
+        )
+
+    def test_partial_update_three_writes_merges_left_to_right(self):
+        """Three overlapping writes — each filling in a different column —
+        compose into the union of non-null fields.
+        """
+        table = self._create_pk_table('three_writes')
+        self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}])
+        self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}])
+        self._write(table, [{'id': 1, 'a': None, 'b': None, 'c': 'C'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}],
+        )
+
+    def test_partial_update_disjoint_keys_unaffected(self):
+        """Three rows with disjoint PKs must all appear unchanged in the
+        output — partial-update only merges rows that share a PK.
+        """
+        table = self._create_pk_table('disjoint_keys')
+        self._write(table, [
+            {'id': 1, 'a': 'A1', 'b': None, 'c': None},
+            {'id': 2, 'a': None, 'b': 'B2', 'c': None},
+            {'id': 3, 'a': None, 'b': None, 'c': 'C3'},
+        ])
+
+        self.assertEqual(
+            self._read(table),
+            [
+                {'id': 1, 'a': 'A1', 'b': None, 'c': None},
+                {'id': 2, 'a': None, 'b': 'B2', 'c': None},
+                {'id': 3, 'a': None, 'b': None, 'c': 'C3'},
+            ],
+        )
+
+    def test_partial_update_later_value_wins_over_earlier_non_null(self):
+        """When two writes both supply a non-null value for the same
+        column, the later value wins (latest non-null per field).
+        """
+        table = self._create_pk_table('later_wins')
+        self._write(table, [{'id': 1, 'a': 'old', 'b': 'keep', 'c': None}])
+        self._write(table, [{'id': 1, 'a': 'new', 'b': None, 'c': 'fill'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'new', 'b': 'keep', 'c': 'fill'}],
+        )
+
+    def test_partial_update_later_null_does_not_clobber_earlier_value(self):
+        """A later write with NULL for a column does NOT overwrite an
+        earlier non-null value for that column.
+        """
+        table = self._create_pk_table('null_no_clobber')
+        self._write(table, [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}])
+        self._write(table, [{'id': 1, 'a': None, 'b': None, 'c': None}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}],
+        )
+
+    # -- deduplicate (regression) ----------------------------------------
+
+    def test_deduplicate_engine_unchanged(self):
+        """The default ``deduplicate`` engine must keep the latest row
+        intact, including its NULLs — exactly the pre-PR behaviour.
+        """
+        table = self._create_pk_table('dedupe', merge_engine='deduplicate')
+        self._write(table, [{'id': 1, 'a': 'old', 'b': 'old-b', 'c': 'old-c'}])
+        self._write(table, [{'id': 1, 'a': 'new', 'b': None, 'c': None}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'new', 'b': None, 'c': None}],
+        )
+
+    # -- engines we don't support yet ------------------------------------
+
+    def test_aggregation_engine_raises_not_implemented(self):
+        """Until ``aggregation`` is ported, reading an aggregation table
+        must raise rather than silently produce dedupe results."""
+        table = self._create_pk_table('agg_unsupported',
+                                      merge_engine='aggregation')
+        self._write(table, [{'id': 1, 'a': 'x', 'b': None, 'c': None}])
+        self._write(table, [{'id': 1, 'a': 'y', 'b': None, 'c': None}])
+
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        with self.assertRaises(NotImplementedError) as cm:
+            rb.new_read().to_arrow(splits)
+        self.assertIn('aggregation', str(cm.exception))
+
+    def test_first_row_engine_raises_not_implemented(self):
+        """Until ``first-row`` is ported, reading a first-row table must
+        raise rather than silently produce dedupe results."""
+        table = self._create_pk_table('first_row_unsupported',
+                                      merge_engine='first-row')
+        self._write(table, [{'id': 1, 'a': 'x', 'b': None, 'c': None}])
+        self._write(table, [{'id': 1, 'a': 'y', 'b': None, 'c': None}])
+
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        with self.assertRaises(NotImplementedError) as cm:
+            rb.new_read().to_arrow(splits)
+        self.assertIn('first-row', str(cm.exception))
+
+    # -- partial-update + out-of-scope option combinations ---------------
+    #
+    # When a user pairs ``merge-engine: partial-update`` with any option
+    # this port doesn't implement (sequence-group, per-field aggregator
+    # override, ignore-delete, partial-update.remove-record-on-*), we
+    # must raise rather than silently run the simple last-non-null merge
+    # — otherwise we'd reproduce the same silent-corruption pattern this
+    # PR exists to close.
+
+    def _assert_partial_update_unsupported(self, table_name, extra_options,
+                                           expected_keys):
+        table = self._create_pk_table(
+            table_name, extra_options=extra_options)
+        self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}])
+        self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}])
+
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        with self.assertRaises(NotImplementedError) as cm:
+            rb.new_read().to_arrow(splits)
+        msg = str(cm.exception)
+        self.assertIn("partial-update", msg)
+        for key in expected_keys:
+            self.assertIn(key, msg,
+                          "expected option key '{}' in error: {}".format(key, 
msg))
+
+    def test_partial_update_with_sequence_group_raises(self):
+        self._assert_partial_update_unsupported(
+            'pu_seq_group',
+            {'fields.b.sequence-group': 'a'},
+            ['fields.b.sequence-group'],
+        )
+
+    def test_partial_update_with_field_aggregate_function_raises(self):
+        self._assert_partial_update_unsupported(
+            'pu_field_agg',
+            {'fields.a.aggregate-function': 'last_non_null_value'},
+            ['fields.a.aggregate-function'],
+        )
+
+    def test_partial_update_with_default_aggregate_function_raises(self):
+        self._assert_partial_update_unsupported(
+            'pu_default_agg',
+            {'fields.default-aggregate-function': 'last_non_null_value'},
+            ['fields.default-aggregate-function'],
+        )
+
+    def test_partial_update_with_ignore_delete_raises(self):
+        self._assert_partial_update_unsupported(
+            'pu_ignore_delete',
+            {'ignore-delete': 'true'},
+            ['ignore-delete'],
+        )
+
+    def test_partial_update_with_remove_record_on_delete_raises(self):
+        self._assert_partial_update_unsupported(
+            'pu_rrod',
+            {'partial-update.remove-record-on-delete': 'true'},
+            ['partial-update.remove-record-on-delete'],
+        )
+
+    def test_partial_update_with_remove_record_on_sequence_group_raises(self):
+        self._assert_partial_update_unsupported(
+            'pu_rrosg',
+            {'partial-update.remove-record-on-sequence-group': 'true'},
+            ['partial-update.remove-record-on-sequence-group'],
+        )
+
+    def 
test_partial_update_unsupported_options_guard_covers_raw_convertible(self):
+        """The unsupported-options guard must fire even when the scan
+        would dispatch every split through ``RawFileSplitRead`` (i.e. a
+        single-snapshot table where rows don't overlap).
+
+        Before the guard moved to ``TableRead.__init__`` this case
+        silently bypassed validation because raw-convertible splits skip
+        ``MergeFileSplitRead`` entirely — and an option like
+        ``partial-update.remove-record-on-delete`` would be ignored on
+        the read path while the user assumed it was honoured.
+        """
+        table = self._create_pk_table(
+            'pu_rrod_raw_convertible',
+            extra_options={'partial-update.remove-record-on-delete': 'true'},
+        )
+        # Single write -> single snapshot -> splits are raw-convertible.
+        self._write(table, [
+            {'id': 1, 'a': 'A', 'b': None, 'c': None},
+            {'id': 2, 'a': 'B', 'b': None, 'c': None},
+        ])
+        rb = table.new_read_builder()
+        with self.assertRaises(NotImplementedError) as cm:
+            rb.new_read()
+        msg = str(cm.exception)
+        self.assertIn('partial-update', msg)
+        self.assertIn('partial-update.remove-record-on-delete', msg)
+
+    def 
test_partial_update_with_explicit_ignore_delete_false_does_not_raise(self):
+        """Explicitly setting ignore-delete=false is equivalent to leaving
+        it unset and must not trip the guard."""
+        table = self._create_pk_table(
+            'pu_ignore_delete_false',
+            extra_options={'ignore-delete': 'false'},
+        )
+        self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}])
+        self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}],
+        )
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_partial_update_merge_function.py 
b/paimon-python/pypaimon/tests/test_partial_update_merge_function.py
new file mode 100644
index 0000000000..60dfc7198d
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_partial_update_merge_function.py
@@ -0,0 +1,226 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Direct unit tests for ``PartialUpdateMergeFunction``.
+
+Drives the merge function with synthetic ``KeyValue`` instances so the
+contract is pinned down without going through the full read pipeline.
+The end-to-end behaviour on real PK tables is exercised separately in
+``test_partial_update_e2e.py``.
+"""
+
+import unittest
+
+from pypaimon.read.reader.partial_update_merge_function import \
+    PartialUpdateMergeFunction
+from pypaimon.table.row.key_value import KeyValue
+from pypaimon.table.row.row_kind import RowKind
+
+
+def _kv(key, seq, row_kind, value):
+    """Build a fresh KeyValue for a (key, sequence, row_kind, value) tuple.
+
+    ``key`` and ``value`` are tuples of primitives — the helper handles
+    layout (key, seq, row_kind_byte, value) so individual tests can stay
+    focused on the merge semantics.
+    """
+    kv = KeyValue(key_arity=len(key), value_arity=len(value))
+    kv.replace(tuple(key) + (seq, row_kind.value) + tuple(value))
+    return kv
+
+
+def _result_value(kv):
+    """Extract the value tuple out of a KeyValue produced by get_result()."""
+    return tuple(kv.value.get_field(i) for i in range(kv.value_arity))
+
+
+def _result_key(kv):
+    return tuple(kv.key.get_field(i) for i in range(kv.key_arity))
+
+
+class PartialUpdateMergeFunctionTest(unittest.TestCase):
+
+    def test_single_insert_returns_value(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x')))
+        result = mf.get_result()
+
+        self.assertIsNotNone(result)
+        self.assertEqual(_result_key(result), (1,))
+        self.assertEqual(_result_value(result), ('a', 'x'))
+        self.assertEqual(result.sequence_number, 100)
+        self.assertEqual(result.value_row_kind_byte, RowKind.INSERT.value)
+
+    def test_second_insert_overwrites_non_null(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None)))
+        mf.add(_kv((1,), 101, RowKind.INSERT, ('b', None)))
+
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), ('b', None))
+        # Sequence number tracks the latest add().
+        self.assertEqual(result.sequence_number, 101)
+
+    def test_second_insert_fills_in_null(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None)))
+        mf.add(_kv((1,), 101, RowKind.INSERT, (None, 'x')))
+
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), ('a', 'x'))
+
+    def test_third_insert_continues_merge(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=3)
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None, None)))
+        mf.add(_kv((1,), 101, RowKind.INSERT, (None, 'b', None)))
+        mf.add(_kv((1,), 102, RowKind.INSERT, (None, None, 'c')))
+
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), ('a', 'b', 'c'))
+
+    def test_later_null_does_not_clobber_earlier_value(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x')))
+        mf.add(_kv((1,), 101, RowKind.INSERT, (None, None)))
+
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), ('a', 'x'))
+
+    def test_reset_between_keys(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x')))
+        first = mf.get_result()
+        self.assertEqual(_result_key(first), (1,))
+        self.assertEqual(_result_value(first), ('a', 'x'))
+
+        mf.reset()
+        mf.add(_kv((2,), 200, RowKind.INSERT, ('b', 'y')))
+        second = mf.get_result()
+        self.assertEqual(_result_key(second), (2,))
+        self.assertEqual(_result_value(second), ('b', 'y'))
+
+    def test_get_result_before_any_add_returns_none(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        self.assertIsNone(mf.get_result())
+
+    def test_update_after_is_treated_as_insert(self):
+        # Java's PartialUpdate accepts UPDATE_AFTER alongside INSERT in
+        # non-sequence-group mode (both are "add" kinds). Mirror that.
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None)))
+        mf.add(_kv((1,), 101, RowKind.UPDATE_AFTER, (None, 'x')))
+
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), ('a', 'x'))
+
+    def test_delete_row_raises_not_implemented(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, ('a', 'x')))
+        with self.assertRaises(NotImplementedError) as cm:
+            mf.add(_kv((1,), 101, RowKind.DELETE, (None, None)))
+        self.assertIn('DELETE', str(cm.exception))
+        self.assertIn('ignore-delete', str(cm.exception))
+
+    def test_update_before_row_raises_not_implemented(self):
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        with self.assertRaises(NotImplementedError) as cm:
+            mf.add(_kv((1,), 100, RowKind.UPDATE_BEFORE, (None, None)))
+        self.assertIn('UPDATE_BEFORE', str(cm.exception))
+
+    def test_result_is_decoupled_from_input_kv(self):
+        """The merge function must build a fresh result tuple — upstream
+        readers reuse a single KeyValue instance and call ``replace`` on
+        each iteration, so holding a reference to the input is unsafe.
+        """
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        kv = _kv((1,), 100, RowKind.INSERT, ('a', 'x'))
+        mf.add(kv)
+        result = mf.get_result()
+
+        # Mutate the input's underlying tuple to simulate a reused
+        # KeyValue being rebound to a different row.
+        kv.replace((999, 999, RowKind.INSERT.value, 'evil', 'evil'))
+
+        # The previously-returned result must NOT be affected.
+        self.assertEqual(_result_key(result), (1,))
+        self.assertEqual(_result_value(result), ('a', 'x'))
+
+    # -- NOT-NULL input validation (mirrors Java's updateNonNullFields) ----
+
+    def test_first_insert_with_null_for_not_null_field_raises(self):
+        """If the very first row writes null to a NOT NULL field, raise —
+        same input-validation Java does in updateNonNullFields()."""
+        mf = PartialUpdateMergeFunction(
+            key_arity=1, value_arity=2, nullables=[True, False])
+        mf.reset()
+        with self.assertRaises(ValueError) as cm:
+            mf.add(_kv((1,), 1, RowKind.INSERT, ('a', None)))
+        self.assertIn("Field 1", str(cm.exception))
+
+    def test_subsequent_insert_with_null_for_not_null_field_raises(self):
+        """A later null on a NOT NULL field must also raise — Java checks
+        on every add(), not just the first one."""
+        mf = PartialUpdateMergeFunction(
+            key_arity=1, value_arity=2, nullables=[True, False])
+        mf.reset()
+        mf.add(_kv((1,), 1, RowKind.INSERT, ('a', 'x')))
+        with self.assertRaises(ValueError) as cm:
+            mf.add(_kv((1,), 2, RowKind.INSERT, (None, None)))
+        self.assertIn("Field 1", str(cm.exception))
+
+    def test_null_for_nullable_field_is_absorbed(self):
+        """A null input on a nullable field is silently absorbed (existing
+        accumulator value wins) — the standard partial-update semantic."""
+        mf = PartialUpdateMergeFunction(
+            key_arity=1, value_arity=2, nullables=[True, True])
+        mf.reset()
+        mf.add(_kv((1,), 1, RowKind.INSERT, ('a', 'x')))
+        mf.add(_kv((1,), 2, RowKind.INSERT, (None, 'y')))
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), ('a', 'y'))
+
+    def test_nullables_length_mismatch_raises(self):
+        with self.assertRaises(ValueError):
+            PartialUpdateMergeFunction(
+                key_arity=1, value_arity=2, nullables=[True])
+
+    def test_no_nullables_arg_skips_check(self):
+        """Backward-compat: callers that don't pass ``nullables`` get the
+        previous behaviour (no NOT-NULL validation)."""
+        mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
+        mf.reset()
+        # Would have raised had we declared the second field NOT NULL.
+        mf.add(_kv((1,), 1, RowKind.INSERT, ('a', None)))
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), ('a', None))
+
+
+if __name__ == '__main__':
+    unittest.main()


Reply via email to