This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ad391f4fc [python] Implement aggregation merge engine in pypaimon 
(#7952)
8ad391f4fc is described below

commit 8ad391f4fce57569afb6fe7eabae90085d0de99e
Author: chaoyang <[email protected]>
AuthorDate: Mon Jun 1 19:32:17 2026 +0800

    [python] Implement aggregation merge engine in pypaimon (#7952)
    
    Port Java's `AggregateMergeFunction` to pypaimon, following the shape of
    #7745.
    
    Ships the `FieldAggregator` framework, 9 value aggregators (`sum` /
    `max` / `min` / `last_value` / `last_non_null_value` / `first_value` /
    `first_non_null_value` / `bool_or` / `bool_and`) plus the `primary_key`
    placeholder, the `AggregateMergeFunction` wired into
    `MergeFileSplitRead._build_merge_function`, and a `merge_engine_support`
    guard that rejects retract opt-ins, sequence fields and out-of-scope
    aggregator identifiers (`collect` / `nested_update` / `theta_sketch` /
    `roaring_bitmap_*` / ...).
    
    Retract handling (DELETE / UPDATE_BEFORE) and the remaining 14 Java
    aggregators are intentionally deferred to follow-up PRs, mirroring
    #7745's scoping.
---
 .../pypaimon/read/merge_engine_support.py          |  91 +++++-
 .../pypaimon/read/reader/aggregate/__init__.py     |  84 +++++
 .../pypaimon/read/reader/aggregate/aggregators.py  | 283 +++++++++++++++++
 .../read/reader/aggregate/field_aggregator.py      |  81 +++++
 .../read/reader/aggregation_merge_function.py      | 194 ++++++++++++
 paimon-python/pypaimon/read/split_read.py          |  19 ++
 .../pypaimon/tests/test_aggregation_e2e.py         | 350 +++++++++++++++++++++
 .../tests/test_aggregation_merge_function.py       | 300 ++++++++++++++++++
 .../tests/test_field_aggregator_registry.py        | 103 ++++++
 .../pypaimon/tests/test_field_aggregators.py       | 274 ++++++++++++++++
 .../pypaimon/tests/test_partial_update_e2e.py      |  23 +-
 11 files changed, 1779 insertions(+), 23 deletions(-)

diff --git a/paimon-python/pypaimon/read/merge_engine_support.py 
b/paimon-python/pypaimon/read/merge_engine_support.py
index 2dde400465..8ab4813f7e 100644
--- a/paimon-python/pypaimon/read/merge_engine_support.py
+++ b/paimon-python/pypaimon/read/merge_engine_support.py
@@ -45,9 +45,29 @@ _PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS = (
     "partial-update.remove-record-on-delete",
     "partial-update.remove-record-on-sequence-group",
 )
+# Boolean-valued options that, when truthy, opt the table into the
+# retract / delete-removal behaviour the Python
+# ``AggregateMergeFunction`` does not implement.
+_AGGREGATION_UNSUPPORTED_BOOLEAN_OPTIONS = (
+    "aggregation.remove-record-on-delete",
+)
+# Aggregator identifiers the ``aggregation`` engine knows how to
+# build. Duplicated from the registration site in
+# ``aggregate/aggregators.py`` so this guard has no import-time
+# dependency on the read-pipeline modules; keep both sides in sync
+# when adding new aggregators.
+_AGGREGATION_SUPPORTED_AGG_FUNCS = frozenset([
+    "primary_key",
+    "last_value", "last_non_null_value",
+    "first_value", "first_non_null_value",
+    "sum", "max", "min",
+    "bool_or", "bool_and",
+])
+_SEQUENCE_FIELD_KEY = "sequence.field"
 _FIELDS_PREFIX = "fields."
 _FIELD_SEQUENCE_GROUP_SUFFIX = ".sequence-group"
 _FIELD_AGGREGATE_FUNCTION_SUFFIX = ".aggregate-function"
+_FIELD_IGNORE_RETRACT_SUFFIX = ".ignore-retract"
 _DEFAULT_AGGREGATE_FUNCTION_KEY = "fields.default-aggregate-function"
 
 
@@ -73,16 +93,35 @@ def check_supported(table) -> None:
                 "supported subset is per-key last-non-null merge with "
                 "no sequence-group, no per-field aggregator override, "
                 "no ignore-delete and no partial-update.remove-record-"
-                "on-* flags. Use the Java client for the full feature "
-                "set, or open an issue to track Python support.".format(
+                "on-* flags. These options are not yet supported; open "
+                "an issue to track support.".format(
                     ", ".join(sorted(unsupported))
                 )
             )
         return
+    if engine == MergeEngine.AGGREGATE:
+        unsupported = aggregation_unsupported_options(table)
+        if unsupported:
+            raise NotImplementedError(
+                "merge-engine 'aggregation' is enabled together with "
+                "options that pypaimon does not yet implement: {}. The "
+                "supported subset is per-key field aggregation with the "
+                "built-in aggregators ({}); retract opt-ins "
+                "(aggregation.remove-record-on-delete, "
+                "fields.<f>.ignore-retract), sequence-field handling "
+                "and other aggregators (product / listagg / collect / "
+                "merge_map* / nested_update* / theta_sketch / "
+                "hll_sketch / roaring_bitmap_*) are not yet supported. "
+                "Open an issue to track support.".format(
+                    ", ".join(sorted(unsupported)),
+                    ", ".join(sorted(_AGGREGATION_SUPPORTED_AGG_FUNCS)),
+                )
+            )
+        return
     raise NotImplementedError(
         "merge-engine '{}' is not implemented in pypaimon yet "
-        "(supported: deduplicate, first-row, partial-update). Use the Java "
-        "client or open an issue to track support.".format(engine.value)
+        "(supported: deduplicate, first-row, partial-update, aggregation). "
+        "Open an issue to track support.".format(engine.value)
     )
 
 
@@ -106,6 +145,50 @@ def partial_update_unsupported_options(table) -> Set[str]:
     return flagged
 
 
+def aggregation_unsupported_options(table) -> Set[str]:
+    """Return the set of option keys configured on this table that the
+    ``AggregateMergeFunction`` does not yet support. Empty set means
+    the configuration is safe to run.
+
+    Three families of options are rejected:
+
+    1. Retract opt-ins: ``aggregation.remove-record-on-delete`` and
+       ``fields.<f>.ignore-retract`` only make sense in conjunction
+       with DELETE / UPDATE_BEFORE handling, which the engine does not
+       implement.
+    2. Sequence-field configuration: ``sequence.field`` /
+       ``fields.<f>.sequence-group`` are not supported; the merge
+       function does not special-case sequence fields, so we refuse
+       the table rather than silently merge them as ordinary value
+       columns.
+    3. Out-of-scope aggregator selections: ``fields.<f>.aggregate-
+       function`` and ``fields.default-aggregate-function`` set to an
+       identifier this engine doesn't support yet (e.g. ``collect``,
+       ``nested_update``).
+    """
+    flagged: Set[str] = set()
+    raw = table.options.options.to_map()
+    for key, value in raw.items():
+        if (key in _AGGREGATION_UNSUPPORTED_BOOLEAN_OPTIONS
+                and _option_is_truthy(value)):
+            flagged.add(key)
+        elif key == _SEQUENCE_FIELD_KEY and value:
+            flagged.add(key)
+        elif key == _DEFAULT_AGGREGATE_FUNCTION_KEY:
+            if value not in _AGGREGATION_SUPPORTED_AGG_FUNCS:
+                flagged.add(key)
+        elif key.startswith(_FIELDS_PREFIX):
+            if key.endswith(_FIELD_IGNORE_RETRACT_SUFFIX):
+                if _option_is_truthy(value):
+                    flagged.add(key)
+            elif key.endswith(_FIELD_SEQUENCE_GROUP_SUFFIX):
+                flagged.add(key)
+            elif key.endswith(_FIELD_AGGREGATE_FUNCTION_SUFFIX):
+                if value not in _AGGREGATION_SUPPORTED_AGG_FUNCS:
+                    flagged.add(key)
+    return flagged
+
+
 def _option_is_truthy(raw) -> bool:
     if raw is None:
         return False
diff --git a/paimon-python/pypaimon/read/reader/aggregate/__init__.py 
b/paimon-python/pypaimon/read/reader/aggregate/__init__.py
new file mode 100644
index 0000000000..8d5fe75d48
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/aggregate/__init__.py
@@ -0,0 +1,84 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""FieldAggregator registry and factory entry point.
+
+Looks up the registered factory for an aggregator identifier (``"sum"``,
+``"last_value"``, ...) read from table options and builds an instance
+for it. Concrete aggregators register themselves at import time via
+:func:`register_aggregator`; importing this package eagerly imports the
+built-in aggregator module so the registrations always happen,
+regardless of which call site triggers the first lookup.
+"""
+
+from typing import Callable, Dict, TYPE_CHECKING
+
+from pypaimon.read.reader.aggregate.field_aggregator import FieldAggregator
+from pypaimon.schema.data_types import DataType
+
+if TYPE_CHECKING:
+    from pypaimon.common.options.core_options import CoreOptions
+
+
+# Module-global registry keyed by aggregator identifier
+# (``"sum"``, ``"last_value"`` ...).
+_FACTORIES: Dict[str, Callable[[DataType, str, "CoreOptions"], 
FieldAggregator]] = {}
+
+
+def register_aggregator(
+    identifier: str,
+    factory: Callable[[DataType, str, "CoreOptions"], FieldAggregator],
+) -> None:
+    """Register ``factory`` under ``identifier``.
+
+    Re-registering an identifier replaces the existing factory. The
+    built-in aggregators register themselves at module-import time from
+    :mod:`aggregators`.
+    """
+    _FACTORIES[identifier] = factory
+
+
+def create_field_aggregator(
+    field_type: DataType,
+    field_name: str,
+    agg_func_name: str,
+    options: "CoreOptions",
+) -> FieldAggregator:
+    """Build a ``FieldAggregator`` for ``agg_func_name``.
+
+    Raises ``ValueError`` if the identifier was never registered, so
+    typos or out-of-scope aggregators surface at merge-function
+    construction time rather than at the first row.
+    """
+    factory = _FACTORIES.get(agg_func_name)
+    if factory is None:
+        raise ValueError(
+            "Use unsupported aggregation '{}' or spell aggregate function "
+            "incorrectly! Supported aggregators in pypaimon: {}".format(
+                agg_func_name, sorted(_FACTORIES.keys())
+            )
+        )
+    return factory(field_type, field_name, options)
+
+
+# Eager-import the built-in aggregator module so its top-level
+# ``register_aggregator(...)`` calls populate ``_FACTORIES`` before any
+# caller looks anything up. Placed at the bottom of the module so the
+# names ``register_aggregator`` / ``FieldAggregator`` aggregators
+# imports back from here are already defined when its import runs.
+from pypaimon.read.reader.aggregate import aggregators  # noqa: E402, F401
diff --git a/paimon-python/pypaimon/read/reader/aggregate/aggregators.py 
b/paimon-python/pypaimon/read/reader/aggregate/aggregators.py
new file mode 100644
index 0000000000..961cd10091
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/aggregate/aggregators.py
@@ -0,0 +1,283 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Built-in :class:`FieldAggregator` implementations.
+
+Each class registers itself with the global registry at import time
+via :func:`register_aggregator`, so importing
+``pypaimon.read.reader.aggregate`` makes all of them discoverable.
+
+This module ships 10 aggregators — the primary-key placeholder plus
+the 9 most commonly-used value aggregators: ``primary_key`` /
+``last_value`` / ``last_non_null_value`` / ``first_value`` /
+``first_non_null_value`` / ``sum`` / ``max`` / ``min`` / ``bool_or``
+/ ``bool_and``. Other aggregators (``product`` / ``listagg`` /
+``collect`` / ``merge_map`` / ``nested_update`` / ``theta_sketch`` /
+``hll_sketch`` / ``roaring_bitmap_*``) are intentionally deferred —
+the registry will report them as unsupported so users see a clear
+error rather than a silent fallback.
+"""
+
+from typing import Any
+
+from pypaimon.read.reader.aggregate import register_aggregator
+from pypaimon.read.reader.aggregate.field_aggregator import FieldAggregator
+from pypaimon.schema.data_types import AtomicType, DataType
+
+
+# Aggregator identifiers exposed via ``fields.<name>.aggregate-function``
+# and ``fields.default-aggregate-function``.
+NAME_PRIMARY_KEY = "primary_key"
+NAME_LAST_VALUE = "last_value"
+NAME_LAST_NON_NULL_VALUE = "last_non_null_value"
+NAME_FIRST_VALUE = "first_value"
+NAME_FIRST_NON_NULL_VALUE = "first_non_null_value"
+NAME_SUM = "sum"
+NAME_MAX = "max"
+NAME_MIN = "min"
+NAME_BOOL_OR = "bool_or"
+NAME_BOOL_AND = "bool_and"
+
+
+# Base SQL type names treated as numeric for sum/product-style
+# aggregators. NUMERIC / DEC are SQL synonyms accepted by the parser;
+# treat them the same as DECIMAL.
+_NUMERIC_BASE_TYPES = frozenset([
+    "TINYINT", "SMALLINT", "INT", "INTEGER", "BIGINT",
+    "FLOAT", "DOUBLE", "DECIMAL", "NUMERIC", "DEC",
+])
+
+
+def _atomic_base_name(field_type: DataType):
+    """Extract the bare SQL type name from an :class:`AtomicType`,
+    stripping precision arguments (``DECIMAL(10,2)``) and trailing
+    ``NOT NULL``. Returns ``None`` for non-atomic types so callers can
+    raise a uniform "unsupported type" error.
+    """
+    if not isinstance(field_type, AtomicType):
+        return None
+    raw = field_type.type
+    head = raw.split('(', 1)[0].split(' ', 1)[0]
+    return head.upper()
+
+
+def _check_numeric(name: str, field_type: DataType) -> None:
+    base = _atomic_base_name(field_type)
+    if base not in _NUMERIC_BASE_TYPES:
+        raise ValueError(
+            "Data type for '{}' column must be a numeric type but was "
+            "'{}'.".format(name, field_type)
+        )
+
+
+def _check_boolean(name: str, field_type: DataType) -> None:
+    base = _atomic_base_name(field_type)
+    if base != "BOOLEAN":
+        raise ValueError(
+            "Data type for '{}' column must be 'BOOLEAN' but was "
+            "'{}'.".format(name, field_type)
+        )
+
+
+# ---------------------------------------------------------------------------
+# Aggregator classes
+# ---------------------------------------------------------------------------
+
+
+class FieldPrimaryKeyAgg(FieldAggregator):
+    """Carries the primary-key column through merge unchanged."""
+
+    def agg(self, accumulator: Any, input_field: Any) -> Any:
+        return input_field
+
+
+class FieldLastValueAgg(FieldAggregator):
+    """Latest value wins, including ``None``."""
+
+    def agg(self, accumulator: Any, input_field: Any) -> Any:
+        return input_field
+
+
+class FieldLastNonNullValueAgg(FieldAggregator):
+    """Latest non-null value; ``None`` inputs are absorbed.
+
+    This is the system-wide default aggregator when no per-field
+    override and no ``fields.default-aggregate-function`` are set.
+    """
+
+    def agg(self, accumulator: Any, input_field: Any) -> Any:
+        return accumulator if input_field is None else input_field
+
+
+class FieldFirstValueAgg(FieldAggregator):
+    """First value (including ``None``) wins; locks after the first
+    :meth:`agg` call until the next :meth:`reset`.
+    """
+
+    def __init__(self, name: str, field_type: DataType):
+        super().__init__(name, field_type)
+        self._initialized = False
+
+    def agg(self, accumulator: Any, input_field: Any) -> Any:
+        if not self._initialized:
+            self._initialized = True
+            return input_field
+        return accumulator
+
+    def reset(self) -> None:
+        self._initialized = False
+
+
+class FieldFirstNonNullValueAgg(FieldAggregator):
+    """First non-null value; locks after the first non-null
+    :meth:`agg` call until the next :meth:`reset`.
+    """
+
+    def __init__(self, name: str, field_type: DataType):
+        super().__init__(name, field_type)
+        self._initialized = False
+
+    def agg(self, accumulator: Any, input_field: Any) -> Any:
+        if not self._initialized and input_field is not None:
+            self._initialized = True
+            return input_field
+        return accumulator
+
+    def reset(self) -> None:
+        self._initialized = False
+
+
+class FieldSumAgg(FieldAggregator):
+    """Numeric sum. ``None`` on either side returns the non-null
+    operand. Python's native ``+`` works uniformly for int / float /
+    Decimal — the values produced by the pyarrow read path already
+    arrive as the right Python primitive for the column's SQL type, so
+    no per-type branching is needed.
+    """
+
+    def agg(self, accumulator: Any, input_field: Any) -> Any:
+        if accumulator is None or input_field is None:
+            return accumulator if input_field is None else input_field
+        return accumulator + input_field
+
+
+class FieldMaxAgg(FieldAggregator):
+    """Maximum value. ``None`` on either side returns the non-null
+    operand. Uses Python's native ``<`` so any orderable type
+    (numeric, string, date, datetime, Decimal) works.
+    """
+
+    def agg(self, accumulator: Any, input_field: Any) -> Any:
+        if accumulator is None or input_field is None:
+            return accumulator if input_field is None else input_field
+        return input_field if accumulator < input_field else accumulator
+
+
+class FieldMinAgg(FieldAggregator):
+    """Minimum value. ``None`` on either side returns the non-null
+    operand. Uses Python's native ``<``.
+    """
+
+    def agg(self, accumulator: Any, input_field: Any) -> Any:
+        if accumulator is None or input_field is None:
+            return accumulator if input_field is None else input_field
+        return accumulator if accumulator < input_field else input_field
+
+
+class FieldBoolOrAgg(FieldAggregator):
+    """Logical OR. ``None`` on either side returns the non-null
+    operand.
+    """
+
+    def agg(self, accumulator: Any, input_field: Any) -> Any:
+        if accumulator is None or input_field is None:
+            return accumulator if input_field is None else input_field
+        return bool(accumulator) or bool(input_field)
+
+
+class FieldBoolAndAgg(FieldAggregator):
+    """Logical AND. ``None`` on either side returns the non-null
+    operand.
+    """
+
+    def agg(self, accumulator: Any, input_field: Any) -> Any:
+        if accumulator is None or input_field is None:
+            return accumulator if input_field is None else input_field
+        return bool(accumulator) and bool(input_field)
+
+
+# ---------------------------------------------------------------------------
+# Registration. Each builder binds an identifier to a factory that
+# optionally validates the column DataType before constructing the
+# aggregator instance.
+# ---------------------------------------------------------------------------
+
+
+def _build_no_type_check(cls, identifier: str):
+    """Build a factory that accepts any DataType. Used by
+    ``primary_key`` / ``last_value`` / ``first_value`` variants and by
+    ``max`` / ``min``, all of which work on any orderable DataType.
+    """
+    def _factory(field_type, field_name, options):
+        return cls(identifier, field_type)
+    return _factory
+
+
+def _build_numeric(cls, identifier: str):
+    def _factory(field_type, field_name, options):
+        _check_numeric(identifier, field_type)
+        return cls(identifier, field_type)
+    return _factory
+
+
+def _build_boolean(cls, identifier: str):
+    def _factory(field_type, field_name, options):
+        _check_boolean(identifier, field_type)
+        return cls(identifier, field_type)
+    return _factory
+
+
+register_aggregator(
+    NAME_PRIMARY_KEY,
+    _build_no_type_check(FieldPrimaryKeyAgg, NAME_PRIMARY_KEY),
+)
+register_aggregator(
+    NAME_LAST_VALUE,
+    _build_no_type_check(FieldLastValueAgg, NAME_LAST_VALUE),
+)
+register_aggregator(
+    NAME_LAST_NON_NULL_VALUE,
+    _build_no_type_check(FieldLastNonNullValueAgg, NAME_LAST_NON_NULL_VALUE),
+)
+register_aggregator(
+    NAME_FIRST_VALUE,
+    _build_no_type_check(FieldFirstValueAgg, NAME_FIRST_VALUE),
+)
+register_aggregator(
+    NAME_FIRST_NON_NULL_VALUE,
+    _build_no_type_check(FieldFirstNonNullValueAgg, NAME_FIRST_NON_NULL_VALUE),
+)
+register_aggregator(NAME_SUM, _build_numeric(FieldSumAgg, NAME_SUM))
+register_aggregator(NAME_MAX, _build_no_type_check(FieldMaxAgg, NAME_MAX))
+register_aggregator(NAME_MIN, _build_no_type_check(FieldMinAgg, NAME_MIN))
+register_aggregator(
+    NAME_BOOL_OR, _build_boolean(FieldBoolOrAgg, NAME_BOOL_OR)
+)
+register_aggregator(
+    NAME_BOOL_AND, _build_boolean(FieldBoolAndAgg, NAME_BOOL_AND)
+)
diff --git a/paimon-python/pypaimon/read/reader/aggregate/field_aggregator.py 
b/paimon-python/pypaimon/read/reader/aggregate/field_aggregator.py
new file mode 100644
index 0000000000..b306a4c3f7
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/aggregate/field_aggregator.py
@@ -0,0 +1,81 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Per-field aggregator abstraction used by the ``aggregation`` merge
+engine.
+
+Each non-PK field is reduced across rows sharing the same primary key
+by one ``FieldAggregator`` instance picked per field. The
+``AggregateMergeFunction`` drives the lifecycle: ``reset()`` at the
+start of each key group, ``agg()`` per input value, and the final
+accumulator is read out to build the merged row.
+"""
+
+from abc import ABC, abstractmethod
+from typing import Any
+
+from pypaimon.schema.data_types import DataType
+
+
+class FieldAggregator(ABC):
+    """Per-field aggregator base class.
+
+    Concrete subclasses implement :meth:`agg` and may override
+    :meth:`reset`. :meth:`retract` is intentionally left as the default
+    "refuse" implementation: pypaimon's ``AggregateMergeFunction``
+    rejects ``DELETE`` / ``UPDATE_BEFORE`` rows up-front, so no
+    aggregator's ``retract`` is reachable from the read path. The hook
+    is kept so a future PR can add retract semantics without changing
+    every subclass.
+    """
+
+    def __init__(self, name: str, field_type: DataType):
+        self.name = name
+        self.field_type = field_type
+
+    @abstractmethod
+    def agg(self, accumulator: Any, input_field: Any) -> Any:
+        """Combine ``accumulator`` with ``input_field`` and return the
+        new accumulator. Called once per row in the key group, in
+        arrival order (sequence-number ascending). ``accumulator`` is
+        ``None`` before the first add.
+        """
+
+    def reset(self) -> None:
+        """Reset internal state at the start of a new key group.
+
+        Default is a no-op. Aggregators that carry per-group bookkeeping
+        beyond the externally-passed accumulator (e.g. ``first_value``'s
+        "have we seen any row yet?" flag) must override this.
+        """
+
+    def retract(self, accumulator: Any, retract_field: Any) -> Any:
+        """Refuse the retract operation by default.
+
+        ``AggregateMergeFunction`` rejects retract rows at :meth:`add`
+        time, so this path is currently unreachable from the read
+        pipeline. The hook is kept for forward-compatibility: a future
+        PR that wires retract through the merge function can override
+        this on the aggregators that actually support it (sum, product,
+        last_value, ...).
+        """
+        raise NotImplementedError(
+            "Aggregator '{}' does not support retract; the aggregation "
+            "merge engine does not implement DELETE / UPDATE_BEFORE "
+            "handling.".format(self.name)
+        )
diff --git a/paimon-python/pypaimon/read/reader/aggregation_merge_function.py 
b/paimon-python/pypaimon/read/reader/aggregation_merge_function.py
new file mode 100644
index 0000000000..5e2c149738
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/aggregation_merge_function.py
@@ -0,0 +1,194 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Merge function for the ``aggregation`` merge engine.
+
+Rows sharing a primary key are folded across each non-PK field by the
+per-field :class:`FieldAggregator` configured in table options.
+``DeduplicateMergeFunction`` keeps only the latest row;
+``PartialUpdateMergeFunction`` lets later writes "fill in" fields the
+earlier writes left null; ``AggregateMergeFunction`` runs an actual
+aggregation (sum / max / min / last_value / ...) per column.
+
+This is the **core merge semantics only**. Retract on DELETE /
+UPDATE_BEFORE rows (with ``aggregation.remove-record-on-delete`` and
+``fields.<field>.ignore-retract`` opt-ins) and ~14 additional
+aggregators (``product`` / ``listagg`` / ``collect`` / ``merge_map`` /
+``nested_update`` / ``theta_sketch`` / ``hll_sketch`` /
+``roaring_bitmap_*``) are intentionally deferred. Non-INSERT row
+kinds raise ``NotImplementedError`` at :meth:`add` time so we never
+silently corrupt data with a half-implemented contract, and
+out-of-scope aggregator identifiers / options are rejected up-front in
+:mod:`pypaimon.read.merge_engine_support`.
+"""
+
+from typing import Any, List, Optional
+
+from pypaimon.read.reader.aggregate import create_field_aggregator
+from pypaimon.read.reader.aggregate.aggregators import (
+    NAME_LAST_NON_NULL_VALUE,
+    NAME_PRIMARY_KEY,
+)
+from pypaimon.read.reader.aggregate.field_aggregator import FieldAggregator
+from pypaimon.schema.data_types import DataField
+from pypaimon.table.row.key_value import KeyValue
+from pypaimon.table.row.row_kind import RowKind
+
+
+# ---------------------------------------------------------------------------
+# Aggregator-list construction helpers. Live in this module (rather than
+# in split_read.py) so they can be exercised directly by unit tests.
+# ---------------------------------------------------------------------------
+
+
+def resolve_agg_func_name(field_name, primary_keys, options_map):
+    """Pick the aggregator identifier for ``field_name`` using the
+    following precedence:
+
+    1. Primary-key columns use ``primary_key`` (identity).
+    2. Otherwise, field-level ``fields.<f>.aggregate-function``
+       overrides everything.
+    3. Otherwise, the table-wide ``fields.default-aggregate-function``.
+    4. Otherwise, the system default ``last_non_null_value``.
+
+    Sequence fields are intentionally **not** special-cased here: the
+    merge-engine guard in :mod:`pypaimon.read.merge_engine_support`
+    rejects any table that sets ``sequence.field`` on the
+    ``aggregation`` engine, so by the time this function runs there is
+    no sequence field to disambiguate.
+    """
+    if field_name in primary_keys:
+        return NAME_PRIMARY_KEY
+    return (
+        options_map.get("fields.{}.aggregate-function".format(field_name))
+        or options_map.get("fields.default-aggregate-function")
+        or NAME_LAST_NON_NULL_VALUE
+    )
+
+
+def build_field_aggregators(
+    value_fields: List[DataField],
+    primary_keys: List[str],
+    core_options,
+) -> List[FieldAggregator]:
+    """Build the per-column aggregator list parallel to ``value_fields``.
+
+    Resolves the identifier for each field via :func:`resolve_agg_func_name`
+    and instantiates the aggregator through the registry. Type validation
+    for aggregators that care (``sum`` requires numeric, ``bool_or`` /
+    ``bool_and`` require boolean) runs inside the registered factory, so
+    misconfigured tables fail here rather than at first row.
+    """
+    options_map = core_options.options.to_map()
+    pk_set = set(primary_keys)
+    aggregators = []
+    for field in value_fields:
+        agg_name = resolve_agg_func_name(field.name, pk_set, options_map)
+        aggregators.append(
+            create_field_aggregator(
+                field.type, field.name, agg_name, core_options
+            )
+        )
+    return aggregators
+
+
+# ---------------------------------------------------------------------------
+# Merge function
+# ---------------------------------------------------------------------------
+
+
+class AggregateMergeFunction:
+    """A MergeFunction where the key is the primary key (unique) and
+    each non-PK column is reduced across the rows for that key by its
+    configured :class:`FieldAggregator`.
+
+    Follows the same ``MergeFunction`` protocol used by
+    :class:`SortMergeReaderWithMinHeap`: :meth:`reset` between groups
+    of same-key rows, :meth:`add` one row at a time (oldest to
+    newest), :meth:`get_result` after the group is exhausted.
+    """
+
+    def __init__(self,
+                 key_arity: int,
+                 value_arity: int,
+                 field_aggregators: List[FieldAggregator]):
+        if len(field_aggregators) != value_arity:
+            raise ValueError(
+                "field_aggregators length {} does not match value_arity "
+                "{}".format(len(field_aggregators), value_arity)
+            )
+        self._key_arity = key_arity
+        self._value_arity = value_arity
+        self._field_aggregators = field_aggregators
+        # Parallel to value indices. Reset at the start of every key
+        # group; updated in-place as ``add()`` calls feed rows in.
+        self._accumulators: List[Any] = [None] * value_arity
+        # Reference to the most recently added kv. Used only to
+        # propagate the key + sequence_number into the result row; we
+        # snapshot those values into a fresh tuple in ``get_result()``
+        # so the result is not aliased to upstream's reused KeyValue.
+        self._latest_kv: Optional[KeyValue] = None
+
+    def reset(self) -> None:
+        self._accumulators = [None] * self._value_arity
+        for agg in self._field_aggregators:
+            agg.reset()
+        self._latest_kv = None
+
+    def add(self, kv: KeyValue) -> None:
+        row_kind_byte = kv.value_row_kind_byte
+        if not RowKind.is_add_byte(row_kind_byte):
+            # DELETE / UPDATE_BEFORE rows are not supported by this
+            # merge engine. Refuse them rather than silently swallow
+            # rows, which would let aggregations diverge from the
+            # underlying data.
+            raise NotImplementedError(
+                "AggregateMergeFunction received a {} row; the "
+                "aggregation merge engine does not yet implement "
+                "retract (DELETE / UPDATE_BEFORE) handling. Tables "
+                "producing such rows are not yet supported."
+                .format(RowKind(row_kind_byte).to_string())
+            )
+
+        for i, agg in enumerate(self._field_aggregators):
+            input_val = kv.value.get_field(i)
+            self._accumulators[i] = agg.agg(self._accumulators[i], input_val)
+        self._latest_kv = kv
+
+    def get_result(self) -> Optional[KeyValue]:
+        if self._latest_kv is None:
+            return None
+
+        kv = self._latest_kv
+        # Snapshot the key as a fresh tuple — we cannot keep a reference
+        # to ``kv`` because upstream readers (e.g. KeyValueWrapReader)
+        # reuse a single KeyValue instance and mutate its underlying
+        # row_tuple between calls. Building a fresh tuple here means
+        # the result we return is decoupled from any subsequent
+        # iteration.
+        key_values = tuple(
+            kv.key.get_field(i) for i in range(self._key_arity)
+        )
+        result_row = key_values + (
+            kv.sequence_number,
+            RowKind.INSERT.value,
+        ) + tuple(self._accumulators)
+
+        result = KeyValue(self._key_arity, self._value_arity)
+        result.replace(result_row)
+        return result
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index 6256bdd859..9b46a377c6 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -54,6 +54,8 @@ from pypaimon.read.reader.key_value_unwrap_reader import \
     KeyValueUnwrapRecordReader
 from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
 from pypaimon.read.reader.shard_batch_reader import ShardBatchReader
+from pypaimon.read.reader.aggregation_merge_function import (
+    AggregateMergeFunction, build_field_aggregators)
 from pypaimon.read.reader.partial_update_merge_function import \
     PartialUpdateMergeFunction
 from pypaimon.read.reader.first_row_merge_function import \
@@ -695,6 +697,23 @@ class MergeFileSplitRead(SplitRead):
             return FirstRowMergeFunction(
                 ignore_delete=self.table.options.ignore_delete(),
             )
+        if engine == MergeEngine.AGGREGATE:
+            # Use the full primary-key list, not ``trimmed_primary_key``:
+            # ``value_fields`` still carries partition columns, so any PK
+            # column that is also a partition column must be recognised
+            # as PK here. Otherwise a table with
+            # ``fields.default-aggregate-function`` would apply the
+            # default aggregator to that partition-PK column.
+            field_aggregators = build_field_aggregators(
+                self.value_fields,
+                self.table.primary_keys,
+                self.table.options,
+            )
+            return AggregateMergeFunction(
+                key_arity=len(self.trimmed_primary_key),
+                value_arity=self.value_arity,
+                field_aggregators=field_aggregators,
+            )
         # check_supported() rejects everything else at TableRead.__init__.
         raise AssertionError(
             "unreachable: merge-engine '{}' should have been rejected by "
diff --git a/paimon-python/pypaimon/tests/test_aggregation_e2e.py 
b/paimon-python/pypaimon/tests/test_aggregation_e2e.py
new file mode 100644
index 0000000000..8eed935371
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_aggregation_e2e.py
@@ -0,0 +1,350 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""End-to-end tests for the ``aggregation`` merge engine.
+
+Each test creates a PK table with ``merge-engine=aggregation`` plus
+per-field aggregator configuration, writes two or more commits against
+the same PK, and reads back. The aggregation engine must reduce each
+non-PK column independently using the configured aggregator (sum / max
+/ last_value / ...). Disjoint PKs must remain unmerged. Default
+behaviour when no aggregator is configured is ``last_non_null_value``.
+
+The second half of the file exercises the merge-engine-support guard:
+tables that configure aggregation with options pypaimon does not yet
+implement (retract opt-ins, sequence fields, out-of-scope aggregator
+identifiers) must raise ``NotImplementedError`` at TableRead
+construction rather than silently fall back to a wrong answer.
+"""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+
+
+class AggregationMergeEngineE2ETest(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', True)
+
+        cls.pa_schema = pa.schema([
+            pa.field('id', pa.int64(), nullable=False),
+            ('total', pa.int64()),
+            ('max_score', pa.int64()),
+            ('label', pa.string()),
+        ])
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def _create_pk_table(self, table_name, field_aggs=None,
+                         default_agg=None, extra_options=None):
+        # bucket=1 forces all rows for a given PK to land in the same
+        # bucket, which routes reads through SortMergeReader where the
+        # aggregation merge function lives. Without it, fresh
+        # single-snapshot tables take the raw_convertible fast path and
+        # bypass the merge function entirely.
+        options = {
+            'bucket': '1',
+            'merge-engine': 'aggregation',
+        }
+        if field_aggs:
+            for field_name, agg_func in field_aggs.items():
+                options['fields.{}.aggregate-function'.format(field_name)] = 
agg_func
+        if default_agg:
+            options['fields.default-aggregate-function'] = default_agg
+        if extra_options:
+            options.update(extra_options)
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            primary_keys=['id'],
+            options=options,
+        )
+        full = 'default.{}'.format(table_name)
+        self.catalog.create_table(full, schema, False)
+        return self.catalog.get_table(full)
+
+    def _write(self, table, rows):
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        c = wb.new_commit()
+        try:
+            w.write_arrow(pa.Table.from_pylist(rows, schema=self.pa_schema))
+            c.commit(w.prepare_commit())
+        finally:
+            w.close()
+            c.close()
+
+    def _read(self, table):
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        if not splits:
+            return []
+        return sorted(
+            rb.new_read().to_arrow(splits).to_pylist(),
+            key=lambda r: r['id'],
+        )
+
+    # -- aggregation happy path -----------------------------------------
+
+    def test_sum_aggregator_across_commits(self):
+        table = self._create_pk_table(
+            'agg_sum',
+            field_aggs={'total': 'sum'},
+        )
+        self._write(table, [{'id': 1, 'total': 10, 'max_score': 5, 'label': 
'a'}])
+        self._write(table, [{'id': 1, 'total': 20, 'max_score': 3, 'label': 
'b'}])
+        self._write(table, [{'id': 1, 'total': 30, 'max_score': 8, 'label': 
'c'}])
+
+        rows = self._read(table)
+        self.assertEqual(len(rows), 1)
+        row = rows[0]
+        self.assertEqual(row['id'], 1)
+        # total: 10 + 20 + 30 = 60
+        self.assertEqual(row['total'], 60)
+        # max_score and label have no aggregator configured → default
+        # last_non_null_value: latest non-null wins.
+        self.assertEqual(row['max_score'], 8)
+        self.assertEqual(row['label'], 'c')
+
+    def test_multiple_aggregators_compose(self):
+        table = self._create_pk_table(
+            'agg_multi',
+            field_aggs={
+                'total': 'sum',
+                'max_score': 'max',
+                'label': 'last_value',
+            },
+        )
+        self._write(table, [{'id': 1, 'total': 10, 'max_score': 5, 'label': 
'a'}])
+        self._write(table, [{'id': 1, 'total': 7, 'max_score': 12, 'label': 
'b'}])
+        self._write(table, [{'id': 1, 'total': 3, 'max_score': 1, 'label': 
'c'}])
+
+        row = self._read(table)[0]
+        self.assertEqual(row['total'], 20)        # sum: 10+7+3
+        self.assertEqual(row['max_score'], 12)     # max: max(5,12,1)
+        self.assertEqual(row['label'], 'c')        # last_value
+
+    def test_null_inputs_follow_aggregator_semantics(self):
+        table = self._create_pk_table(
+            'agg_nulls',
+            field_aggs={
+                'total': 'sum',
+                'max_score': 'last_value',
+            },
+        )
+        self._write(table, [{'id': 1, 'total': 5, 'max_score': 7, 'label': 
'x'}])
+        # null total is absorbed by sum; null max_score replaces under
+        # last_value (last_value keeps the last input verbatim,
+        # including None).
+        self._write(table, [{'id': 1, 'total': None, 'max_score': None, 
'label': None}])
+        self._write(table, [{'id': 1, 'total': 4, 'max_score': 9, 'label': 
'y'}])
+
+        row = self._read(table)[0]
+        self.assertEqual(row['total'], 9)          # 5 + 4 (None absorbed)
+        self.assertEqual(row['max_score'], 9)      # last_value's last input
+        # label: default last_non_null_value, intermediate None ignored,
+        # the final 'y' wins.
+        self.assertEqual(row['label'], 'y')
+
+    def test_disjoint_keys_remain_unmerged(self):
+        table = self._create_pk_table(
+            'agg_disjoint',
+            field_aggs={'total': 'sum'},
+        )
+        self._write(table, [
+            {'id': 1, 'total': 10, 'max_score': 1, 'label': 'a'},
+            {'id': 2, 'total': 20, 'max_score': 2, 'label': 'b'},
+            {'id': 3, 'total': 30, 'max_score': 3, 'label': 'c'},
+        ])
+        # Second commit only touches id=2.
+        self._write(table, [{'id': 2, 'total': 5, 'max_score': 7, 'label': 
'B'}])
+
+        rows = self._read(table)
+        self.assertEqual(rows, [
+            {'id': 1, 'total': 10, 'max_score': 1, 'label': 'a'},
+            {'id': 2, 'total': 25, 'max_score': 7, 'label': 'B'},
+            {'id': 3, 'total': 30, 'max_score': 3, 'label': 'c'},
+        ])
+
+    def test_default_aggregator_applies_to_unconfigured_fields(self):
+        table = self._create_pk_table(
+            'agg_default',
+            default_agg='max',
+        )
+        self._write(table, [{'id': 1, 'total': 3, 'max_score': 5, 'label': 
'm'}])
+        self._write(table, [{'id': 1, 'total': 7, 'max_score': 2, 'label': 
'a'}])
+        self._write(table, [{'id': 1, 'total': 1, 'max_score': 9, 'label': 
'z'}])
+
+        row = self._read(table)[0]
+        # All non-PK fields fall through to 
fields.default-aggregate-function=max.
+        self.assertEqual(row['total'], 7)
+        self.assertEqual(row['max_score'], 9)
+        self.assertEqual(row['label'], 'z')  # 'z' > 'm' > 'a' 
lexicographically
+
+    def test_default_behavior_is_last_non_null_value(self):
+        # No field-level or default aggregator configured → every non-PK
+        # field uses the system default last_non_null_value.
+        table = self._create_pk_table('agg_implicit_default')
+        self._write(table, [{'id': 1, 'total': 5, 'max_score': 9, 'label': 
'a'}])
+        self._write(table, [{'id': 1, 'total': None, 'max_score': 3, 'label': 
None}])
+        self._write(table, [{'id': 1, 'total': 7, 'max_score': None, 'label': 
'b'}])
+
+        row = self._read(table)[0]
+        self.assertEqual(row['total'], 7)       # latest non-null
+        self.assertEqual(row['max_score'], 3)   # latest non-null
+        self.assertEqual(row['label'], 'b')     # latest non-null
+
+    # -- unsupported-option guards --------------------------------------
+    #
+    # Tables that opt into behaviour AggregateMergeFunction doesn't
+    # implement must surface a NotImplementedError at TableRead
+    # construction, not silently produce wrong results.
+
+    def _create_and_expect_unsupported(self, table_name, extra_options,
+                                       expected_substring):
+        table = self._create_pk_table(
+            table_name, extra_options=extra_options
+        )
+        # Writing is fine — the guard fires when a reader is built.
+        self._write(table, [{'id': 1, 'total': 1, 'max_score': 1, 'label': 
'a'}])
+        rb = table.new_read_builder()
+        with self.assertRaises(NotImplementedError) as cm:
+            rb.new_read()
+        msg = str(cm.exception)
+        self.assertIn('aggregation', msg)
+        self.assertIn(expected_substring, msg)
+
+    def test_remove_record_on_delete_rejected(self):
+        self._create_and_expect_unsupported(
+            'agg_reject_remove_on_delete',
+            {'aggregation.remove-record-on-delete': 'true'},
+            'aggregation.remove-record-on-delete',
+        )
+
+    def test_field_ignore_retract_rejected(self):
+        self._create_and_expect_unsupported(
+            'agg_reject_ignore_retract',
+            {'fields.total.ignore-retract': 'true'},
+            'fields.total.ignore-retract',
+        )
+
+    def test_sequence_field_rejected(self):
+        self._create_and_expect_unsupported(
+            'agg_reject_sequence_field',
+            {'sequence.field': 'total'},
+            'sequence.field',
+        )
+
+    def test_field_sequence_group_rejected(self):
+        self._create_and_expect_unsupported(
+            'agg_reject_sequence_group',
+            {'fields.max_score.sequence-group': 'label'},
+            'fields.max_score.sequence-group',
+        )
+
+    def test_out_of_scope_field_aggregator_rejected(self):
+        # collect is one of the aggregator identifiers this engine
+        # doesn't support yet. The guard must reject the config rather
+        # than let the per-field factory build a (silently wrong)
+        # fallback.
+        self._create_and_expect_unsupported(
+            'agg_reject_collect',
+            {'fields.label.aggregate-function': 'collect'},
+            'fields.label.aggregate-function',
+        )
+
+    def test_out_of_scope_default_aggregator_rejected(self):
+        self._create_and_expect_unsupported(
+            'agg_reject_default_collect',
+            {'fields.default-aggregate-function': 'product'},
+            'fields.default-aggregate-function',
+        )
+
+    def test_supported_field_aggregator_passes_guard(self):
+        # Sanity check: setting one of the supported aggregators does
+        # NOT trip the guard introduced for out-of-scope identifiers.
+        table = self._create_pk_table(
+            'agg_supported_passes',
+            field_aggs={'total': 'sum'},
+        )
+        self._write(table, [{'id': 1, 'total': 1, 'max_score': 1, 'label': 
'a'}])
+        # If the guard wrongly flagged 'sum', new_read() would raise.
+        # Touch it explicitly so the test fails loudly otherwise.
+        table.new_read_builder().new_read()
+
+    # -- partition column that is also part of the primary key ----------
+
+    def test_partition_pk_overlap_not_aggregated_by_default(self):
+        # When a partition column is also part of the primary key and a
+        # table-wide ``fields.default-aggregate-function`` is configured,
+        # the partition-PK column must be treated as PK (identity) and
+        # not run through the default aggregator. Regression for the
+        # split_read bug where the trimmed PK list (which drops
+        # partition columns) was passed to ``build_field_aggregators``.
+        pa_schema = pa.schema([
+            pa.field('p', pa.int64(), nullable=False),
+            pa.field('id', pa.int64(), nullable=False),
+            pa.field('v', pa.int64()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            primary_keys=['p', 'id'],
+            partition_keys=['p'],
+            options={
+                'bucket': '1',
+                'merge-engine': 'aggregation',
+                'fields.default-aggregate-function': 'sum',
+            },
+        )
+        self.catalog.create_table(
+            'default.agg_partition_pk_overlap', schema, False)
+        table = self.catalog.get_table('default.agg_partition_pk_overlap')
+
+        def write(rows):
+            wb = table.new_batch_write_builder()
+            w = wb.new_write()
+            c = wb.new_commit()
+            try:
+                w.write_arrow(pa.Table.from_pylist(rows, schema=pa_schema))
+                c.commit(w.prepare_commit())
+            finally:
+                w.close()
+                c.close()
+
+        write([{'p': 1, 'id': 1, 'v': 10}])
+        write([{'p': 1, 'id': 1, 'v': 20}])
+
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        rows = rb.new_read().to_arrow(splits).to_pylist()
+        self.assertEqual(rows, [{'p': 1, 'id': 1, 'v': 30}])
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_aggregation_merge_function.py 
b/paimon-python/pypaimon/tests/test_aggregation_merge_function.py
new file mode 100644
index 0000000000..7ff00fbb9b
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_aggregation_merge_function.py
@@ -0,0 +1,300 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Direct unit tests for :class:`AggregateMergeFunction` and its
+helper functions.
+
+Drives the merge function with synthetic :class:`KeyValue` instances
+so the contract is pinned down without going through the full read
+pipeline. The end-to-end behaviour on real PK tables is exercised
+separately in ``test_aggregation_e2e.py``.
+"""
+
+import unittest
+
+from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.common.options.options import Options
+from pypaimon.read.reader.aggregate import create_field_aggregator
+from pypaimon.read.reader.aggregation_merge_function import (
+    AggregateMergeFunction,
+    build_field_aggregators,
+    resolve_agg_func_name,
+)
+from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.table.row.key_value import KeyValue
+from pypaimon.table.row.row_kind import RowKind
+
+
+def _kv(key, seq, row_kind, value):
+    """Build a fresh KeyValue for a (key, sequence, row_kind, value)
+    tuple. Same shape as the helper in
+    ``test_partial_update_merge_function.py`` so both test files read
+    consistently.
+    """
+    kv = KeyValue(key_arity=len(key), value_arity=len(value))
+    kv.replace(tuple(key) + (seq, row_kind.value) + tuple(value))
+    return kv
+
+
+def _result_value(kv):
+    return tuple(kv.value.get_field(i) for i in range(kv.value_arity))
+
+
+def _result_key(kv):
+    return tuple(kv.key.get_field(i) for i in range(kv.key_arity))
+
+
+def _make_agg(identifier, sql_type, field_name="f"):
+    return create_field_aggregator(
+        AtomicType(sql_type), field_name, identifier, options=None
+    )
+
+
+class AggregateMergeFunctionTest(unittest.TestCase):
+
+    def test_single_insert_returns_aggregated_value(self):
+        mf = AggregateMergeFunction(
+            key_arity=1, value_arity=1,
+            field_aggregators=[_make_agg("sum", "BIGINT")],
+        )
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, (10,)))
+        result = mf.get_result()
+
+        self.assertIsNotNone(result)
+        self.assertEqual(_result_key(result), (1,))
+        self.assertEqual(_result_value(result), (10,))
+        self.assertEqual(result.sequence_number, 100)
+        self.assertEqual(result.value_row_kind_byte, RowKind.INSERT.value)
+
+    def test_multi_row_aggregation_across_fields(self):
+        # field 0: sum, field 1: max, field 2: last_value
+        mf = AggregateMergeFunction(
+            key_arity=1, value_arity=3,
+            field_aggregators=[
+                _make_agg("sum", "BIGINT", "v_sum"),
+                _make_agg("max", "INT", "v_max"),
+                _make_agg("last_value", "VARCHAR", "v_last"),
+            ],
+        )
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, (10, 5, "a")))
+        mf.add(_kv((1,), 101, RowKind.INSERT, (20, 3, "b")))
+        mf.add(_kv((1,), 102, RowKind.INSERT, (30, 9, "c")))
+
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), (60, 9, "c"))
+        # latest sequence is propagated through.
+        self.assertEqual(result.sequence_number, 102)
+
+    def test_null_inputs_follow_aggregator_semantics(self):
+        # sum drops nulls; last_non_null_value drops nulls; last_value keeps 
them.
+        mf = AggregateMergeFunction(
+            key_arity=1, value_arity=3,
+            field_aggregators=[
+                _make_agg("sum", "BIGINT", "v_sum"),
+                _make_agg("last_non_null_value", "VARCHAR", "v_lnn"),
+                _make_agg("last_value", "VARCHAR", "v_last"),
+            ],
+        )
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, (10, "x", "x")))
+        mf.add(_kv((1,), 101, RowKind.INSERT, (None, None, None)))
+        mf.add(_kv((1,), 102, RowKind.INSERT, (5, None, "z")))
+
+        result = mf.get_result()
+        # sum: 10 + 5 = 15 (nulls absorbed)
+        # last_non_null: 'x' (intermediate nulls preserved earlier value)
+        # last_value: 'z' (the very last value, including the prior None)
+        self.assertEqual(_result_value(result), (15, "x", "z"))
+
+    def test_update_after_treated_as_insert(self):
+        mf = AggregateMergeFunction(
+            key_arity=1, value_arity=1,
+            field_aggregators=[_make_agg("sum", "BIGINT")],
+        )
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.UPDATE_AFTER, (7,)))
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), (7,))
+
+    def test_delete_row_raises_not_implemented(self):
+        mf = AggregateMergeFunction(
+            key_arity=1, value_arity=1,
+            field_aggregators=[_make_agg("sum", "BIGINT")],
+        )
+        mf.reset()
+        with self.assertRaises(NotImplementedError) as ctx:
+            mf.add(_kv((1,), 100, RowKind.DELETE, (5,)))
+        self.assertIn("retract", str(ctx.exception))
+        self.assertIn("-D", str(ctx.exception))
+
+    def test_update_before_row_raises_not_implemented(self):
+        mf = AggregateMergeFunction(
+            key_arity=1, value_arity=1,
+            field_aggregators=[_make_agg("sum", "BIGINT")],
+        )
+        mf.reset()
+        with self.assertRaises(NotImplementedError) as ctx:
+            mf.add(_kv((1,), 100, RowKind.UPDATE_BEFORE, (5,)))
+        self.assertIn("-U", str(ctx.exception))
+
+    def test_reset_between_keys_clears_state(self):
+        mf = AggregateMergeFunction(
+            key_arity=1, value_arity=2,
+            field_aggregators=[
+                _make_agg("sum", "BIGINT"),
+                _make_agg("first_value", "VARCHAR"),
+            ],
+        )
+        # Key group 1.
+        mf.reset()
+        mf.add(_kv((1,), 100, RowKind.INSERT, (5, "a")))
+        mf.add(_kv((1,), 101, RowKind.INSERT, (3, "b")))
+        r1 = mf.get_result()
+        self.assertEqual(_result_value(r1), (8, "a"))
+        # Key group 2 — sum must restart from 0 and first_value must
+        # re-arm so the new group's first row wins.
+        mf.reset()
+        mf.add(_kv((2,), 200, RowKind.INSERT, (10, "x")))
+        mf.add(_kv((2,), 201, RowKind.INSERT, (20, "y")))
+        r2 = mf.get_result()
+        self.assertEqual(_result_key(r2), (2,))
+        self.assertEqual(_result_value(r2), (30, "x"))
+
+    def test_get_result_before_any_add_returns_none(self):
+        mf = AggregateMergeFunction(
+            key_arity=1, value_arity=1,
+            field_aggregators=[_make_agg("sum", "BIGINT")],
+        )
+        mf.reset()
+        self.assertIsNone(mf.get_result())
+
+    def test_result_is_decoupled_from_input_kv(self):
+        """Critical: upstream KeyValueWrapReader reuses one KeyValue and
+        rebinds its row_tuple between iterations. The merge function
+        must snapshot its output so the previously-returned result is
+        not silently mutated when the source advances.
+        """
+        mf = AggregateMergeFunction(
+            key_arity=1, value_arity=1,
+            field_aggregators=[_make_agg("sum", "BIGINT")],
+        )
+        mf.reset()
+        # Build one reusable kv and rebind it twice, like upstream does.
+        kv = KeyValue(key_arity=1, value_arity=1)
+        kv.replace((1, 100, RowKind.INSERT.value, 5))
+        mf.add(kv)
+        kv.replace((1, 101, RowKind.INSERT.value, 7))
+        mf.add(kv)
+        result = mf.get_result()
+
+        # Now mutate the source kv. The previously-captured result must
+        # NOT change.
+        kv.replace((999, 999, RowKind.INSERT.value, 99999))
+
+        self.assertEqual(_result_key(result), (1,))
+        self.assertEqual(_result_value(result), (12,))  # 5 + 7
+        self.assertEqual(result.sequence_number, 101)
+
+    def test_value_arity_mismatch_at_construction_raises(self):
+        with self.assertRaises(ValueError):
+            AggregateMergeFunction(
+                key_arity=1, value_arity=2,
+                field_aggregators=[_make_agg("sum", "BIGINT")],
+            )
+
+
+class ResolveAggFuncNameTest(unittest.TestCase):
+
+    def test_primary_key_takes_precedence(self):
+        name = resolve_agg_func_name(
+            "id", primary_keys={"id"}, options_map={
+                "fields.id.aggregate-function": "sum",
+            }
+        )
+        self.assertEqual(name, "primary_key")
+
+    def test_field_level_override_wins_over_default(self):
+        name = resolve_agg_func_name(
+            "v", primary_keys=set(), options_map={
+                "fields.v.aggregate-function": "max",
+                "fields.default-aggregate-function": "sum",
+            }
+        )
+        self.assertEqual(name, "max")
+
+    def test_table_default_used_when_no_field_override(self):
+        name = resolve_agg_func_name(
+            "v", primary_keys=set(), options_map={
+                "fields.default-aggregate-function": "sum",
+            }
+        )
+        self.assertEqual(name, "sum")
+
+    def test_system_default_when_nothing_configured(self):
+        name = resolve_agg_func_name(
+            "v", primary_keys=set(), options_map={}
+        )
+        self.assertEqual(name, "last_non_null_value")
+
+
+class BuildFieldAggregatorsTest(unittest.TestCase):
+
+    def _make_options(self, raw):
+        return CoreOptions(Options(raw))
+
+    def test_builds_aggregators_aligned_with_value_fields(self):
+        fields = [
+            DataField(0, "id", AtomicType("BIGINT")),
+            DataField(1, "amount", AtomicType("BIGINT")),
+            DataField(2, "name", AtomicType("VARCHAR")),
+        ]
+        options = self._make_options({
+            "fields.amount.aggregate-function": "sum",
+        })
+        aggs = build_field_aggregators(
+            value_fields=fields,
+            primary_keys=["id"],
+            core_options=options,
+        )
+        self.assertEqual(len(aggs), 3)
+        self.assertEqual(aggs[0].name, "primary_key")
+        self.assertEqual(aggs[1].name, "sum")
+        # Falls through to the system default since no override and no
+        # fields.default-aggregate-function is set.
+        self.assertEqual(aggs[2].name, "last_non_null_value")
+
+    def test_unknown_aggregator_identifier_raises(self):
+        fields = [
+            DataField(0, "id", AtomicType("BIGINT")),
+            DataField(1, "v", AtomicType("BIGINT")),
+        ]
+        options = self._make_options({
+            "fields.v.aggregate-function": "no_such_aggregator",
+        })
+        with self.assertRaises(ValueError):
+            build_field_aggregators(
+                value_fields=fields,
+                primary_keys=["id"],
+                core_options=options,
+            )
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_field_aggregator_registry.py 
b/paimon-python/pypaimon/tests/test_field_aggregator_registry.py
new file mode 100644
index 0000000000..2927f35b7b
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_field_aggregator_registry.py
@@ -0,0 +1,103 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Unit tests for the FieldAggregator registry contract.
+
+Drives :func:`register_aggregator` / :func:`create_field_aggregator`
+without touching the read pipeline so the wiring is pinned down before
+any concrete aggregators land in :mod:`aggregators`.
+"""
+
+import unittest
+
+from pypaimon.read.reader.aggregate import (
+    create_field_aggregator,
+    register_aggregator,
+)
+from pypaimon.read.reader.aggregate.field_aggregator import FieldAggregator
+from pypaimon.schema.data_types import AtomicType
+
+
+class _DummyAgg(FieldAggregator):
+    """Minimal concrete subclass used only by these tests."""
+
+    def agg(self, accumulator, input_field):
+        return input_field
+
+
+class FieldAggregatorRegistryTest(unittest.TestCase):
+
+    def test_register_and_create_returns_instance(self):
+        register_aggregator(
+            "_dummy_for_registry_test",
+            lambda field_type, field_name, options: _DummyAgg(
+                "_dummy_for_registry_test", field_type
+            ),
+        )
+        agg = create_field_aggregator(
+            AtomicType("INT"),
+            "field0",
+            "_dummy_for_registry_test",
+            options=None,
+        )
+        self.assertIsInstance(agg, _DummyAgg)
+        self.assertEqual(agg.name, "_dummy_for_registry_test")
+        self.assertEqual(agg.field_type, AtomicType("INT"))
+
+    def test_re_register_replaces_existing_factory(self):
+        register_aggregator(
+            "_dummy_replaceable",
+            lambda ft, fn, opts: _DummyAgg("first", ft),
+        )
+        register_aggregator(
+            "_dummy_replaceable",
+            lambda ft, fn, opts: _DummyAgg("second", ft),
+        )
+        agg = create_field_aggregator(
+            AtomicType("BIGINT"), "f", "_dummy_replaceable", options=None
+        )
+        self.assertEqual(agg.name, "second")
+
+    def test_unknown_identifier_raises_value_error(self):
+        with self.assertRaises(ValueError) as ctx:
+            create_field_aggregator(
+                AtomicType("INT"),
+                "field0",
+                "this_aggregator_does_not_exist",
+                options=None,
+            )
+        msg = str(ctx.exception)
+        self.assertIn("unsupported aggregation", msg)
+        self.assertIn("this_aggregator_does_not_exist", msg)
+
+    def test_default_retract_raises_not_implemented(self):
+        agg = _DummyAgg("dummy", AtomicType("INT"))
+        with self.assertRaises(NotImplementedError) as ctx:
+            agg.retract(1, 2)
+        self.assertIn("does not support retract", str(ctx.exception))
+        self.assertIn("dummy", str(ctx.exception))
+
+    def test_default_reset_is_noop(self):
+        # Base-class reset() must not raise so subclasses without
+        # per-group state can skip overriding it.
+        agg = _DummyAgg("dummy", AtomicType("INT"))
+        agg.reset()  # no exception expected
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_field_aggregators.py 
b/paimon-python/pypaimon/tests/test_field_aggregators.py
new file mode 100644
index 0000000000..f54a67cd95
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_field_aggregators.py
@@ -0,0 +1,274 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Unit tests for the built-in :class:`FieldAggregator` subclasses.
+
+Drives each aggregator directly to pin down the value semantics
+(reset behaviour, null handling, type validation) without going
+through the merge function or the read pipeline. End-to-end coverage
+on real PK tables lives in ``test_aggregation_e2e.py``.
+"""
+
+import datetime
+import unittest
+from decimal import Decimal
+
+from pypaimon.read.reader.aggregate import create_field_aggregator
+from pypaimon.read.reader.aggregate.aggregators import (
+    FieldBoolAndAgg,
+    FieldBoolOrAgg,
+    FieldFirstNonNullValueAgg,
+    FieldFirstValueAgg,
+    FieldLastNonNullValueAgg,
+    FieldLastValueAgg,
+    FieldMaxAgg,
+    FieldMinAgg,
+    FieldPrimaryKeyAgg,
+    FieldSumAgg,
+)
+from pypaimon.schema.data_types import AtomicType
+
+
+def _make(identifier, sql_type):
+    """Build an aggregator through the public registry path so we also
+    exercise the registered factory (including its type validation).
+    """
+    return create_field_aggregator(
+        AtomicType(sql_type), "field0", identifier, options=None
+    )
+
+
+class FieldPrimaryKeyAggTest(unittest.TestCase):
+
+    def test_returns_input_field(self):
+        agg = _make("primary_key", "BIGINT")
+        self.assertIsInstance(agg, FieldPrimaryKeyAgg)
+        self.assertEqual(agg.agg(None, 5), 5)
+        self.assertEqual(agg.agg(99, 5), 5)
+        self.assertIsNone(agg.agg(5, None))
+
+
+class FieldLastValueAggTest(unittest.TestCase):
+
+    def test_last_value_wins_including_null(self):
+        agg = _make("last_value", "VARCHAR")
+        self.assertIsInstance(agg, FieldLastValueAgg)
+        self.assertEqual(agg.agg(None, "a"), "a")
+        self.assertEqual(agg.agg("a", "b"), "b")
+        # Crucially: a later null replaces the accumulator (unlike
+        # last_non_null_value).
+        self.assertIsNone(agg.agg("a", None))
+
+
+class FieldLastNonNullValueAggTest(unittest.TestCase):
+
+    def test_null_inputs_are_absorbed(self):
+        agg = _make("last_non_null_value", "INT")
+        self.assertIsInstance(agg, FieldLastNonNullValueAgg)
+        self.assertEqual(agg.agg(None, 1), 1)
+        self.assertEqual(agg.agg(1, 2), 2)
+        self.assertEqual(agg.agg(2, None), 2)
+        self.assertIsNone(agg.agg(None, None))
+
+
+class FieldFirstValueAggTest(unittest.TestCase):
+
+    def test_first_value_locks_after_first_add(self):
+        agg = _make("first_value", "VARCHAR")
+        self.assertIsInstance(agg, FieldFirstValueAgg)
+        # First add returns input, even if input is None.
+        self.assertIsNone(agg.agg(None, None))
+        # Subsequent adds preserve the accumulator (None) regardless of input.
+        self.assertIsNone(agg.agg(None, "later"))
+
+    def test_reset_re_arms_first_value(self):
+        agg = _make("first_value", "INT")
+        self.assertEqual(agg.agg(None, 5), 5)
+        self.assertEqual(agg.agg(5, 9), 5)  # locked
+        agg.reset()
+        # After reset the next add is treated as the first again.
+        self.assertEqual(agg.agg(None, 42), 42)
+
+
+class FieldFirstNonNullValueAggTest(unittest.TestCase):
+
+    def test_first_non_null_skips_nulls(self):
+        agg = _make("first_non_null_value", "INT")
+        self.assertIsInstance(agg, FieldFirstNonNullValueAgg)
+        # Initial null does not lock — accumulator stays None.
+        self.assertIsNone(agg.agg(None, None))
+        # First non-null locks.
+        self.assertEqual(agg.agg(None, 7), 7)
+        # Subsequent values do not replace the locked first.
+        self.assertEqual(agg.agg(7, 99), 7)
+        self.assertEqual(agg.agg(7, None), 7)
+
+    def test_reset_re_arms_first_non_null(self):
+        agg = _make("first_non_null_value", "INT")
+        self.assertEqual(agg.agg(None, 1), 1)
+        self.assertEqual(agg.agg(1, 2), 1)
+        agg.reset()
+        self.assertEqual(agg.agg(None, 9), 9)
+
+
+class FieldSumAggTest(unittest.TestCase):
+
+    def test_int_sum(self):
+        agg = _make("sum", "BIGINT")
+        self.assertIsInstance(agg, FieldSumAgg)
+        self.assertEqual(agg.agg(None, 5), 5)
+        self.assertEqual(agg.agg(5, 7), 12)
+
+    def test_float_sum(self):
+        agg = _make("sum", "DOUBLE")
+        self.assertAlmostEqual(agg.agg(1.5, 2.25), 3.75)
+
+    def test_decimal_sum(self):
+        agg = _make("sum", "DECIMAL(10,2)")
+        result = agg.agg(Decimal("1.23"), Decimal("4.56"))
+        self.assertEqual(result, Decimal("5.79"))
+
+    def test_null_inputs_return_non_null_operand(self):
+        agg = _make("sum", "INT")
+        self.assertEqual(agg.agg(None, 5), 5)
+        self.assertEqual(agg.agg(5, None), 5)
+        self.assertIsNone(agg.agg(None, None))
+
+    def test_non_numeric_type_rejected_at_construction(self):
+        with self.assertRaises(ValueError) as ctx:
+            _make("sum", "VARCHAR")
+        self.assertIn("numeric", str(ctx.exception))
+
+
+class FieldMaxAggTest(unittest.TestCase):
+
+    def test_numeric_max(self):
+        agg = _make("max", "INT")
+        self.assertIsInstance(agg, FieldMaxAgg)
+        self.assertEqual(agg.agg(3, 7), 7)
+        self.assertEqual(agg.agg(7, 3), 7)
+        self.assertEqual(agg.agg(5, 5), 5)
+
+    def test_string_max(self):
+        agg = _make("max", "VARCHAR")
+        self.assertEqual(agg.agg("apple", "banana"), "banana")
+        self.assertEqual(agg.agg("banana", "apple"), "banana")
+
+    def test_date_max(self):
+        agg = _make("max", "DATE")
+        d1 = datetime.date(2020, 1, 1)
+        d2 = datetime.date(2025, 6, 15)
+        self.assertEqual(agg.agg(d1, d2), d2)
+        self.assertEqual(agg.agg(d2, d1), d2)
+
+    def test_null_inputs_return_non_null_operand(self):
+        agg = _make("max", "INT")
+        self.assertEqual(agg.agg(None, 5), 5)
+        self.assertEqual(agg.agg(5, None), 5)
+        self.assertIsNone(agg.agg(None, None))
+
+
+class FieldMinAggTest(unittest.TestCase):
+
+    def test_numeric_min(self):
+        agg = _make("min", "INT")
+        self.assertIsInstance(agg, FieldMinAgg)
+        self.assertEqual(agg.agg(3, 7), 3)
+        self.assertEqual(agg.agg(7, 3), 3)
+        self.assertEqual(agg.agg(5, 5), 5)
+
+    def test_string_min(self):
+        agg = _make("min", "VARCHAR")
+        self.assertEqual(agg.agg("apple", "banana"), "apple")
+
+    def test_null_inputs_return_non_null_operand(self):
+        agg = _make("min", "INT")
+        self.assertEqual(agg.agg(None, 5), 5)
+        self.assertEqual(agg.agg(5, None), 5)
+        self.assertIsNone(agg.agg(None, None))
+
+
+class FieldBoolOrAggTest(unittest.TestCase):
+
+    def test_truth_table(self):
+        agg = _make("bool_or", "BOOLEAN")
+        self.assertIsInstance(agg, FieldBoolOrAgg)
+        self.assertTrue(agg.agg(True, True))
+        self.assertTrue(agg.agg(True, False))
+        self.assertTrue(agg.agg(False, True))
+        self.assertFalse(agg.agg(False, False))
+
+    def test_null_inputs_return_non_null_operand(self):
+        agg = _make("bool_or", "BOOLEAN")
+        self.assertTrue(agg.agg(None, True))
+        self.assertFalse(agg.agg(False, None))
+        self.assertIsNone(agg.agg(None, None))
+
+    def test_non_boolean_type_rejected_at_construction(self):
+        with self.assertRaises(ValueError) as ctx:
+            _make("bool_or", "INT")
+        self.assertIn("BOOLEAN", str(ctx.exception))
+
+
+class FieldBoolAndAggTest(unittest.TestCase):
+
+    def test_truth_table(self):
+        agg = _make("bool_and", "BOOLEAN")
+        self.assertIsInstance(agg, FieldBoolAndAgg)
+        self.assertTrue(agg.agg(True, True))
+        self.assertFalse(agg.agg(True, False))
+        self.assertFalse(agg.agg(False, True))
+        self.assertFalse(agg.agg(False, False))
+
+    def test_null_inputs_return_non_null_operand(self):
+        agg = _make("bool_and", "BOOLEAN")
+        self.assertTrue(agg.agg(None, True))
+        self.assertFalse(agg.agg(False, None))
+        self.assertIsNone(agg.agg(None, None))
+
+    def test_non_boolean_type_rejected_at_construction(self):
+        with self.assertRaises(ValueError) as ctx:
+            _make("bool_and", "VARCHAR")
+        self.assertIn("BOOLEAN", str(ctx.exception))
+
+
+class RegistrationTest(unittest.TestCase):
+    """Sanity check that all 10 expected aggregators (the primary-key
+    placeholder plus 9 value aggregators) are registered when the
+    package is imported. Guards against future refactors silently
+    dropping a registration.
+    """
+
+    EXPECTED = frozenset([
+        "primary_key",
+        "last_value", "last_non_null_value",
+        "first_value", "first_non_null_value",
+        "sum", "max", "min",
+        "bool_or", "bool_and",
+    ])
+
+    def test_all_expected_aggregators_registered(self):
+        from pypaimon.read.reader.aggregate import _FACTORIES
+        registered = set(_FACTORIES.keys())
+        missing = self.EXPECTED - registered
+        self.assertEqual(missing, set(),
+                         "Missing built-in aggregators: {}".format(missing))
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_partial_update_e2e.py 
b/paimon-python/pypaimon/tests/test_partial_update_e2e.py
index 9d2b81530b..8606d6c239 100644
--- a/paimon-python/pypaimon/tests/test_partial_update_e2e.py
+++ b/paimon-python/pypaimon/tests/test_partial_update_e2e.py
@@ -21,10 +21,9 @@
 Each test creates a PK table with ``merge-engine`` set to a particular
 value, writes one or more batches, and reads back. Partial-update reads
 must merge non-null fields across batches; ``deduplicate`` must keep
-the latest row only; ``first-row`` must keep the earliest row;
-``aggregation`` must raise ``NotImplementedError`` (until it is
-ported), since silently treating it as deduplicate would corrupt the
-user's data.
+the latest row only; ``first-row`` must keep the earliest row.
+``aggregation`` has its own engine-specific e2e coverage in
+:mod:`test_aggregation_e2e`.
 """
 
 import os
@@ -189,21 +188,7 @@ class PartialUpdateMergeEngineE2ETest(unittest.TestCase):
             [{'id': 1, 'a': 'new', 'b': None, 'c': None}],
         )
 
-    # -- engines we don't support yet ------------------------------------
-
-    def test_aggregation_engine_raises_not_implemented(self):
-        """Until ``aggregation`` is ported, reading an aggregation table
-        must raise rather than silently produce dedupe results."""
-        table = self._create_pk_table('agg_unsupported',
-                                      merge_engine='aggregation')
-        self._write(table, [{'id': 1, 'a': 'x', 'b': None, 'c': None}])
-        self._write(table, [{'id': 1, 'a': 'y', 'b': None, 'c': None}])
-
-        rb = table.new_read_builder()
-        splits = rb.new_scan().plan().splits()
-        with self.assertRaises(NotImplementedError) as cm:
-            rb.new_read().to_arrow(splits)
-        self.assertIn('aggregation', str(cm.exception))
+    # -- other supported engines (smoke) ---------------------------------
 
     def test_first_row_engine_keeps_first(self):
         """The ``first-row`` engine must keep the earliest row per PK."""

Reply via email to