This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fbab26b109 [python] In-memory merge buffer for primary-key writer 
(#7759)
fbab26b109 is described below

commit fbab26b1090192d4e467855e90b85ffc576da03a
Author: chaoyang <[email protected]>
AuthorDate: Thu Jun 4 13:50:51 2026 +0800

    [python] In-memory merge buffer for primary-key writer (#7759)
---
 .../pypaimon/common/merge_engine_dispatch.py       | 167 ++++++++++
 ...e_function.py => deduplicate_merge_function.py} |  40 +--
 .../read/reader/first_row_merge_function.py        |   7 +-
 .../read/reader/partial_update_merge_function.py   |  83 +++--
 .../pypaimon/read/reader/sort_merge_reader.py      |  18 +-
 paimon-python/pypaimon/read/split_read.py          |  53 ++-
 paimon-python/pypaimon/table/row/key_value.py      |  14 +
 .../pypaimon/tests/reader_primary_key_test.py      |  13 +-
 paimon-python/pypaimon/tests/test_first_row_e2e.py |  46 +++
 .../tests/test_first_row_merge_function.py         |  16 +
 .../pypaimon/tests/test_merge_engine_dispatch.py   | 135 ++++++++
 .../pypaimon/tests/test_partial_update_e2e.py      | 137 +++++++-
 .../tests/test_partial_update_merge_function.py    |  44 ++-
 .../pypaimon/tests/test_sequence_field_read.py     |   7 +-
 .../pypaimon/tests/test_write_merge_buffer.py      | 366 +++++++++++++++++++++
 paimon-python/pypaimon/write/file_store_write.py   | 112 ++++++-
 .../pypaimon/write/writer/key_value_data_writer.py | 223 ++++++++++++-
 17 files changed, 1337 insertions(+), 144 deletions(-)

diff --git a/paimon-python/pypaimon/common/merge_engine_dispatch.py 
b/paimon-python/pypaimon/common/merge_engine_dispatch.py
new file mode 100644
index 0000000000..a9096fd492
--- /dev/null
+++ b/paimon-python/pypaimon/common/merge_engine_dispatch.py
@@ -0,0 +1,167 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Centralised merge-engine dispatch.
+
+Both the read path (``MergeFileSplitRead``) and the write path
+(``KeyValueDataWriter``'s in-memory merge buffer) need to pick a
+``MergeFunction`` based on the table's ``merge-engine`` option. This
+module is the single source of truth so the two sides cannot drift.
+"""
+
+from typing import List, Optional
+
+from pypaimon.common.options.core_options import MergeEngine
+from pypaimon.read.reader.deduplicate_merge_function import \
+    DeduplicateMergeFunction
+from pypaimon.read.reader.first_row_merge_function import \
+    FirstRowMergeFunction
+from pypaimon.read.reader.partial_update_merge_function import \
+    PartialUpdateMergeFunction
+
+
+# Boolean-valued options that, when truthy, opt the table into
+# behaviour the pypaimon PartialUpdateMergeFunction does not yet
+# implement. Setting any of these forces the dispatch to refuse the
+# write instead of running the simple last-non-null merge silently.
+_PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS = (
+    "ignore-delete",
+    "partial-update.ignore-delete",
+    "first-row.ignore-delete",
+    "deduplicate.ignore-delete",
+    "partial-update.remove-record-on-delete",
+    "partial-update.remove-record-on-sequence-group",
+)
+_FIELDS_PREFIX = "fields."
+_FIELD_SEQUENCE_GROUP_SUFFIX = ".sequence-group"
+_FIELD_AGGREGATE_FUNCTION_SUFFIX = ".aggregate-function"
+_DEFAULT_AGGREGATE_FUNCTION_KEY = "fields.default-aggregate-function"
+
+# Mirror ``CoreOptions.ignore_delete()``: any of these keys, if set to
+# ``"true"``, opts the engine into silently dropping
+# DELETE/UPDATE_BEFORE records. Kept as a raw-option lookup here so the
+# dispatch stays table-agnostic.
+_IGNORE_DELETE_KEYS = (
+    "ignore-delete",
+    "first-row.ignore-delete",
+    "deduplicate.ignore-delete",
+    "partial-update.ignore-delete",
+)
+
+
+def build_merge_function(
+    *,
+    engine: MergeEngine,
+    raw_options: dict,
+    key_arity: int,
+    value_arity: int,
+    value_field_nullables: List[bool],
+    value_field_names: Optional[List[str]] = None,
+):
+    """Pick the MergeFunction for the table's ``merge-engine`` option.
+
+    ``engine`` and ``raw_options`` come from the table's ``CoreOptions``
+    (typically ``table.options.merge_engine()`` and
+    ``table.options.options.to_map()``). ``key_arity`` / ``value_arity``
+    / ``value_field_nullables`` describe the value-side schema the
+    caller wants the merge function to operate on -- for the read path
+    this is the projected read schema, for the write path it's the full
+    table schema (minus primary keys).
+
+    ``value_field_names`` is optional and only used by
+    ``PartialUpdateMergeFunction`` to surface the offending field name
+    when a NOT NULL constraint is violated; pass ``None`` if the caller
+    doesn't have names handy.
+    """
+    if engine == MergeEngine.DEDUPLICATE:
+        return DeduplicateMergeFunction()
+    if engine == MergeEngine.PARTIAL_UPDATE:
+        unsupported = partial_update_unsupported_options(raw_options)
+        if unsupported:
+            raise NotImplementedError(
+                "merge-engine 'partial-update' is enabled together with "
+                "options that pypaimon does not yet implement: {}. The "
+                "supported subset is per-key last-non-null merge with "
+                "no sequence-group, no per-field aggregator override, "
+                "no ignore-delete and no partial-update.remove-record-on-* "
+                "flags. Open an issue to track Python support.".format(
+                    ", ".join(sorted(unsupported))
+                )
+            )
+        return PartialUpdateMergeFunction(
+            key_arity=key_arity,
+            value_arity=value_arity,
+            nullables=list(value_field_nullables),
+            value_field_names=(
+                list(value_field_names)
+                if value_field_names is not None else None),
+        )
+    if engine == MergeEngine.FIRST_ROW:
+        return FirstRowMergeFunction(
+            ignore_delete=_ignore_delete_from_options(raw_options),
+        )
+    raise NotImplementedError(
+        "merge-engine '{}' is not implemented in pypaimon yet "
+        "(supported: deduplicate, first-row, partial-update). Open an "
+        "issue to track support.".format(engine.value)
+    )
+
+
+def _ignore_delete_from_options(raw_options: dict) -> bool:
+    for key in _IGNORE_DELETE_KEYS:
+        val = raw_options.get(key)
+        if val is not None:
+            return _option_is_truthy(val)
+    return False
+
+
+def partial_update_unsupported_options(raw_options: dict):
+    """Return the set of option keys this table sets that
+    ``PartialUpdateMergeFunction`` does not yet support. Empty set
+    means we can safely run the simple last-non-null merge.
+    """
+    flagged = set()
+    for key, value in raw_options.items():
+        if (key in _PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS
+                and _option_is_truthy(value)):
+            flagged.add(key)
+        elif key == _DEFAULT_AGGREGATE_FUNCTION_KEY:
+            flagged.add(key)
+        elif key.startswith(_FIELDS_PREFIX) and (
+                key.endswith(_FIELD_SEQUENCE_GROUP_SUFFIX)
+                or key.endswith(_FIELD_AGGREGATE_FUNCTION_SUFFIX)):
+            flagged.add(key)
+    return flagged
+
+
+def _option_is_truthy(raw):
+    """Strict ``"true"`` boolean parsing for table-option strings.
+
+    A string is truthy iff it equals ``"true"`` (case-insensitive).
+    ``"yes"``, ``"on"``, ``"1"`` and similar Python-truthy strings are
+    treated as falsey, matching the table-option parser used elsewhere
+    in Paimon so an option string the rest of the toolchain treats as
+    ``false`` is not silently elevated to ``true`` here.
+    """
+    if raw is None:
+        return False
+    if isinstance(raw, bool):
+        return raw
+    if isinstance(raw, str):
+        return raw.strip().lower() == "true"
+    return False
diff --git a/paimon-python/pypaimon/read/reader/first_row_merge_function.py 
b/paimon-python/pypaimon/read/reader/deduplicate_merge_function.py
similarity index 60%
copy from paimon-python/pypaimon/read/reader/first_row_merge_function.py
copy to paimon-python/pypaimon/read/reader/deduplicate_merge_function.py
index 162d608a64..5b669aae55 100644
--- a/paimon-python/pypaimon/read/reader/first_row_merge_function.py
+++ b/paimon-python/pypaimon/read/reader/deduplicate_merge_function.py
@@ -16,35 +16,35 @@
 # limitations under the License.
 
################################################################################
 
+"""Default merge function for primary-key tables.
+
+Mirrors Java ``DeduplicateMergeFunction`` -- for a run of KVs sharing
+the same primary key, keep only the one with the highest sequence
+number (by virtue of ``add`` being called in sequence-number order).
+"""
+
 from typing import Optional
 
 from pypaimon.table.row.key_value import KeyValue
 
 
-class FirstRowMergeFunction:
-    """A MergeFunction where key is primary key (unique) and value is the
-    full record, only keep the first one."""
+class DeduplicateMergeFunction:
+    """Keep only the latest KV per primary key.
+
+    Used by both the read path (``SortMergeReaderWithMinHeap``) and the
+    write path (``KeyValueDataWriter`` in-memory merge buffer) -- the
+    latter is what enforces the LSM "PK unique within a file"
+    invariant on flush.
+    """
 
-    def __init__(self, ignore_delete: bool = False):
-        self.ignore_delete = ignore_delete
-        self.first: Optional[KeyValue] = None
+    def __init__(self):
+        self.latest_kv: Optional[KeyValue] = None
 
     def reset(self) -> None:
-        self.first = None
+        self.latest_kv = None
 
     def add(self, kv: KeyValue) -> None:
-        if not kv.is_add():
-            if self.ignore_delete:
-                return
-            raise ValueError(
-                "By default, First row merge engine can not accept "
-                "DELETE/UPDATE_BEFORE records.\n"
-                "You can config 'ignore-delete' to ignore the "
-                "DELETE/UPDATE_BEFORE records."
-            )
-
-        if self.first is None:
-            self.first = kv
+        self.latest_kv = kv
 
     def get_result(self) -> Optional[KeyValue]:
-        return self.first
+        return self.latest_kv
diff --git a/paimon-python/pypaimon/read/reader/first_row_merge_function.py 
b/paimon-python/pypaimon/read/reader/first_row_merge_function.py
index 162d608a64..f3a91aadcf 100644
--- a/paimon-python/pypaimon/read/reader/first_row_merge_function.py
+++ b/paimon-python/pypaimon/read/reader/first_row_merge_function.py
@@ -44,7 +44,12 @@ class FirstRowMergeFunction:
             )
 
         if self.first is None:
-            self.first = kv
+            # Snapshot, don't keep the reference: the caller may pool/reuse
+            # a single KeyValue and replace() it for the next row (the write
+            # path's fold does exactly this). Holding the live reference
+            # would make get_result return the LAST row instead of the
+            # first, silently turning first-row into last-row.
+            self.first = kv.copy()
 
     def get_result(self) -> Optional[KeyValue]:
         return self.first
diff --git 
a/paimon-python/pypaimon/read/reader/partial_update_merge_function.py 
b/paimon-python/pypaimon/read/reader/partial_update_merge_function.py
index 978b48011c..6fc2d45311 100644
--- a/paimon-python/pypaimon/read/reader/partial_update_merge_function.py
+++ b/paimon-python/pypaimon/read/reader/partial_update_merge_function.py
@@ -16,26 +16,22 @@
 # limitations under the License.
 
################################################################################
 
-"""
-Python port of Java's ``PartialUpdateMergeFunction``
-(``paimon-core/src/main/java/org/apache/paimon/mergetree/compact/
-PartialUpdateMergeFunction.java``).
+"""Merge function for the ``partial-update`` merge engine on PK tables.
 
-The merge function used by the ``partial-update`` merge engine on PK
-tables: rows sharing a primary key are merged left-to-right, taking the
-latest non-null value per non-PK field. ``DeduplicateMergeFunction``
-keeps only the latest row; ``PartialUpdateMergeFunction`` instead lets
-later writes "fill in" fields the earlier writes left null, so users
-can write the same logical record across multiple commits with
-different sets of non-null columns.
+Rows sharing a primary key are merged left-to-right, taking the latest
+non-null value per non-PK field. ``DeduplicateMergeFunction`` keeps
+only the latest row; ``PartialUpdateMergeFunction`` instead lets later
+writes "fill in" fields the earlier writes left null, so users can
+write the same logical record across multiple commits with different
+sets of non-null columns.
 
-This is the **core merge semantics only**. The Java implementation also
+This is the **core merge semantics only**. The upstream engine also
 supports per-field aggregator overrides (``fields.<name>.aggregate-
 function``), sequence groups (``fields.<name>.sequence-group``),
 ``ignore-delete``, and ``partial-update.remove-record-on-*`` options.
-None of those are implemented yet; non-INSERT row kinds raise
-``NotImplementedError`` at ``add`` time so we never silently corrupt
-data with a half-implemented contract.
+None of those are implemented in pypaimon yet; non-INSERT row kinds
+raise ``NotImplementedError`` at ``add`` time so we never silently
+corrupt data with a half-implemented contract.
 """
 
 from typing import Any, List, Optional
@@ -56,19 +52,31 @@ class PartialUpdateMergeFunction:
     """
 
     def __init__(self, key_arity: int, value_arity: int,
-                 nullables: Optional[List[bool]] = None):
+                 nullables: Optional[List[bool]] = None,
+                 value_field_names: Optional[List[str]] = None):
         self._key_arity = key_arity
         self._value_arity = value_arity
         # Per-value-field nullable flags, parallel to value indices. When
         # ``None``, no nullability check runs (preserves the contract for
         # direct callers that don't have schema info handy). When given,
-        # mirrors Java's ``updateNonNullFields`` check: a null input on a
-        # NOT NULL field raises rather than being silently absorbed.
+        # the schema's NOT NULL declaration is enforced on every add():
+        # a null input on a NOT NULL field raises rather than being
+        # silently absorbed.
         if nullables is not None and len(nullables) != value_arity:
             raise ValueError(
                 "nullables length {} does not match value_arity {}".format(
                     len(nullables), value_arity))
         self._nullables = nullables
+        # Optional value-field names, parallel to value indices. When
+        # given, the NOT-NULL error message uses the field name instead
+        # of a bare position to make the failure actionable.
+        if value_field_names is not None \
+                and len(value_field_names) != value_arity:
+            raise ValueError(
+                "value_field_names length {} does not match "
+                "value_arity {}".format(
+                    len(value_field_names), value_arity))
+        self._value_field_names = value_field_names
         # Lazily allocated on first add(); ``None`` means "no rows yet".
         self._accumulator: Optional[List[Any]] = None
         # Reference to the most recently added kv. We use it only to
@@ -84,23 +92,24 @@ class PartialUpdateMergeFunction:
     def add(self, kv: KeyValue) -> None:
         row_kind_byte = kv.value_row_kind_byte
         if not RowKind.is_add_byte(row_kind_byte):
-            # DELETE / UPDATE_BEFORE need ignore-delete or
-            # partial-update.remove-record-on-delete to be set in Java;
-            # neither option is wired up in pypaimon yet, so refuse the
-            # row rather than silently swallow it.
+            # DELETE / UPDATE_BEFORE require ignore-delete or
+            # partial-update.remove-record-on-delete to be enabled,
+            # and neither option is implemented in pypaimon yet, so
+            # refuse the row rather than silently swallow it.
             raise NotImplementedError(
-                "PartialUpdateMergeFunction received a {} row; this "
-                "Python port does not yet implement the ignore-delete / "
-                "partial-update.remove-record-on-delete options. Use the "
-                "Java client for tables that produce DELETE / "
-                "UPDATE_BEFORE 
rows.".format(RowKind(row_kind_byte).to_string())
+                "PartialUpdateMergeFunction received a {} row; the "
+                "ignore-delete / partial-update.remove-record-on-delete "
+                "options needed to handle it are not yet implemented in "
+                "pypaimon. Tables that produce DELETE / UPDATE_BEFORE "
+                "rows are not supported here.".format(
+                    RowKind(row_kind_byte).to_string())
             )
 
-        # Mirror Java's reset() + updateNonNullFields(): the accumulator
-        # starts as all-null (equivalent to ``new GenericRow(arity)``) and
-        # each add() writes non-null inputs; null inputs are absorbed —
-        # except when the schema marks the field NOT NULL, in which case
-        # we raise to match Java's IllegalArgumentException check.
+        # The accumulator starts as all-null and each add() writes
+        # non-null inputs; null inputs are absorbed -- except when the
+        # schema marks the field NOT NULL, in which case we raise so
+        # the violation surfaces at write time instead of producing a
+        # row that breaks the schema invariant.
         if self._accumulator is None:
             self._accumulator = [None] * self._value_arity
         for i in range(self._value_arity):
@@ -108,7 +117,15 @@ class PartialUpdateMergeFunction:
             if v is not None:
                 self._accumulator[i] = v
             elif self._nullables is not None and not self._nullables[i]:
-                raise ValueError("Field {} can not be null".format(i))
+                if self._value_field_names is not None:
+                    field_ref = "'{}'".format(self._value_field_names[i])
+                else:
+                    field_ref = "at index {}".format(i)
+                raise ValueError(
+                    "Partial-update received NULL for non-nullable field "
+                    "{}. Declare the field nullable in the table schema "
+                    "if writes can leave it unset, or supply a value."
+                    .format(field_ref))
         self._latest_kv = kv
 
     def get_result(self) -> Optional[KeyValue]:
diff --git a/paimon-python/pypaimon/read/reader/sort_merge_reader.py 
b/paimon-python/pypaimon/read/reader/sort_merge_reader.py
index 1597745bbd..c525a7592c 100644
--- a/paimon-python/pypaimon/read/reader/sort_merge_reader.py
+++ b/paimon-python/pypaimon/read/reader/sort_merge_reader.py
@@ -18,6 +18,8 @@
 import heapq
 from typing import Any, Callable, List, Optional
 
+from pypaimon.read.reader.deduplicate_merge_function import \
+    DeduplicateMergeFunction
 from pypaimon.read.reader.iface.record_iterator import RecordIterator
 from pypaimon.read.reader.iface.record_reader import RecordReader
 from pypaimon.schema.data_types import AtomicType, DataField, Keyword
@@ -140,22 +142,6 @@ class SortMergeIterator(RecordIterator):
         return True
 
 
-class DeduplicateMergeFunction:
-    """A MergeFunction where key is primary key (unique) and value is the full 
record, only keep the latest one."""
-
-    def __init__(self):
-        self.latest_kv = None
-
-    def reset(self) -> None:
-        self.latest_kv = None
-
-    def add(self, kv: KeyValue):
-        self.latest_kv = kv
-
-    def get_result(self) -> Optional[KeyValue]:
-        return self.latest_kv
-
-
 class Element:
     def __init__(self, kv: KeyValue, iterator: RecordIterator[KeyValue], 
reader: RecordReader[KeyValue]):
         self.kv = kv
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index 7d20359dfc..5a78cb07be 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -20,6 +20,7 @@ from abc import ABC, abstractmethod
 from functools import partial
 from typing import Callable, Dict, List, Optional, Tuple
 
+from pypaimon.common.merge_engine_dispatch import build_merge_function
 from pypaimon.common.options.core_options import CoreOptions, MergeEngine
 from pypaimon.common.predicate import Predicate
 from pypaimon.deletionvectors import ApplyDeletionVectorReader
@@ -59,12 +60,7 @@ from pypaimon.read.reader.key_value_wrap_reader import 
KeyValueWrapReader
 from pypaimon.read.reader.shard_batch_reader import ShardBatchReader
 from pypaimon.read.reader.aggregation_merge_function import (
     AggregateMergeFunction, build_field_aggregators)
-from pypaimon.read.reader.partial_update_merge_function import \
-    PartialUpdateMergeFunction
-from pypaimon.read.reader.first_row_merge_function import \
-    FirstRowMergeFunction
-from pypaimon.read.reader.sort_merge_reader import (DeduplicateMergeFunction,
-                                                    SortMergeReaderWithMinHeap,
+from pypaimon.read.reader.sort_merge_reader import (SortMergeReaderWithMinHeap,
                                                     builtin_seq_comparator)
 from pypaimon.read.push_down_utils import _get_all_fields
 from pypaimon.read.split import Split
@@ -125,7 +121,7 @@ class SplitRead(ABC):
         self.limit = limit
         # Snapshot the raw value-side schema before _create_key_value_fields
         # wraps it, so MergeFileSplitRead can hand per-value-field nullable
-        # flags to merge functions that mirror Java's NOT-NULL check.
+        # flags to merge functions that enforce NOT-NULL on every add().
         self.value_fields = list(read_type)
 
         self.trimmed_primary_key = self.table.trimmed_primary_keys
@@ -703,28 +699,20 @@ class MergeFileSplitRead(SplitRead):
             seq_comparator=self.seq_comparator)
 
     def _build_merge_function(self):
-        """Pick the right MergeFunction implementation for the table's
-        ``merge-engine`` option.
-
-        The pre-flight checks that reject unsupported engines or option
-        combinations live in
-        :func:`pypaimon.read.merge_engine_support.check_supported` and
-        run at ``TableRead.__init__`` time, so by the point this method
-        executes only the supported engines are reachable.
+        """Pick the MergeFunction for the table's ``merge-engine`` option.
+
+        Delegates to the shared dispatch in
+        ``pypaimon.common.merge_engine_dispatch`` so the read path and
+        the in-memory merge buffer on the write path cannot drift.
+        ``AGGREGATE`` is special-cased here because building the per-
+        field aggregators needs the full ``DataField`` objects, the
+        full primary-key list and the parsed ``CoreOptions`` -- which
+        sit outside the dispatch's raw-options contract. The writer-
+        side merge buffer falls back to dedupe for aggregation anyway
+        (see :meth:`FileStoreWrite._build_pk_merge_function`), so the
+        two sides only need to share the simple engines.
         """
         engine = self.table.options.merge_engine()
-        if engine == MergeEngine.DEDUPLICATE:
-            return DeduplicateMergeFunction()
-        if engine == MergeEngine.PARTIAL_UPDATE:
-            return PartialUpdateMergeFunction(
-                key_arity=len(self.trimmed_primary_key),
-                value_arity=self.value_arity,
-                nullables=[f.type.nullable for f in self.value_fields],
-            )
-        if engine == MergeEngine.FIRST_ROW:
-            return FirstRowMergeFunction(
-                ignore_delete=self.table.options.ignore_delete(),
-            )
         if engine == MergeEngine.AGGREGATE:
             # Use the full primary-key list, not ``trimmed_primary_key``:
             # ``value_fields`` still carries partition columns, so any PK
@@ -742,10 +730,13 @@ class MergeFileSplitRead(SplitRead):
                 value_arity=self.value_arity,
                 field_aggregators=field_aggregators,
             )
-        # check_supported() rejects everything else at TableRead.__init__.
-        raise AssertionError(
-            "unreachable: merge-engine '{}' should have been rejected by "
-            "merge_engine_support.check_supported".format(engine.value)
+        return build_merge_function(
+            engine=engine,
+            raw_options=self.table.options.options.to_map(),
+            key_arity=len(self.trimmed_primary_key),
+            value_arity=self.value_arity,
+            value_field_nullables=[f.type.nullable for f in self.value_fields],
+            value_field_names=[f.name for f in self.value_fields],
         )
 
     def create_reader(self) -> RecordReader:
diff --git a/paimon-python/pypaimon/table/row/key_value.py 
b/paimon-python/pypaimon/table/row/key_value.py
index 845f77fadc..8fc89ea8b8 100644
--- a/paimon-python/pypaimon/table/row/key_value.py
+++ b/paimon-python/pypaimon/table/row/key_value.py
@@ -36,6 +36,20 @@ class KeyValue:
         self._reused_value.replace(row_tuple)
         return self
 
+    def copy(self) -> 'KeyValue':
+        """Return an independent KeyValue carrying the current row tuple.
+
+        ``replace`` swaps the tuple reference rather than mutating it, so a
+        copy stays valid even if a pooled/reused source KeyValue is later
+        replaced again. Callers that need to hold onto a kv past the next
+        ``replace`` (e.g. FirstRowMergeFunction keeping the first row while
+        the writer folds with a single pooled KeyValue) use this.
+        """
+        new = KeyValue(self.key_arity, self.value_arity)
+        if self._row_tuple is not None:
+            new.replace(self._row_tuple)
+        return new
+
     def is_add(self) -> bool:
         return RowKind.is_add_byte(self.value_row_kind_byte)
 
diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py 
b/paimon-python/pypaimon/tests/reader_primary_key_test.py
index 89a9e8a6a1..7cae0a77c7 100644
--- a/paimon-python/pypaimon/tests/reader_primary_key_test.py
+++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -224,12 +224,15 @@ class PkReaderTest(unittest.TestCase):
 
         read_builder = table.new_read_builder()
         actual = self._read_test_table(read_builder).sort_by('user_id')
-        # TODO support pk merge feature when multiple write
+        # The in-memory merge buffer in KeyValueDataWriter folds the
+        # two writes for user_id=2 down to the latest row before flush
+        # (default merge engine is deduplicate), so the PK appears once
+        # with the second batch's value.
         expected = pa.Table.from_pydict({
-            'user_id': [1, 2, 2, 3, 4, 5, 7, 8],
-            'item_id': [1001, 1002, 1002, 1003, 1004, 1005, 1007, 1008],
-            'behavior': ['a', 'b', 'b-new', 'c', None, 'e', 'g', 'h'],
-            'dt': ['p1', 'p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2'],
+            'user_id': [1, 2, 3, 4, 5, 7, 8],
+            'item_id': [1001, 1002, 1003, 1004, 1005, 1007, 1008],
+            'behavior': ['a', 'b-new', 'c', None, 'e', 'g', 'h'],
+            'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2'],
         }, schema=self.pa_schema)
         self.assertEqual(actual, expected)
 
diff --git a/paimon-python/pypaimon/tests/test_first_row_e2e.py 
b/paimon-python/pypaimon/tests/test_first_row_e2e.py
index 17a13c9aae..58776848b8 100644
--- a/paimon-python/pypaimon/tests/test_first_row_e2e.py
+++ b/paimon-python/pypaimon/tests/test_first_row_e2e.py
@@ -149,6 +149,52 @@ class FirstRowMergeEngineE2ETest(unittest.TestCase):
             ],
         )
 
+    def test_first_row_intra_batch_duplicate(self):
+        """A single write whose batch already contains duplicate PKs.
+
+        The whole batch is folded in one flush, so this exercises the
+        write-side fold rather than the cross-commit read merge. first-row
+        must keep the first occurrence of each PK.
+        """
+        table = self._create_pk_table('first_row_intra_batch')
+        self._write(table, [
+            {'id': 1, 'a': 'first', 'b': 'B1'},
+            {'id': 1, 'a': 'second', 'b': 'B2'},
+            {'id': 1, 'a': 'third', 'b': 'B3'},
+            {'id': 2, 'a': 'only', 'b': 'B'},
+        ])
+
+        self.assertEqual(
+            self._read(table),
+            [
+                {'id': 1, 'a': 'first', 'b': 'B1'},
+                {'id': 2, 'a': 'only', 'b': 'B'},
+            ],
+        )
+
+    def test_first_row_multiple_writes_one_commit(self):
+        """Several write_arrow calls committed once: the same PK across
+        those writes folds in a single flush. first-row keeps the first.
+        """
+        table = self._create_pk_table('first_row_multi_write_one_commit')
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        c = wb.new_commit()
+        try:
+            w.write_arrow(pa.Table.from_pylist(
+                [{'id': 1, 'a': 'first', 'b': 'B1'}], schema=self.pa_schema))
+            w.write_arrow(pa.Table.from_pylist(
+                [{'id': 1, 'a': 'second', 'b': 'B2'}], schema=self.pa_schema))
+            c.commit(w.prepare_commit())
+        finally:
+            w.close()
+            c.close()
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'first', 'b': 'B1'}],
+        )
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_first_row_merge_function.py 
b/paimon-python/pypaimon/tests/test_first_row_merge_function.py
index 7f411f85ff..5e56561984 100644
--- a/paimon-python/pypaimon/tests/test_first_row_merge_function.py
+++ b/paimon-python/pypaimon/tests/test_first_row_merge_function.py
@@ -64,6 +64,22 @@ class FirstRowMergeFunctionTest(unittest.TestCase):
         result = mf.get_result()
         self.assertEqual(_result_value(result), (10, "first"))
 
+    def test_keeps_first_row_when_kv_is_pooled(self):
+        # The writer's fold (KeyValueDataWriter._merge_pending_by_pk) reuses
+        # a single KeyValue and replace()s it per row. add() must snapshot
+        # the first row; otherwise get_result tracks the pooled kv's last
+        # replace() and returns the LAST row -- silently turning first-row
+        # into last-row. This is the case the per-row _kv() tests miss.
+        mf = FirstRowMergeFunction()
+        mf.reset()
+        pooled = KeyValue(key_arity=1, value_arity=2)
+        pooled.replace((1, 1, RowKind.INSERT.value, 10, "first"))
+        mf.add(pooled)
+        pooled.replace((1, 2, RowKind.INSERT.value, 20, "second"))
+        mf.add(pooled)
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), (10, "first"))
+
     def test_reset_clears_state(self):
         mf = FirstRowMergeFunction()
         mf.reset()
diff --git a/paimon-python/pypaimon/tests/test_merge_engine_dispatch.py 
b/paimon-python/pypaimon/tests/test_merge_engine_dispatch.py
new file mode 100644
index 0000000000..f8e2d717a5
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_merge_engine_dispatch.py
@@ -0,0 +1,135 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Unit tests for ``pypaimon.common.merge_engine_dispatch``.
+
+Pins down the table-option parsing the dispatch uses to decide whether
+``partial-update`` should run or be rejected. The key contract: strict
+``"true"``-only boolean parsing aligned with the table-option parser
+used elsewhere in Paimon, so an option string the rest of the
+toolchain treats as ``false`` is not silently elevated to ``true``
+here.
+"""
+
+import unittest
+
+from pypaimon.common.merge_engine_dispatch import (
+    _option_is_truthy,
+    build_merge_function,
+    partial_update_unsupported_options,
+)
+from pypaimon.common.options.core_options import MergeEngine
+from pypaimon.read.reader.partial_update_merge_function import \
+    PartialUpdateMergeFunction
+
+
+class OptionIsTruthyTest(unittest.TestCase):
+    """``_option_is_truthy`` accepts only ``"true"`` (case-insensitive)
+    as truthy; every other string -- including ``"1"``, ``"yes"``,
+    ``"on"`` -- is falsey. Matches the table-option parser used
+    elsewhere in Paimon.
+    """
+
+    def test_true_string_is_truthy(self):
+        self.assertTrue(_option_is_truthy("true"))
+
+    def test_true_string_is_case_insensitive(self):
+        for v in ("TRUE", "True", "tRuE"):
+            self.assertTrue(_option_is_truthy(v), v)
+
+    def test_true_string_tolerates_surrounding_whitespace(self):
+        self.assertTrue(_option_is_truthy("  true  "))
+
+    def test_python_bool_true_is_truthy(self):
+        self.assertTrue(_option_is_truthy(True))
+
+    def test_python_bool_false_is_falsey(self):
+        self.assertFalse(_option_is_truthy(False))
+
+    def test_none_is_falsey(self):
+        self.assertFalse(_option_is_truthy(None))
+
+    def test_non_true_strings_are_falsey(self):
+        # The table-option parser elsewhere in Paimon returns false
+        # for every one of these. pypaimon must do the same so a
+        # user-set "yes" is not silently elevated to true here while
+        # the rest of the toolchain treats it as false.
+        for v in ("1", "yes", "on", "Yes", "ON", "y", "t", "0", "no", "off",
+                  "false", "FALSE", ""):
+            self.assertFalse(_option_is_truthy(v), v)
+
+
+class PartialUpdateUnsupportedOptionsTest(unittest.TestCase):
+
+    def test_ignore_delete_yes_is_not_flagged(self):
+        # ``yes`` is falsey under the upstream table-option parser,
+        # so partial-update must NOT be blocked here. Pre-fix pypaimon
+        # rejected this; the fix aligns the dispatch with the parser.
+        unsupported = partial_update_unsupported_options(
+            {"partial-update.ignore-delete": "yes"})
+        self.assertEqual(unsupported, set())
+
+    def test_ignore_delete_true_is_flagged(self):
+        unsupported = partial_update_unsupported_options(
+            {"partial-update.ignore-delete": "true"})
+        self.assertEqual(unsupported, {"partial-update.ignore-delete"})
+
+    def test_sequence_group_is_flagged(self):
+        unsupported = partial_update_unsupported_options(
+            {"fields.a.sequence-group": "b"})
+        self.assertEqual(unsupported, {"fields.a.sequence-group"})
+
+    def test_unrelated_options_are_not_flagged(self):
+        unsupported = partial_update_unsupported_options(
+            {"bucket": "1", "merge-engine": "partial-update"})
+        self.assertEqual(unsupported, set())
+
+
+class BuildMergeFunctionTest(unittest.TestCase):
+    """``build_merge_function`` forwards ``value_field_names`` to
+    ``PartialUpdateMergeFunction`` so the NOT-NULL error message can
+    surface the offending column name. This is the only behavioural
+    contract the dispatch adds on top of routing.
+    """
+
+    def test_partial_update_forwards_field_names(self):
+        mf = build_merge_function(
+            engine=MergeEngine.PARTIAL_UPDATE,
+            raw_options={},
+            key_arity=1,
+            value_arity=2,
+            value_field_nullables=[True, True],
+            value_field_names=['col_a', 'col_b'],
+        )
+        self.assertIsInstance(mf, PartialUpdateMergeFunction)
+        self.assertEqual(mf._value_field_names, ['col_a', 'col_b'])
+
+    def test_partial_update_without_field_names_keeps_none(self):
+        mf = build_merge_function(
+            engine=MergeEngine.PARTIAL_UPDATE,
+            raw_options={},
+            key_arity=1,
+            value_arity=2,
+            value_field_nullables=[True, True],
+        )
+        self.assertIsInstance(mf, PartialUpdateMergeFunction)
+        self.assertIsNone(mf._value_field_names)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_partial_update_e2e.py 
b/paimon-python/pypaimon/tests/test_partial_update_e2e.py
index 8606d6c239..90fa5f711b 100644
--- a/paimon-python/pypaimon/tests/test_partial_update_e2e.py
+++ b/paimon-python/pypaimon/tests/test_partial_update_e2e.py
@@ -88,6 +88,24 @@ class PartialUpdateMergeEngineE2ETest(unittest.TestCase):
             w.close()
             c.close()
 
+    def _write_many(self, table, batches):
+        """Multiple ``write_arrow`` calls inside a single ``prepare_commit``.
+
+        Mirrors the reviewer's question: rows that land in the same
+        underlying data file must still go through the merge-engine
+        dispatch; in-writer merging cannot silently degrade to dedupe.
+        """
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        c = wb.new_commit()
+        try:
+            for rows in batches:
+                w.write_arrow(pa.Table.from_pylist(rows, 
schema=self.pa_schema))
+            c.commit(w.prepare_commit())
+        finally:
+            w.close()
+            c.close()
+
     def _read(self, table):
         rb = table.new_read_builder()
         splits = rb.new_scan().plan().splits()
@@ -173,6 +191,46 @@ class PartialUpdateMergeEngineE2ETest(unittest.TestCase):
             [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}],
         )
 
+    # -- single-commit, multiple write_arrow calls -----------------------
+    #
+    # The in-memory merge buffer added to ``KeyValueDataWriter`` runs
+    # the merge function on flush, so rows from multiple ``write_arrow``
+    # calls that share a primary key are folded into a single row before
+    # the data file is written. The flushed file therefore satisfies the
+    # LSM "PK unique within a file" invariant the read-side
+    # ``raw_convertible`` fast path relies on.
+
+    def test_partial_update_two_write_arrows_single_commit(self):
+        """Two ``write_arrow`` calls + one ``prepare_commit``: each
+        carries a disjoint non-null field; result is the per-field merge.
+        """
+        table = self._create_pk_table('two_writes_single_commit')
+        self._write_many(table, [
+            [{'id': 1, 'a': 'A', 'b': None, 'c': None}],
+            [{'id': 1, 'a': None, 'b': 'B', 'c': None}],
+        ])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'A', 'b': 'B', 'c': None}],
+        )
+
+    def test_partial_update_three_write_arrows_single_commit(self):
+        """Three ``write_arrow`` calls in a single commit compose into
+        the union of non-null fields.
+        """
+        table = self._create_pk_table('three_writes_single_commit')
+        self._write_many(table, [
+            [{'id': 1, 'a': 'A', 'b': None, 'c': None}],
+            [{'id': 1, 'a': None, 'b': 'B', 'c': None}],
+            [{'id': 1, 'a': None, 'b': None, 'c': 'C'}],
+        ])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'A', 'b': 'B', 'c': 'C'}],
+        )
+
     # -- deduplicate (regression) ----------------------------------------
 
     def test_deduplicate_engine_unchanged(self):
@@ -188,10 +246,37 @@ class PartialUpdateMergeEngineE2ETest(unittest.TestCase):
             [{'id': 1, 'a': 'new', 'b': None, 'c': None}],
         )
 
+    def test_deduplicate_two_write_arrows_single_commit(self):
+        """Pre-PR master silently returned both rows because the
+        flushed file held two records sharing a primary key. With the
+        in-memory merge buffer in place, ``deduplicate`` collapses
+        same-PK rows in a single commit too -- LSM "PK unique within a
+        file" invariant restored.
+        """
+        table = self._create_pk_table(
+            'dedupe_two_writes_single_commit',
+            merge_engine='deduplicate',
+        )
+        self._write_many(table, [
+            [{'id': 1, 'a': 'first', 'b': 'old', 'c': None}],
+            [{'id': 1, 'a': 'second', 'b': 'new', 'c': None}],
+        ])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'second', 'b': 'new', 'c': None}],
+        )
+
     # -- other supported engines (smoke) ---------------------------------
 
     def test_first_row_engine_keeps_first(self):
-        """The ``first-row`` engine must keep the earliest row per PK."""
+        """The ``first-row`` engine must keep the earliest row per PK.
+
+        Both the writer-side merge buffer and the reader-side merge
+        function go through ``merge_engine_dispatch``, so first-row is
+        a real supported engine (no dedupe fallback / no NotImplemented
+        raise) on both sides.
+        """
         table = self._create_pk_table('first_row_supported',
                                       merge_engine='first-row')
         self._write(table, [{'id': 1, 'a': 'first', 'b': None, 'c': None}])
@@ -202,6 +287,23 @@ class PartialUpdateMergeEngineE2ETest(unittest.TestCase):
             [{'id': 1, 'a': 'first', 'b': None, 'c': None}],
         )
 
+    def test_aggregation_engine_write_logs_fallback_warning(self):
+        """The write-side fallback to deduplicate for unsupported engines
+        is silent in terms of return value -- a ``logging.warning`` is
+        the only signal that file contents will not match the table's
+        declared semantics. Important when the same table is read back
+        by a reader that honours the declared engine; the pypaimon
+        read-side raise wouldn't fire there.
+        """
+        table = self._create_pk_table('agg_warning',
+                                      merge_engine='aggregation')
+        with self.assertLogs(
+                'pypaimon.write.file_store_write', level='WARNING') as cm:
+            self._write(table, [{'id': 1, 'a': 'x', 'b': None, 'c': None}])
+        combined = '\n'.join(cm.output)
+        self.assertIn('aggregation', combined)
+        self.assertIn('deduplicate', combined)
+
     # -- partial-update + out-of-scope option combinations ---------------
     #
     # When a user pairs ``merge-engine: partial-update`` with any option
@@ -213,15 +315,14 @@ class PartialUpdateMergeEngineE2ETest(unittest.TestCase):
 
     def _assert_partial_update_unsupported(self, table_name, extra_options,
                                            expected_keys):
+        # Shared dispatch runs at write time too, so the unsupported-
+        # option error surfaces inside the first ``write_arrow`` call
+        # (when ``FileStoreWrite._create_data_writer`` first runs)
+        # rather than waiting for read.
         table = self._create_pk_table(
             table_name, extra_options=extra_options)
-        self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}])
-        self._write(table, [{'id': 1, 'a': None, 'b': 'B', 'c': None}])
-
-        rb = table.new_read_builder()
-        splits = rb.new_scan().plan().splits()
         with self.assertRaises(NotImplementedError) as cm:
-            rb.new_read().to_arrow(splits)
+            self._write(table, [{'id': 1, 'a': 'A', 'b': None, 'c': None}])
         msg = str(cm.exception)
         self.assertIn("partial-update", msg)
         for key in expected_keys:
@@ -271,25 +372,25 @@ class PartialUpdateMergeEngineE2ETest(unittest.TestCase):
         )
 
     def 
test_partial_update_unsupported_options_guard_covers_raw_convertible(self):
-        """The unsupported-options guard must fire even when the scan
-        would dispatch every split through ``RawFileSplitRead`` (i.e. a
-        single-snapshot table where rows don't overlap).
+        """The read-side guard at ``TableRead.__init__`` must fire even
+        when the scan would dispatch every split through
+        ``RawFileSplitRead`` (single-snapshot, non-overlapping rows).
 
         Before the guard moved to ``TableRead.__init__`` this case
         silently bypassed validation because raw-convertible splits skip
-        ``MergeFileSplitRead`` entirely — and an option like
-        ``partial-update.remove-record-on-delete`` would be ignored on
-        the read path while the user assumed it was honoured.
+        ``MergeFileSplitRead`` entirely -- the read path's
+        ``_build_merge_function`` never ran, so an option like
+        ``partial-update.remove-record-on-delete`` was ignored on read.
+
+        The shared dispatch now also fires on the write path's first
+        flush (see ``_assert_partial_update_unsupported``), so we skip
+        ``_write`` here: the read-side guard runs at ``new_read()``
+        construction time regardless of whether data exists.
         """
         table = self._create_pk_table(
             'pu_rrod_raw_convertible',
             extra_options={'partial-update.remove-record-on-delete': 'true'},
         )
-        # Single write -> single snapshot -> splits are raw-convertible.
-        self._write(table, [
-            {'id': 1, 'a': 'A', 'b': None, 'c': None},
-            {'id': 2, 'a': 'B', 'b': None, 'c': None},
-        ])
         rb = table.new_read_builder()
         with self.assertRaises(NotImplementedError) as cm:
             rb.new_read()
diff --git a/paimon-python/pypaimon/tests/test_partial_update_merge_function.py 
b/paimon-python/pypaimon/tests/test_partial_update_merge_function.py
index 60dfc7198d..b412fdd388 100644
--- a/paimon-python/pypaimon/tests/test_partial_update_merge_function.py
+++ b/paimon-python/pypaimon/tests/test_partial_update_merge_function.py
@@ -127,8 +127,8 @@ class PartialUpdateMergeFunctionTest(unittest.TestCase):
         self.assertIsNone(mf.get_result())
 
     def test_update_after_is_treated_as_insert(self):
-        # Java's PartialUpdate accepts UPDATE_AFTER alongside INSERT in
-        # non-sequence-group mode (both are "add" kinds). Mirror that.
+        # UPDATE_AFTER is treated as an "add" alongside INSERT in
+        # non-sequence-group mode, matching the upstream contract.
         mf = PartialUpdateMergeFunction(key_arity=1, value_arity=2)
         mf.reset()
         mf.add(_kv((1,), 100, RowKind.INSERT, ('a', None)))
@@ -172,28 +172,54 @@ class PartialUpdateMergeFunctionTest(unittest.TestCase):
         self.assertEqual(_result_key(result), (1,))
         self.assertEqual(_result_value(result), ('a', 'x'))
 
-    # -- NOT-NULL input validation (mirrors Java's updateNonNullFields) ----
+    # -- NOT-NULL input validation ----
 
     def test_first_insert_with_null_for_not_null_field_raises(self):
-        """If the very first row writes null to a NOT NULL field, raise —
-        same input-validation Java does in updateNonNullFields()."""
+        """If the very first row writes null to a NOT NULL field, raise --
+        the schema's NOT NULL declaration is enforced on every add()."""
         mf = PartialUpdateMergeFunction(
             key_arity=1, value_arity=2, nullables=[True, False])
         mf.reset()
         with self.assertRaises(ValueError) as cm:
             mf.add(_kv((1,), 1, RowKind.INSERT, ('a', None)))
-        self.assertIn("Field 1", str(cm.exception))
+        msg = str(cm.exception)
+        # Without field names we fall back to the index, but the
+        # actionable hint must still be there.
+        self.assertIn("at index 1", msg)
+        self.assertIn("Declare the field nullable", msg)
 
     def test_subsequent_insert_with_null_for_not_null_field_raises(self):
-        """A later null on a NOT NULL field must also raise — Java checks
-        on every add(), not just the first one."""
+        """A later null on a NOT NULL field must also raise -- the
+        NOT NULL check fires on every add(), not just the first one."""
         mf = PartialUpdateMergeFunction(
             key_arity=1, value_arity=2, nullables=[True, False])
         mf.reset()
         mf.add(_kv((1,), 1, RowKind.INSERT, ('a', 'x')))
         with self.assertRaises(ValueError) as cm:
             mf.add(_kv((1,), 2, RowKind.INSERT, (None, None)))
-        self.assertIn("Field 1", str(cm.exception))
+        self.assertIn("at index 1", str(cm.exception))
+
+    def test_not_null_error_message_uses_field_name_when_given(self):
+        """When ``value_field_names`` is supplied, the NOT-NULL error
+        names the offending field so the message is directly actionable
+        instead of citing a bare positional index."""
+        mf = PartialUpdateMergeFunction(
+            key_arity=1, value_arity=2,
+            nullables=[True, False],
+            value_field_names=['a', 'b'])
+        mf.reset()
+        with self.assertRaises(ValueError) as cm:
+            mf.add(_kv((1,), 1, RowKind.INSERT, ('a', None)))
+        msg = str(cm.exception)
+        self.assertIn("'b'", msg)
+        self.assertIn("Declare the field nullable", msg)
+
+    def test_value_field_names_length_mismatch_raises(self):
+        with self.assertRaises(ValueError):
+            PartialUpdateMergeFunction(
+                key_arity=1, value_arity=2,
+                nullables=[True, True],
+                value_field_names=['only_one'])
 
     def test_null_for_nullable_field_is_absorbed(self):
         """A null input on a nullable field is silently absorbed (existing
diff --git a/paimon-python/pypaimon/tests/test_sequence_field_read.py 
b/paimon-python/pypaimon/tests/test_sequence_field_read.py
index 87d4cbe42c..ed32768c2e 100644
--- a/paimon-python/pypaimon/tests/test_sequence_field_read.py
+++ b/paimon-python/pypaimon/tests/test_sequence_field_read.py
@@ -363,15 +363,16 @@ class SequenceFieldReadE2ETest(unittest.TestCase):
 
     def test_sequence_group_still_rejected(self):
         """Top-level sequence.field is supported, but per-field
-        sequence-group is not -- the read must still raise.
+        sequence-group is not -- it must still be rejected. The shared
+        merge-engine dispatch now rejects this combination fail-fast on
+        the write path, so the write (not the read) is what raises.
         """
         table = self._create_pk_table(
             'seq_group', merge_engine='partial-update',
             extra_options={'sequence.field': 'ts',
                            'fields.ts2.sequence-group': 'val'})
-        self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
         with self.assertRaises(NotImplementedError):
-            self._read(table)
+            self._write(table, [{'id': 1, 'ts': 100, 'ts2': 0, 'val': 'x'}])
 
     def test_nested_sequence_field_rejected(self):
         """nested-sequence-field is unimplemented and must be rejected
diff --git a/paimon-python/pypaimon/tests/test_write_merge_buffer.py 
b/paimon-python/pypaimon/tests/test_write_merge_buffer.py
new file mode 100644
index 0000000000..0b86b59138
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_write_merge_buffer.py
@@ -0,0 +1,366 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Unit tests for ``KeyValueDataWriter`` buffer behaviour.
+
+Covers the fold algorithm (`_merge_pending_by_pk`), the flush lifecycle
+(`_flush_all` empties the buffer + clears pending_data), and the
+roll-write helper (`_roll_write` splits oversized buffers across
+multiple files). Drives a thin harness that bypasses
+``DataWriter.__init__`` so tests can exercise these paths without
+spinning up the real catalog/write stack.
+"""
+
+import unittest
+from unittest.mock import Mock
+
+import pyarrow as pa
+
+from pypaimon.read.reader.deduplicate_merge_function import \
+    DeduplicateMergeFunction
+from pypaimon.read.reader.partial_update_merge_function import \
+    PartialUpdateMergeFunction
+from pypaimon.write.writer.key_value_data_writer import KeyValueDataWriter
+
+
+# Layout matches what ``KeyValueDataWriter._add_system_fields`` emits:
+# ``[_KEY_id, _SEQUENCE_NUMBER, _VALUE_KIND, id, a, b]``. ``id`` is
+# duplicated on the value side because the value layout in Paimon's
+# row tuple includes every original column.
+_SCHEMA = pa.schema([
+    pa.field('_KEY_id', pa.int64(), nullable=False),
+    pa.field('_SEQUENCE_NUMBER', pa.int64(), nullable=False),
+    pa.field('_VALUE_KIND', pa.int8(), nullable=False),
+    pa.field('id', pa.int64(), nullable=False),
+    pa.field('a', pa.string()),
+    pa.field('b', pa.string()),
+])
+
+
+def _row(pk, seq, a, b):
+    return {
+        '_KEY_id': pk,
+        '_SEQUENCE_NUMBER': seq,
+        '_VALUE_KIND': 0,
+        'id': pk,
+        'a': a,
+        'b': b,
+    }
+
+
+class _Harness(KeyValueDataWriter):
+    """Bypass ``DataWriter.__init__`` to keep tests focused.
+
+    Provides just the attributes ``_merge_pending_by_pk`` / ``_flush_all``
+    / ``_roll_write`` read, plus a recording stub for
+    ``_write_data_to_file`` so the roll-write path can be exercised
+    without touching the filesystem.
+    """
+
+    def __init__(self, merge_function, target_file_size: int = 10 ** 12):
+        self.trimmed_primary_keys = ['id']
+        self._merge_function = merge_function
+        # Large enough that ``_check_and_roll_if_needed`` does not
+        # trigger on its own in tests that don't care about rolling.
+        self.target_file_size = target_file_size
+        self.pending_data = None
+        self.committed_files = []
+        self.written_chunks = []
+
+    def _write_data_to_file(self, data):
+        # Record each chunk instead of writing to disk; mirrors the
+        # base writer's contract of appending to ``committed_files``.
+        self.written_chunks.append(data)
+
+
+class WriteMergeBufferTest(unittest.TestCase):
+
+    # -- deduplicate ------------------------------------------------------
+
+    def test_dedupe_collapses_same_pk_run_to_latest(self):
+        writer = _Harness(DeduplicateMergeFunction())
+        data = pa.Table.from_pylist(
+            [_row(1, 1, 'old', None), _row(1, 2, 'new', None)],
+            schema=_SCHEMA,
+        )
+        out = writer._merge_pending_by_pk(data)
+        self.assertEqual(out.num_rows, 1)
+        self.assertEqual(
+            out.to_pylist(),
+            [_row(1, 2, 'new', None)],
+        )
+
+    def test_dedupe_keeps_disjoint_keys(self):
+        writer = _Harness(DeduplicateMergeFunction())
+        data = pa.Table.from_pylist(
+            [_row(1, 1, 'A', None),
+             _row(2, 2, 'B', None),
+             _row(3, 3, 'C', None)],
+            schema=_SCHEMA,
+        )
+        out = writer._merge_pending_by_pk(data)
+        self.assertEqual(out.num_rows, 3)
+        self.assertEqual(
+            sorted(out.to_pylist(), key=lambda r: r['id']),
+            [_row(1, 1, 'A', None),
+             _row(2, 2, 'B', None),
+             _row(3, 3, 'C', None)],
+        )
+
+    # -- partial-update ---------------------------------------------------
+
+    def _partial_update(self):
+        # Value-side carries 3 columns (id, a, b). The PK column ``id``
+        # is duplicated into the value side so partial-update can apply
+        # last-non-null semantics uniformly across every original
+        # user column.
+        return PartialUpdateMergeFunction(
+            key_arity=1, value_arity=3, nullables=[True, True, True])
+
+    def test_partial_update_merges_non_null_per_field(self):
+        writer = _Harness(self._partial_update())
+        data = pa.Table.from_pylist(
+            [_row(1, 1, 'A', None), _row(1, 2, None, 'B')],
+            schema=_SCHEMA,
+        )
+        out = writer._merge_pending_by_pk(data)
+        self.assertEqual(out.num_rows, 1)
+        self.assertEqual(out.to_pylist(), [_row(1, 2, 'A', 'B')])
+
+    def test_partial_update_three_writes_compose(self):
+        writer = _Harness(self._partial_update())
+        data = pa.Table.from_pylist(
+            [_row(1, 1, 'A', None),
+             _row(1, 2, None, 'B'),
+             _row(1, 3, None, None)],
+            schema=_SCHEMA,
+        )
+        out = writer._merge_pending_by_pk(data)
+        self.assertEqual(out.to_pylist(), [_row(1, 3, 'A', 'B')])
+
+    def test_partial_update_later_null_does_not_clobber_earlier_value(self):
+        writer = _Harness(self._partial_update())
+        data = pa.Table.from_pylist(
+            [_row(1, 1, 'KEEP', 'B'), _row(1, 2, None, None)],
+            schema=_SCHEMA,
+        )
+        out = writer._merge_pending_by_pk(data)
+        self.assertEqual(out.to_pylist(), [_row(1, 2, 'KEEP', 'B')])
+
+    # -- edge cases -------------------------------------------------------
+
+    def test_empty_buffer_returns_empty(self):
+        writer = _Harness(DeduplicateMergeFunction())
+        empty = pa.Table.from_pylist([], schema=_SCHEMA)
+        out = writer._merge_pending_by_pk(empty)
+        self.assertEqual(out.num_rows, 0)
+
+    def test_single_row_buffer_skips_merge(self):
+        # Mock to confirm the merge function isn't invoked: a single
+        # row cannot have duplicates, so we sidestep the to_pylist
+        # round-trip.
+        mock_mf = Mock()
+        writer = _Harness(mock_mf)
+        data = pa.Table.from_pylist(
+            [_row(1, 1, 'X', None)], schema=_SCHEMA)
+        out = writer._merge_pending_by_pk(data)
+        self.assertEqual(out.num_rows, 1)
+        mock_mf.reset.assert_not_called()
+        mock_mf.add.assert_not_called()
+        mock_mf.get_result.assert_not_called()
+
+    def test_get_result_none_drops_pk_run(self):
+        # Future-proof: contract says ``get_result`` returning ``None``
+        # means the entire PK group should be dropped.
+        class DropAll:
+            def reset(self):
+                pass
+
+            def add(self, _):
+                pass
+
+            def get_result(self):
+                return None
+
+        writer = _Harness(DropAll())
+        data = pa.Table.from_pylist(
+            [_row(1, 1, 'A', None), _row(1, 2, 'B', None)],
+            schema=_SCHEMA,
+        )
+        out = writer._merge_pending_by_pk(data)
+        self.assertEqual(out.num_rows, 0)
+
+    # -- KeyValue pooling -------------------------------------------------
+
+    def test_keyvalue_pool_does_not_alias_results_across_runs(self):
+        # Pooling reuses one KeyValue across the whole fold. If the
+        # PartialUpdateMergeFunction's get_result snapshotting were
+        # broken, run 1's result would mutate when run 2's data is
+        # written into the pooled instance. This test would catch that
+        # regression: build a buffer with two distinct PK runs and
+        # verify both results stand on their own.
+        writer = _Harness(self._partial_update())
+        data = pa.Table.from_pylist(
+            [_row(1, 1, 'A1', None),
+             _row(1, 2, None, 'B1'),
+             _row(2, 3, 'A2', None),
+             _row(2, 4, None, 'B2')],
+            schema=_SCHEMA,
+        )
+        out = writer._merge_pending_by_pk(data)
+        self.assertEqual(
+            sorted(out.to_pylist(), key=lambda r: r['id']),
+            [_row(1, 2, 'A1', 'B1'), _row(2, 4, 'A2', 'B2')],
+        )
+
+    # -- _process_data (no longer sorts) ----------------------------------
+
+    def test_process_data_adds_system_fields_without_sorting(self):
+        # Deferred-sort design: ``_process_data`` must not pre-sort the
+        # incoming batch. The global sort happens once inside
+        # ``_flush_all`` over the concatenated buffer.
+        writer = _Harness(DeduplicateMergeFunction())
+        writer.sequence_generator = _StubSeqGen()
+        # Intentionally pass rows in descending PK order; if _process_data
+        # were still sorting, the output would come back ascending.
+        batch = pa.RecordBatch.from_pylist(
+            [{'id': 3, 'a': 'C', 'b': None},
+             {'id': 1, 'a': 'A', 'b': None},
+             {'id': 2, 'a': 'B', 'b': None}],
+            schema=pa.schema([
+                pa.field('id', pa.int64(), nullable=False),
+                pa.field('a', pa.string()),
+                pa.field('b', pa.string()),
+            ]),
+        )
+        out = writer._process_data(batch)
+        self.assertEqual(
+            [r['id'] for r in out.to_pylist()],
+            [3, 1, 2],
+        )
+
+    # -- _flush_all -------------------------------------------------------
+
+    def test_flush_all_sorts_folds_and_writes_one_file(self):
+        # Buffer with duplicate PKs in arbitrary order. _flush_all is
+        # responsible for sorting before folding, so unsorted input is
+        # the right stress case.
+        writer = _Harness(DeduplicateMergeFunction())
+        writer.pending_data = pa.Table.from_pylist(
+            [_row(2, 5, 'B2-new', None),
+             _row(1, 2, 'A1-mid', None),
+             _row(1, 1, 'A1-old', None),
+             _row(2, 4, 'B2-old', None),
+             _row(1, 3, 'A1-new', None)],
+            schema=_SCHEMA,
+        )
+        writer._flush_all()
+
+        # Buffer cleared.
+        self.assertIsNone(writer.pending_data)
+        # Exactly one file written (size well under target).
+        self.assertEqual(len(writer.written_chunks), 1)
+        flushed = writer.written_chunks[0]
+        result = sorted(flushed.to_pylist(), key=lambda r: r['id'])
+        # Dedup -> 1 row per PK, with the highest seq value retained.
+        self.assertEqual(result, [
+            _row(1, 3, 'A1-new', None),
+            _row(2, 5, 'B2-new', None),
+        ])
+
+    def test_flush_all_on_empty_buffer_is_noop(self):
+        writer = _Harness(DeduplicateMergeFunction())
+        writer.pending_data = None
+        writer._flush_all()
+        self.assertIsNone(writer.pending_data)
+        self.assertEqual(writer.written_chunks, [])
+
+    def test_flush_all_clears_buffer_even_when_fold_drops_everything(self):
+        # MergeFunction that returns None for every group; verifies
+        # ``_flush_all`` still resets ``pending_data`` so a subsequent
+        # write starts from a clean slate.
+        class DropAll:
+            def reset(self):
+                pass
+
+            def add(self, _):
+                pass
+
+            def get_result(self):
+                return None
+
+        writer = _Harness(DropAll())
+        writer.pending_data = pa.Table.from_pylist(
+            [_row(1, 1, 'A', None), _row(1, 2, 'B', None)],
+            schema=_SCHEMA,
+        )
+        writer._flush_all()
+        self.assertIsNone(writer.pending_data)
+        self.assertEqual(writer.written_chunks, [])
+
+    # -- _roll_write ------------------------------------------------------
+
+    def test_roll_write_single_chunk_when_under_target(self):
+        writer = _Harness(DeduplicateMergeFunction(),
+                          target_file_size=10 ** 9)
+        data = pa.Table.from_pylist(
+            [_row(1, 1, 'A', None), _row(2, 2, 'B', None)],
+            schema=_SCHEMA,
+        )
+        writer._roll_write(data)
+        self.assertEqual(len(writer.written_chunks), 1)
+        self.assertEqual(writer.written_chunks[0].num_rows, 2)
+
+    def test_roll_write_splits_oversized_buffer_into_multiple_files(self):
+        # Build a buffer whose nbytes comfortably exceeds the chosen
+        # target. With a small target_file_size the writer should hand
+        # back at least two files. Use long strings so nbytes scales
+        # predictably with row count.
+        rows = [
+            _row(i, i, 'x' * 64, 'y' * 64) for i in range(1, 401)
+        ]
+        data = pa.Table.from_pylist(rows, schema=_SCHEMA)
+        # Target small enough that 400 rows will not fit in one file.
+        target = data.nbytes // 4
+        writer = _Harness(DeduplicateMergeFunction(),
+                          target_file_size=target)
+        writer._roll_write(data)
+
+        self.assertGreaterEqual(len(writer.written_chunks), 2)
+        total_rows = sum(c.num_rows for c in writer.written_chunks)
+        self.assertEqual(total_rows, data.num_rows)
+        # Each chunk except possibly the last should respect the target.
+        for chunk in writer.written_chunks[:-1]:
+            self.assertLessEqual(chunk.nbytes, target)
+
+
+class _StubSeqGen:
+    """Stand-in for ``SequenceGenerator`` so the harness can call
+    ``_process_data`` without going through the real ``DataWriter.__init__``.
+    """
+
+    def __init__(self):
+        self._n = 0
+
+    def next(self) -> int:
+        self._n += 1
+        return self._n
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/write/file_store_write.py 
b/paimon-python/pypaimon/write/file_store_write.py
index f1374797e1..c77f88e907 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -15,11 +15,15 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import logging
 import random
 from typing import Dict, List, Tuple
 
 import pyarrow as pa
 
+
+logger = logging.getLogger(__name__)
+
 from pypaimon.common.options.core_options import CoreOptions
 from pypaimon.write.commit_message import CommitMessage
 from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
@@ -90,7 +94,8 @@ class FileStoreWrite:
                 partition=partition,
                 bucket=bucket,
                 max_seq_number=max_seq_number(),
-                options=options)
+                options=options,
+                merge_function=self._build_pk_merge_function())
         else:
             seq_number = 0 if self.table.bucket_mode() == 
BucketMode.BUCKET_UNAWARE else max_seq_number()
             return AppendOnlyDataWriter(
@@ -102,6 +107,111 @@ class FileStoreWrite:
                 write_cols=self.write_cols
             )
 
+    def _build_pk_merge_function(self):
+        """Build the merge function for the in-memory write buffer.
+
+        Shares ``merge_engine_dispatch.build_merge_function`` with the
+        read path so the supported engines (deduplicate, first-row,
+        partial-update with no out-of-scope options) cannot drift
+        between sides.
+
+        For wholly unsupported engines (``aggregation``) the writer
+        falls back to ``DeduplicateMergeFunction`` so the flushed file
+        still maintains the LSM "PK unique within a file" invariant.
+        The read path's dispatch still raises ``NotImplementedError``,
+        so the user gets an explicit error before they observe
+        wrong-engine data; the fallback only narrows the damage to
+        "file is deduped, not aggregated" rather than the silent
+        multi-row-per-PK corruption that existed pre-PR.
+
+        Partial-update with out-of-scope options (sequence-group,
+        per-field aggregator, ignore-delete, remove-record-on-*) does
+        **not** fall back: ``partial_update_unsupported_options`` sees
+        the configured keys and re-raises, so the first
+        ``write_arrow`` call (where ``_create_data_writer`` first runs)
+        surfaces the error. Silently degrading to dedupe there is the
+        same live corruption pattern this PR exists to close.
+
+        ``with_write_type`` (column-subset writes) on a PK table is
+        also rejected here. The buffer layout
+        ``_add_system_fields`` produces would carry only the subset
+        on the value side, while a ``MergeFunction`` such as
+        ``PartialUpdateMergeFunction`` is built against the full table
+        arity -- the two sides would mismatch on
+        ``KeyValue.value.get_field`` and raise ``IndexError`` at
+        flush time. Refusing it explicitly avoids that obscure failure
+        and keeps the supported surface narrow.
+
+        The value-side schema must match the layout
+        ``KeyValueDataWriter`` flushes -- ``_add_system_fields`` keeps
+        every original user column on the value side (the primary keys
+        are duplicated as ``_KEY_<pk>`` columns to the left of the
+        value side). So ``value_arity`` here is ``len(table.fields)``,
+        not ``len(table.fields) - len(primary_keys)``.
+        """
+        from pypaimon.common.merge_engine_dispatch import (
+            build_merge_function, partial_update_unsupported_options)
+        from pypaimon.common.options.core_options import MergeEngine
+        from pypaimon.read.reader.deduplicate_merge_function import \
+            DeduplicateMergeFunction
+
+        engine = self.options.merge_engine()
+        raw_options = self.options.options.to_map()
+
+        if self.write_cols is not None:
+            raise NotImplementedError(
+                "with_write_type is not yet supported on primary-key "
+                "tables: the writer-side merge buffer assumes the "
+                "input batch carries the full table schema. Drop the "
+                "with_write_type call or write the missing columns as "
+                "nulls in the input batch."
+            )
+
+        # PARTIAL_UPDATE + out-of-scope option: never silently fall
+        # back -- forward the read-side error verbatim so writes fail
+        # before the first flush rather than corrupt the file.
+        if engine == MergeEngine.PARTIAL_UPDATE \
+                and partial_update_unsupported_options(raw_options):
+            return build_merge_function(
+                engine=engine, raw_options=raw_options,
+                key_arity=len(self.table.trimmed_primary_keys),
+                value_arity=len(self.table.table_schema.fields),
+                value_field_nullables=[
+                    f.type.nullable for f in self.table.table_schema.fields],
+                value_field_names=[
+                    f.name for f in self.table.table_schema.fields],
+            )
+
+        # Catch the dispatch's "wholly unsupported engine" raise only
+        # for the engines we know are out of scope today; any other
+        # NotImplementedError is a bug we want to surface, not swallow.
+        if engine == MergeEngine.AGGREGATE:
+            # Surface the silent semantic mismatch in logs: the file
+            # will be PK-unique (better than the pre-PR multi-row
+            # corruption), but any reader that honours the declared
+            # engine will see wrong values. Users sharing tables
+            # across writers especially need to see this.
+            logger.warning(
+                "merge-engine '%s' is not implemented on the pypaimon "
+                "write path; falling back to deduplicate so the flushed "
+                "file stays PK-unique. The file contents reflect "
+                "deduplicate semantics (latest writer wins), not %s "
+                "semantics. Any reader that interprets the file under "
+                "the declared engine will return incorrect results. "
+                "Avoid the pypaimon writer for tables on this engine.",
+                engine.value, engine.value)
+            return DeduplicateMergeFunction()
+
+        all_value_fields = self.table.table_schema.fields
+        return build_merge_function(
+            engine=engine, raw_options=raw_options,
+            key_arity=len(self.table.trimmed_primary_keys),
+            value_arity=len(all_value_fields),
+            value_field_nullables=[
+                f.type.nullable for f in all_value_fields],
+            value_field_names=[f.name for f in all_value_fields],
+        )
+
     def _has_blob_columns(self) -> bool:
         """Check if the table schema contains blob columns."""
         for field in self.table.table_schema.fields:
diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py 
b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
index 6c6f292f57..64f6003a06 100644
--- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py
+++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
@@ -15,22 +15,227 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from typing import List, Union
+
 import pyarrow as pa
 import pyarrow.compute as pc
 
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.read.reader.deduplicate_merge_function import \
+    DeduplicateMergeFunction
+from pypaimon.table.row.key_value import KeyValue
 from pypaimon.write.writer.data_writer import DataWriter
 
 
 class KeyValueDataWriter(DataWriter):
-    """Data writer for primary key tables with system fields and sorting."""
+    """Data writer for primary key tables with system fields and sorting.
+
+    Accumulates incoming batches in ``pending_data`` without sorting or
+    folding on the write path. Sort and ``MergeFunction``-based fold
+    are deferred to flush time (``_flush_all``), where the result is
+    roll-written into one or more data files. This enforces the LSM
+    "PK unique within a file" invariant the read-side
+    ``raw_convertible`` fast path relies on, while keeping per-write
+    cost bounded.
+    """
+
+    def __init__(self, table, partition, bucket, max_seq_number,
+                 options=None, write_cols=None, merge_function=None):
+        super().__init__(table, partition, bucket, max_seq_number,
+                         options, write_cols)
+        # Defaults to deduplicate so direct callers (tests / future code
+        # paths that don't go through FileStoreWrite) don't accidentally
+        # skip the merge step entirely.
+        self._merge_function = merge_function or DeduplicateMergeFunction()
 
     def _process_data(self, data: pa.RecordBatch) -> pa.Table:
+        # No sort here: sorting once at flush is strictly cheaper than
+        # per-batch sort + a final global sort. ``pending_data`` ends
+        # up as a concat of unsorted batches; ``_flush_all`` sorts it
+        # exactly once before folding.
         enhanced_data = self._add_system_fields(data)
-        return 
pa.Table.from_batches([self._sort_by_primary_key(enhanced_data)])
+        return pa.Table.from_batches([enhanced_data])
 
     def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> 
pa.Table:
-        combined = pa.concat_tables([existing_data, new_data])
-        return self._sort_by_primary_key(combined)
+        # Plain concat. Sort + fold both run inside ``_flush_all`` so
+        # N writes incur 1 sort instead of N sorts.
+        return pa.concat_tables([existing_data, new_data])
+
+    def prepare_commit(self) -> List[DataFileMeta]:
+        if self.pending_data is not None and self.pending_data.num_rows > 0:
+            self._flush_all()
+        # ``_flush_all`` leaves ``pending_data = None``, so super's
+        # prepare_commit just returns ``committed_files``.
+        return super().prepare_commit()
+
+    def _check_and_roll_if_needed(self):
+        # Buffer overflowed target_file_size: sort + fold + roll-write
+        # the whole buffer as multiple files in one pass. Unlike the
+        # base class's slice loop, we never keep a slice remainder in
+        # ``pending_data`` -- flush empties the buffer outright.
+        if (self.pending_data is not None
+                and self.pending_data.num_rows > 0
+                and self.pending_data.nbytes > self.target_file_size):
+            self._flush_all()
+
+    def close(self):
+        # Override the base ``close`` because its straight
+        # ``_write_data_to_file(pending_data)`` would land an unsorted,
+        # un-folded buffer on disk -- violating the file-internal
+        # PK-unique invariant. Route the final flush through
+        # ``_flush_all`` so the contract holds even on the
+        # close-without-prepare_commit path.
+        try:
+            if self.pending_data is not None and self.pending_data.num_rows > 
0:
+                self._flush_all()
+        except Exception as e:
+            import logging
+            logger = logging.getLogger(__name__)
+            logger.warning(
+                "Exception occurs when closing writer. Cleaning up.",
+                exc_info=e)
+            self.abort()
+            raise e
+        finally:
+            self.pending_data = None
+
+    def _flush_all(self) -> None:
+        """Sort + fold the entire buffer, then roll-write as files.
+
+        On return, ``pending_data is None`` and every flushed chunk
+        has been recorded in ``committed_files``. The buffer is
+        always fully drained per flush: no slice remainder is
+        carried back into ``pending_data``.
+        """
+        if self.pending_data is None or self.pending_data.num_rows == 0:
+            self.pending_data = None
+            return
+        sorted_data = self._sort_by_primary_key(self.pending_data)
+        folded = self._merge_pending_by_pk(sorted_data)
+        self.pending_data = None
+        if folded.num_rows == 0:
+            return
+        self._roll_write(folded)
+
+    def _roll_write(self, data: pa.Table) -> None:
+        """Write ``data`` as one or more files, each <= target_file_size.
+
+        ``data`` is required to be PK-unique (the fold guarantees
+        that), so any slice of it is also PK-unique -- splitting for
+        size does not violate the LSM file-internal invariant.
+        Reuses ``_find_optimal_split_point`` / ``_write_data_to_file``
+        from the base class.
+        """
+        while data.num_rows > 0:
+            if data.nbytes <= self.target_file_size:
+                self._write_data_to_file(data)
+                return
+            split_row = self._find_optimal_split_point(
+                data, self.target_file_size)
+            if split_row <= 0:
+                # Single row already exceeds target_file_size; nothing
+                # to gain from further slicing, write it as-is.
+                self._write_data_to_file(data)
+                return
+            self._write_data_to_file(data.slice(0, split_row))
+            data = data.slice(split_row)
+
+    def _merge_pending_by_pk(self, data: pa.Table) -> pa.Table:
+        """Fold same-PK runs in ``data`` using ``self._merge_function``.
+
+        ``data`` is required to already be sorted by
+        ``(primary_key, _SEQUENCE_NUMBER)``. ``_flush_all`` is the
+        only caller and runs ``_sort_by_primary_key`` immediately
+        before this method, so the precondition holds.
+
+        NOTE(follow-up): the merge runs row-by-row over
+        ``data.to_pydict()`` / ``pa.Table.from_pydict``. Arrow types
+        with non-trivial Python representations (Decimal128 with
+        specific precision/scale, timestamps with timezone or
+        sub-millisecond units, durations, deeply nested structs) can
+        drift across this round-trip. A columnar merge implementation
+        would close the gap and is tracked as a follow-up; until
+        then, partial-update on those types should be avoided in
+        pypaimon.
+        """
+        n = data.num_rows
+        if n < 2:
+            # Single-row buffer cannot have duplicates; sidestep the
+            # row-by-row pyarrow round-trip in the common streaming case.
+            return data
+
+        col_names = data.schema.names
+        # ``to_pydict`` works on pyarrow >= 6 (Python 3.6 CI ships 6.0.1),
+        # unlike ``to_pylist`` which only landed in pyarrow 7.
+        col_dict = data.to_pydict()
+        rows = [{name: col_dict[name][i] for name in col_names}
+                for i in range(n)]
+        key_arity = len(self.trimmed_primary_keys)
+        # System fields sit at indices [key_arity, key_arity + 1] (the
+        # _SEQUENCE_NUMBER and _VALUE_KIND columns added by
+        # _add_system_fields). Everything to the right is the value side.
+        value_arity = len(col_names) - key_arity - 2
+
+        # Pool one ``KeyValue`` for the whole fold. Safe because:
+        # - ``DeduplicateMergeFunction.add`` stores the kv reference; the
+        #   reused instance always carries the most recent ``replace``,
+        #   which is exactly the "latest wins" the engine wants.
+        # - ``PartialUpdateMergeFunction.add`` also stores a reference,
+        #   but ``get_result`` snapshots key + sequence into a fresh
+        #   tuple before returning, so the consumed result is decoupled
+        #   from any later ``replace`` on the pooled kv.
+        # - ``FirstRowMergeFunction.add`` ``copy()``s the first kv, so it
+        #   keeps the first row rather than tracking later ``replace``s on
+        #   the pooled kv (which would otherwise yield the last row).
+        # This drops per-row ``KeyValue``/``OffsetRow`` allocations and
+        # the resulting GC churn on large buffers.
+        pooled_kv = KeyValue(key_arity, value_arity)
+
+        merged_rows: List[dict] = []
+        i = 0
+        while i < n:
+            j = i
+            first_key = self._key_tuple(rows[i], col_names, key_arity)
+            while j < n and \
+                    self._key_tuple(rows[j], col_names, key_arity) == 
first_key:
+                j += 1
+            run = rows[i:j]
+            self._merge_function.reset()
+            for r in run:
+                pooled_kv.replace(self._row_to_tuple(r, col_names))
+                self._merge_function.add(pooled_kv)
+            result_kv = self._merge_function.get_result()
+            if result_kv is not None:
+                merged_rows.append(
+                    self._kv_to_row(result_kv, col_names,
+                                    key_arity, value_arity))
+            i = j
+
+        if not merged_rows:
+            return data.slice(0, 0)
+        result_dict = {name: [r[name] for r in merged_rows]
+                       for name in data.schema.names}
+        return pa.Table.from_pydict(result_dict, schema=data.schema)
+
+    @staticmethod
+    def _key_tuple(row: dict, col_names: List[str], key_arity: int) -> tuple:
+        return tuple(row[col_names[i]] for i in range(key_arity))
+
+    @staticmethod
+    def _row_to_tuple(row: dict, col_names: List[str]) -> tuple:
+        return tuple(row[name] for name in col_names)
+
+    @staticmethod
+    def _kv_to_row(kv: KeyValue, col_names: List[str],
+                   key_arity: int, value_arity: int) -> dict:
+        out = {}
+        for i in range(key_arity):
+            out[col_names[i]] = kv.key.get_field(i)
+        out[col_names[key_arity]] = kv.sequence_number
+        out[col_names[key_arity + 1]] = kv.value_row_kind_byte
+        for i in range(value_arity):
+            out[col_names[key_arity + 2 + i]] = kv.value.get_field(i)
+        return out
 
     def _add_system_fields(self, data: pa.RecordBatch) -> pa.RecordBatch:
         """Add system fields: _KEY_{pk_key}, _SEQUENCE_NUMBER, _VALUE_KIND."""
@@ -61,11 +266,15 @@ class KeyValueDataWriter(DataWriter):
 
         return pa.RecordBatch.from_arrays(new_arrays, 
schema=pa.schema(new_fields))
 
-    def _sort_by_primary_key(self, data: pa.RecordBatch) -> pa.RecordBatch:
+    def _sort_by_primary_key(
+        self, data: Union[pa.RecordBatch, pa.Table]
+    ) -> Union[pa.RecordBatch, pa.Table]:
+        # pc.sort_indices + .take work uniformly over RecordBatch and
+        # Table, so this serves both the per-batch entry path (legacy)
+        # and the buffer-wide sort path (used by ``_flush_all``).
         sort_keys = [(key, 'ascending') for key in self.trimmed_primary_keys]
         if '_SEQUENCE_NUMBER' in data.schema.names:
             sort_keys.append(('_SEQUENCE_NUMBER', 'ascending'))
 
         sorted_indices = pc.sort_indices(data, sort_keys=sort_keys)
-        sorted_batch = data.take(sorted_indices)
-        return sorted_batch
+        return data.take(sorted_indices)

Reply via email to