This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8ad391f4fc [python] Implement aggregation merge engine in pypaimon
(#7952)
8ad391f4fc is described below
commit 8ad391f4fce57569afb6fe7eabae90085d0de99e
Author: chaoyang <[email protected]>
AuthorDate: Mon Jun 1 19:32:17 2026 +0800
[python] Implement aggregation merge engine in pypaimon (#7952)
Port Java's `AggregateMergeFunction` to pypaimon, following the shape of
#7745.
Ships the `FieldAggregator` framework, 9 value aggregators (`sum` /
`max` / `min` / `last_value` / `last_non_null_value` / `first_value` /
`first_non_null_value` / `bool_or` / `bool_and`) plus the `primary_key`
placeholder, the `AggregateMergeFunction` wired into
`MergeFileSplitRead._build_merge_function`, and a `merge_engine_support`
guard that rejects retract opt-ins, sequence fields and out-of-scope
aggregator identifiers (`collect` / `nested_update` / `theta_sketch` /
`roaring_bitmap_*` / ...).
Retract handling (DELETE / UPDATE_BEFORE) and the remaining 14 Java
aggregators are intentionally deferred to follow-up PRs, mirroring
#7745's scoping.
---
.../pypaimon/read/merge_engine_support.py | 91 +++++-
.../pypaimon/read/reader/aggregate/__init__.py | 84 +++++
.../pypaimon/read/reader/aggregate/aggregators.py | 283 +++++++++++++++++
.../read/reader/aggregate/field_aggregator.py | 81 +++++
.../read/reader/aggregation_merge_function.py | 194 ++++++++++++
paimon-python/pypaimon/read/split_read.py | 19 ++
.../pypaimon/tests/test_aggregation_e2e.py | 350 +++++++++++++++++++++
.../tests/test_aggregation_merge_function.py | 300 ++++++++++++++++++
.../tests/test_field_aggregator_registry.py | 103 ++++++
.../pypaimon/tests/test_field_aggregators.py | 274 ++++++++++++++++
.../pypaimon/tests/test_partial_update_e2e.py | 23 +-
11 files changed, 1779 insertions(+), 23 deletions(-)
diff --git a/paimon-python/pypaimon/read/merge_engine_support.py
b/paimon-python/pypaimon/read/merge_engine_support.py
index 2dde400465..8ab4813f7e 100644
--- a/paimon-python/pypaimon/read/merge_engine_support.py
+++ b/paimon-python/pypaimon/read/merge_engine_support.py
@@ -45,9 +45,29 @@ _PARTIAL_UPDATE_UNSUPPORTED_BOOLEAN_OPTIONS = (
"partial-update.remove-record-on-delete",
"partial-update.remove-record-on-sequence-group",
)
# Options that, when set to a truthy value, opt the table into the
# retract / delete-removal behaviour that the Python
# ``AggregateMergeFunction`` does not implement.
_AGGREGATION_UNSUPPORTED_BOOLEAN_OPTIONS = (
    "aggregation.remove-record-on-delete",
)

# Aggregator identifiers the ``aggregation`` engine can build. This
# mirrors the ``register_aggregator(...)`` calls in
# ``aggregate/aggregators.py``; it is duplicated on purpose so this guard
# does not import the read-pipeline modules. Update both places together
# whenever an aggregator is added.
_AGGREGATION_SUPPORTED_AGG_FUNCS = frozenset({
    # primary-key placeholder
    "primary_key",
    # last / first value selectors
    "last_value",
    "last_non_null_value",
    "first_value",
    "first_non_null_value",
    # numeric / ordered reducers
    "sum",
    "max",
    "min",
    # boolean reducers
    "bool_or",
    "bool_and",
})

# Option keys and key fragments inspected by the merge-engine guards.
_SEQUENCE_FIELD_KEY = "sequence.field"
_FIELDS_PREFIX = "fields."
_FIELD_SEQUENCE_GROUP_SUFFIX = ".sequence-group"
_FIELD_AGGREGATE_FUNCTION_SUFFIX = ".aggregate-function"
_FIELD_IGNORE_RETRACT_SUFFIX = ".ignore-retract"
_DEFAULT_AGGREGATE_FUNCTION_KEY = "fields.default-aggregate-function"
@@ -73,16 +93,35 @@ def check_supported(table) -> None:
"supported subset is per-key last-non-null merge with "
"no sequence-group, no per-field aggregator override, "
"no ignore-delete and no partial-update.remove-record-"
- "on-* flags. Use the Java client for the full feature "
- "set, or open an issue to track Python support.".format(
+ "on-* flags. These options are not yet supported; open "
+ "an issue to track support.".format(
", ".join(sorted(unsupported))
)
)
return
+ if engine == MergeEngine.AGGREGATE:
+ unsupported = aggregation_unsupported_options(table)
+ if unsupported:
+ raise NotImplementedError(
+ "merge-engine 'aggregation' is enabled together with "
+ "options that pypaimon does not yet implement: {}. The "
+ "supported subset is per-key field aggregation with the "
+ "built-in aggregators ({}); retract opt-ins "
+ "(aggregation.remove-record-on-delete, "
+ "fields.<f>.ignore-retract), sequence-field handling "
+ "and other aggregators (product / listagg / collect / "
+ "merge_map* / nested_update* / theta_sketch / "
+ "hll_sketch / roaring_bitmap_*) are not yet supported. "
+ "Open an issue to track support.".format(
+ ", ".join(sorted(unsupported)),
+ ", ".join(sorted(_AGGREGATION_SUPPORTED_AGG_FUNCS)),
+ )
+ )
+ return
raise NotImplementedError(
"merge-engine '{}' is not implemented in pypaimon yet "
- "(supported: deduplicate, first-row, partial-update). Use the Java "
- "client or open an issue to track support.".format(engine.value)
+ "(supported: deduplicate, first-row, partial-update, aggregation). "
+ "Open an issue to track support.".format(engine.value)
)
@@ -106,6 +145,50 @@ def partial_update_unsupported_options(table) -> Set[str]:
return flagged
def aggregation_unsupported_options(table) -> Set[str]:
    """Collect the option keys on ``table`` that the Python
    ``AggregateMergeFunction`` cannot honour yet.

    An empty result means the configuration is safe to read. Three kinds
    of keys are reported:

    1. Retract opt-ins: ``aggregation.remove-record-on-delete`` and
       ``fields.<f>.ignore-retract`` when set to a truthy value. Both are
       only meaningful together with DELETE / UPDATE_BEFORE handling,
       which this engine does not provide.
    2. Sequence-field settings: a non-empty ``sequence.field`` and any
       ``fields.<f>.sequence-group`` key. The merge function treats every
       value column as an ordinary aggregated column, so such tables are
       refused rather than merged incorrectly.
    3. Out-of-scope aggregator choices: ``fields.<f>.aggregate-function``
       or ``fields.default-aggregate-function`` naming an identifier that
       is not in ``_AGGREGATION_SUPPORTED_AGG_FUNCS`` (e.g. ``collect``,
       ``nested_update``).
    """

    def _is_unsupported(key, value) -> bool:
        # Rules are evaluated in order; the first applicable rule decides.
        if (key in _AGGREGATION_UNSUPPORTED_BOOLEAN_OPTIONS
                and _option_is_truthy(value)):
            return True
        if key == _SEQUENCE_FIELD_KEY and value:
            return True
        if key == _DEFAULT_AGGREGATE_FUNCTION_KEY:
            # Must be tested before the generic ``fields.`` prefix rules
            # because this key also starts with ``fields.``.
            return value not in _AGGREGATION_SUPPORTED_AGG_FUNCS
        if not key.startswith(_FIELDS_PREFIX):
            return False
        if key.endswith(_FIELD_IGNORE_RETRACT_SUFFIX):
            return _option_is_truthy(value)
        if key.endswith(_FIELD_SEQUENCE_GROUP_SUFFIX):
            return True
        if key.endswith(_FIELD_AGGREGATE_FUNCTION_SUFFIX):
            return value not in _AGGREGATION_SUPPORTED_AGG_FUNCS
        return False

    options_map = table.options.options.to_map()
    flagged: Set[str] = {
        key for key, value in options_map.items()
        if _is_unsupported(key, value)
    }
    return flagged
+
+
def _option_is_truthy(raw) -> bool:
if raw is None:
return False
diff --git a/paimon-python/pypaimon/read/reader/aggregate/__init__.py
b/paimon-python/pypaimon/read/reader/aggregate/__init__.py
new file mode 100644
index 0000000000..8d5fe75d48
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/aggregate/__init__.py
@@ -0,0 +1,84 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""FieldAggregator registry and factory entry point.
+
+Looks up the registered factory for an aggregator identifier (``"sum"``,
+``"last_value"``, ...) read from table options and builds an instance
+for it. Concrete aggregators register themselves at import time via
+:func:`register_aggregator`; importing this package eagerly imports the
+built-in aggregator module so the registrations always happen,
+regardless of which call site triggers the first lookup.
+"""
+
+from typing import Callable, Dict, TYPE_CHECKING
+
+from pypaimon.read.reader.aggregate.field_aggregator import FieldAggregator
+from pypaimon.schema.data_types import DataType
+
+if TYPE_CHECKING:
+ from pypaimon.common.options.core_options import CoreOptions
+
+
# Process-wide registry mapping an aggregator identifier (``"sum"``,
# ``"last_value"`` ...) to the factory that builds it from
# ``(field_type, field_name, options)``.
_FACTORIES: Dict[
    str, Callable[[DataType, str, "CoreOptions"], FieldAggregator]
] = {}
+
+
def register_aggregator(
    identifier: str,
    factory: Callable[[DataType, str, "CoreOptions"], FieldAggregator],
) -> None:
    """Bind ``identifier`` to ``factory`` in the module registry.

    A later call with the same identifier overwrites the earlier binding.
    The built-in aggregators are registered by :mod:`aggregators` when
    that module is imported.
    """
    _FACTORIES.update({identifier: factory})
+
+
def create_field_aggregator(
    field_type: DataType,
    field_name: str,
    agg_func_name: str,
    options: "CoreOptions",
) -> FieldAggregator:
    """Instantiate the aggregator registered under ``agg_func_name``.

    ``field_type``, ``field_name`` and ``options`` are forwarded to the
    registered factory, which may validate the column type. An unknown
    identifier raises ``ValueError`` listing the registered identifiers,
    so typos and out-of-scope aggregators surface while the merge
    function is being built rather than on the first merged row.
    """
    factory = _FACTORIES.get(agg_func_name)
    if factory is not None:
        return factory(field_type, field_name, options)

    supported = sorted(_FACTORIES.keys())
    raise ValueError(
        "Use unsupported aggregation '{}' or spell aggregate function "
        "incorrectly! Supported aggregators in pypaimon: {}".format(
            agg_func_name, supported
        )
    )
+
+
+# Eager-import the built-in aggregator module so its top-level
+# ``register_aggregator(...)`` calls populate ``_FACTORIES`` before any
+# caller looks anything up. Placed at the bottom of the module so the
+# names ``register_aggregator`` / ``FieldAggregator`` aggregators
+# imports back from here are already defined when its import runs.
+from pypaimon.read.reader.aggregate import aggregators # noqa: E402, F401
diff --git a/paimon-python/pypaimon/read/reader/aggregate/aggregators.py
b/paimon-python/pypaimon/read/reader/aggregate/aggregators.py
new file mode 100644
index 0000000000..961cd10091
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/aggregate/aggregators.py
@@ -0,0 +1,283 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Built-in :class:`FieldAggregator` implementations.
+
+Each class registers itself with the global registry at import time
+via :func:`register_aggregator`, so importing
+``pypaimon.read.reader.aggregate`` makes all of them discoverable.
+
+This module ships 10 aggregators — the primary-key placeholder plus
+the 9 most commonly-used value aggregators: ``primary_key`` /
+``last_value`` / ``last_non_null_value`` / ``first_value`` /
+``first_non_null_value`` / ``sum`` / ``max`` / ``min`` / ``bool_or``
+/ ``bool_and``. Other aggregators (``product`` / ``listagg`` /
+``collect`` / ``merge_map`` / ``nested_update`` / ``theta_sketch`` /
+``hll_sketch`` / ``roaring_bitmap_*``) are intentionally deferred —
+the registry will report them as unsupported so users see a clear
+error rather than a silent fallback.
+"""
+
+from typing import Any
+
+from pypaimon.read.reader.aggregate import register_aggregator
+from pypaimon.read.reader.aggregate.field_aggregator import FieldAggregator
+from pypaimon.schema.data_types import AtomicType, DataType
+
+
# Identifiers accepted by ``fields.<name>.aggregate-function`` and
# ``fields.default-aggregate-function``.

# primary-key placeholder
NAME_PRIMARY_KEY = "primary_key"
# last / first value selectors
NAME_LAST_VALUE = "last_value"
NAME_LAST_NON_NULL_VALUE = "last_non_null_value"
NAME_FIRST_VALUE = "first_value"
NAME_FIRST_NON_NULL_VALUE = "first_non_null_value"
# numeric / ordered reducers
NAME_SUM = "sum"
NAME_MAX = "max"
NAME_MIN = "min"
# boolean reducers
NAME_BOOL_OR = "bool_or"
NAME_BOOL_AND = "bool_and"


# Base SQL type names accepted as numeric by the numeric-only
# aggregators (``sum``). ``NUMERIC`` and ``DEC`` are parser-accepted
# synonyms of ``DECIMAL`` and are treated the same way.
_NUMERIC_BASE_TYPES = frozenset({
    # integral
    "TINYINT", "SMALLINT", "INT", "INTEGER", "BIGINT",
    # floating point
    "FLOAT", "DOUBLE",
    # exact decimal and its synonyms
    "DECIMAL", "NUMERIC", "DEC",
})
+
+
def _atomic_base_name(field_type: DataType):
    """Return the upper-cased base SQL type name of an
    :class:`AtomicType`, or ``None`` for any other DataType.

    Precision arguments (``DECIMAL(10,2)``) and trailing modifiers such
    as ``NOT NULL`` are dropped. Any run of whitespace (spaces, tabs, a
    leading blank) is treated as a separator, so a type string such as
    ``" INT"`` or ``INT`` followed by a tab and ``NOT NULL`` still
    resolves to ``INT`` instead of a mangled token. A blank type string
    yields ``""``, which matches no supported type family. Returning
    ``None`` for non-atomic types lets callers raise one uniform
    "unsupported type" error.
    """
    if not isinstance(field_type, AtomicType):
        return None
    # Cut any "(precision, scale)" suffix, then keep the first
    # whitespace-separated token. ``str.split()`` with no separator
    # collapses and ignores surrounding whitespace of any kind.
    tokens = field_type.type.split('(', 1)[0].split()
    return tokens[0].upper() if tokens else ""
+
+
def _check_numeric(name: str, field_type: DataType) -> None:
    """Raise ``ValueError`` unless ``field_type`` is one of the numeric
    SQL types in ``_NUMERIC_BASE_TYPES``.

    ``name`` is the aggregator identifier quoted in the error message.
    """
    if _atomic_base_name(field_type) in _NUMERIC_BASE_TYPES:
        return
    raise ValueError(
        "Data type for '{}' column must be a numeric type but was "
        "'{}'.".format(name, field_type)
    )
+
+
def _check_boolean(name: str, field_type: DataType) -> None:
    """Raise ``ValueError`` unless ``field_type`` is ``BOOLEAN``.

    ``name`` is the aggregator identifier quoted in the error message.
    """
    if _atomic_base_name(field_type) == "BOOLEAN":
        return
    raise ValueError(
        "Data type for '{}' column must be 'BOOLEAN' but was "
        "'{}'.".format(name, field_type)
    )
+
+
+# ---------------------------------------------------------------------------
+# Aggregator classes
+# ---------------------------------------------------------------------------
+
+
class FieldPrimaryKeyAgg(FieldAggregator):
    """Pass-through aggregator for primary-key columns.

    Every call hands back the incoming value, so the column leaves the
    merge carrying its key value unchanged.
    """

    def agg(self, accumulator: Any, input_field: Any) -> Any:
        # The accumulator is not consulted: all rows folded together
        # share the same primary key.
        result = input_field
        return result
+
+
class FieldLastValueAgg(FieldAggregator):
    """Keeps the most recent input, even when that input is ``None``."""

    def agg(self, accumulator: Any, input_field: Any) -> Any:
        # Newer rows always overwrite, so the previous accumulator is
        # discarded.
        latest = input_field
        return latest
+
+
class FieldLastNonNullValueAgg(FieldAggregator):
    """Keeps the most recent non-``None`` input; ``None`` inputs leave
    the accumulator untouched.

    This is the fallback aggregator when neither a per-field override
    nor ``fields.default-aggregate-function`` is configured.
    """

    def agg(self, accumulator: Any, input_field: Any) -> Any:
        if input_field is None:
            return accumulator
        return input_field
+
+
class FieldFirstValueAgg(FieldAggregator):
    """Keeps whatever value arrives first in a key group, ``None``
    included; later inputs are ignored until :meth:`reset`.
    """

    def __init__(self, name: str, field_type: DataType):
        super().__init__(name, field_type)
        # True once the first row of the current key group was consumed.
        self._initialized = False

    def agg(self, accumulator: Any, input_field: Any) -> Any:
        if self._initialized:
            return accumulator
        self._initialized = True
        return input_field

    def reset(self) -> None:
        # A new key group starts: accept the next input again.
        self._initialized = False
+
+
class FieldFirstNonNullValueAgg(FieldAggregator):
    """Keeps the first non-``None`` value of a key group; once one has
    been seen, later inputs are ignored until :meth:`reset`.
    """

    def __init__(self, name: str, field_type: DataType):
        super().__init__(name, field_type)
        # True once a non-None value was accepted for the current group.
        self._initialized = False

    def agg(self, accumulator: Any, input_field: Any) -> Any:
        if self._initialized or input_field is None:
            return accumulator
        self._initialized = True
        return input_field

    def reset(self) -> None:
        # A new key group starts: wait for the next non-None input.
        self._initialized = False
+
+
class FieldSumAgg(FieldAggregator):
    """Numeric sum using Python's ``+``.

    If either operand is ``None``, the other operand is returned as-is
    (``None`` when both are ``None``). No per-type branching is done:
    the read path is expected to deliver int / float / Decimal values
    matching the column type, all of which support ``+``.
    """

    def agg(self, accumulator: Any, input_field: Any) -> Any:
        if input_field is None:
            return accumulator
        if accumulator is None:
            return input_field
        return accumulator + input_field
+
+
class FieldMaxAgg(FieldAggregator):
    """Keeps the larger of the accumulator and the input.

    If either operand is ``None``, the other is returned. Ordering uses
    Python's ``<``, so any mutually comparable values (numbers, strings,
    dates, datetimes, Decimals) work. On a tie the accumulator is kept.
    """

    def agg(self, accumulator: Any, input_field: Any) -> Any:
        if input_field is None:
            return accumulator
        if accumulator is None:
            return input_field
        if accumulator < input_field:
            return input_field
        return accumulator
+
+
class FieldMinAgg(FieldAggregator):
    """Keeps the smaller of the accumulator and the input.

    If either operand is ``None``, the other is returned. Ordering uses
    Python's ``<``; on a tie the input is returned.
    """

    def agg(self, accumulator: Any, input_field: Any) -> Any:
        if input_field is None:
            return accumulator
        if accumulator is None:
            return input_field
        if accumulator < input_field:
            return accumulator
        return input_field
+
+
class FieldBoolOrAgg(FieldAggregator):
    """Logical OR of the accumulator and the input.

    If either operand is ``None``, the other is returned unchanged;
    otherwise the result is a plain ``bool``.
    """

    def agg(self, accumulator: Any, input_field: Any) -> Any:
        if input_field is None:
            return accumulator
        if accumulator is None:
            return input_field
        return bool(accumulator or input_field)
+
+
class FieldBoolAndAgg(FieldAggregator):
    """Logical AND of the accumulator and the input.

    If either operand is ``None``, the other is returned unchanged;
    otherwise the result is a plain ``bool``.
    """

    def agg(self, accumulator: Any, input_field: Any) -> Any:
        if input_field is None:
            return accumulator
        if accumulator is None:
            return input_field
        return bool(accumulator and input_field)
+
+
+# ---------------------------------------------------------------------------
+# Registration. Each builder binds an identifier to a factory that
+# optionally validates the column DataType before constructing the
+# aggregator instance.
+# ---------------------------------------------------------------------------
+
+
def _build_no_type_check(cls, identifier: str):
    """Return a registry factory that builds ``cls(identifier,
    field_type)`` without validating the column type.

    Registered for ``primary_key``, the ``last_*`` / ``first_*`` value
    aggregators, and ``max`` / ``min``. The returned factory follows the
    registry signature ``(field_type, field_name, options)``; the last
    two arguments are accepted but not used.
    """
    def _factory(field_type, field_name, options):
        aggregator = cls(identifier, field_type)
        return aggregator
    return _factory
+
+
def _build_numeric(cls, identifier: str):
    """Return a registry factory that first requires a numeric column
    type (via :func:`_check_numeric`, raising ``ValueError`` otherwise)
    and then builds ``cls(identifier, field_type)``. Used for ``sum``.
    """
    def _factory(field_type, field_name, options):
        _check_numeric(identifier, field_type)
        aggregator = cls(identifier, field_type)
        return aggregator
    return _factory
+
+
def _build_boolean(cls, identifier: str):
    """Return a registry factory that first requires a ``BOOLEAN``
    column type (via :func:`_check_boolean`, raising ``ValueError``
    otherwise) and then builds ``cls(identifier, field_type)``. Used for
    ``bool_or`` / ``bool_and``.
    """
    def _factory(field_type, field_name, options):
        _check_boolean(identifier, field_type)
        aggregator = cls(identifier, field_type)
        return aggregator
    return _factory
+
+
# Register the built-in aggregators, in this order. Each entry is
# (identifier, factory builder, aggregator class); the builder decides
# whether the column type is validated before construction.
for _name, _builder, _cls in (
    (NAME_PRIMARY_KEY, _build_no_type_check, FieldPrimaryKeyAgg),
    (NAME_LAST_VALUE, _build_no_type_check, FieldLastValueAgg),
    (NAME_LAST_NON_NULL_VALUE, _build_no_type_check,
     FieldLastNonNullValueAgg),
    (NAME_FIRST_VALUE, _build_no_type_check, FieldFirstValueAgg),
    (NAME_FIRST_NON_NULL_VALUE, _build_no_type_check,
     FieldFirstNonNullValueAgg),
    (NAME_SUM, _build_numeric, FieldSumAgg),
    (NAME_MAX, _build_no_type_check, FieldMaxAgg),
    (NAME_MIN, _build_no_type_check, FieldMinAgg),
    (NAME_BOOL_OR, _build_boolean, FieldBoolOrAgg),
    (NAME_BOOL_AND, _build_boolean, FieldBoolAndAgg),
):
    register_aggregator(_name, _builder(_cls, _name))
# Remove the loop variables so they do not linger as module attributes.
del _name, _builder, _cls
diff --git a/paimon-python/pypaimon/read/reader/aggregate/field_aggregator.py
b/paimon-python/pypaimon/read/reader/aggregate/field_aggregator.py
new file mode 100644
index 0000000000..b306a4c3f7
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/aggregate/field_aggregator.py
@@ -0,0 +1,81 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Per-field aggregator abstraction used by the ``aggregation`` merge
+engine.
+
+Each non-PK field is reduced across rows sharing the same primary key
+by one ``FieldAggregator`` instance picked per field. The
+``AggregateMergeFunction`` drives the lifecycle: ``reset()`` at the
+start of each key group, ``agg()`` per input value, and the final
+accumulator is read out to build the merged row.
+"""
+
+from abc import ABC, abstractmethod
+from typing import Any
+
+from pypaimon.schema.data_types import DataType
+
+
class FieldAggregator(ABC):
    """Base class for the per-column reducers of the ``aggregation``
    merge engine.

    Subclasses must implement :meth:`agg`. Subclasses that keep per-group
    state beyond the accumulator must also override :meth:`reset`.
    :meth:`retract` deliberately keeps its refusing default: the Python
    ``AggregateMergeFunction`` raises on ``DELETE`` / ``UPDATE_BEFORE``
    rows inside its ``add()`` before any aggregator sees them, so no
    ``retract`` implementation is reachable from the read path yet. The
    hook exists so retract support can be added later without touching
    every subclass.
    """

    def __init__(self, name: str, field_type: DataType):
        # The built-in factories pass the aggregator identifier (e.g.
        # ``"sum"``) as ``name`` and the column's DataType as
        # ``field_type``.
        self.name = name
        self.field_type = field_type

    @abstractmethod
    def agg(self, accumulator: Any, input_field: Any) -> Any:
        """Fold ``input_field`` into ``accumulator`` and return the
        updated accumulator.

        Invoked once per row of a key group, oldest row first (ascending
        sequence number). On the first call of a group the accumulator
        is ``None``.
        """

    def reset(self) -> None:
        """Clear per-group state before a new key group starts.

        The base implementation does nothing. Subclasses that track
        extra bookkeeping, such as ``first_value``'s "first row already
        seen" flag, must override it.
        """

    def retract(self, accumulator: Any, retract_field: Any) -> Any:
        """Reject retraction by always raising ``NotImplementedError``.

        The read path does not reach this today, because
        ``AggregateMergeFunction.add`` raises on retract rows first. It is
        kept as an extension point: once retract is wired through the
        merge function, aggregators that can undo a contribution (sum,
        product, last_value, ...) can override it.
        """
        raise NotImplementedError(
            "Aggregator '{}' does not support retract; the aggregation "
            "merge engine does not implement DELETE / UPDATE_BEFORE "
            "handling.".format(self.name)
        )
diff --git a/paimon-python/pypaimon/read/reader/aggregation_merge_function.py
b/paimon-python/pypaimon/read/reader/aggregation_merge_function.py
new file mode 100644
index 0000000000..5e2c149738
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/aggregation_merge_function.py
@@ -0,0 +1,194 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Merge function for the ``aggregation`` merge engine.
+
+Rows sharing a primary key are folded across each non-PK field by the
+per-field :class:`FieldAggregator` configured in table options.
+``DeduplicateMergeFunction`` keeps only the latest row;
+``PartialUpdateMergeFunction`` lets later writes "fill in" fields the
+earlier writes left null; ``AggregateMergeFunction`` runs an actual
+aggregation (sum / max / min / last_value / ...) per column.
+
+This is the **core merge semantics only**. Retract on DELETE /
+UPDATE_BEFORE rows (with ``aggregation.remove-record-on-delete`` and
+``fields.<field>.ignore-retract`` opt-ins) and ~14 additional
+aggregators (``product`` / ``listagg`` / ``collect`` / ``merge_map`` /
+``nested_update`` / ``theta_sketch`` / ``hll_sketch`` /
+``roaring_bitmap_*``) are intentionally deferred. Non-INSERT row
+kinds raise ``NotImplementedError`` at :meth:`add` time so we never
+silently corrupt data with a half-implemented contract, and
+out-of-scope aggregator identifiers / options are rejected up-front in
+:mod:`pypaimon.read.merge_engine_support`.
+"""
+
+from typing import Any, List, Optional
+
+from pypaimon.read.reader.aggregate import create_field_aggregator
+from pypaimon.read.reader.aggregate.aggregators import (
+ NAME_LAST_NON_NULL_VALUE,
+ NAME_PRIMARY_KEY,
+)
+from pypaimon.read.reader.aggregate.field_aggregator import FieldAggregator
+from pypaimon.schema.data_types import DataField
+from pypaimon.table.row.key_value import KeyValue
+from pypaimon.table.row.row_kind import RowKind
+
+
+# ---------------------------------------------------------------------------
+# Aggregator-list construction helpers. Live in this module (rather than
+# in split_read.py) so they can be exercised directly by unit tests.
+# ---------------------------------------------------------------------------
+
+
def resolve_agg_func_name(field_name, primary_keys, options_map):
    """Return the aggregator identifier to use for ``field_name``.

    Resolution order (first hit wins):

    1. ``primary_key`` when ``field_name`` is in ``primary_keys``.
    2. The per-field option ``fields.<field_name>.aggregate-function``.
    3. The table-wide option ``fields.default-aggregate-function``.
    4. ``last_non_null_value`` as the built-in fallback.

    Options 2 and 3 only count when set to a truthy (non-empty) value.

    Sequence fields get no special treatment here. The guard in
    :mod:`pypaimon.read.merge_engine_support` rejects ``sequence.field``
    for the ``aggregation`` engine, so no sequence field reaches this
    point.
    """
    if field_name in primary_keys:
        return NAME_PRIMARY_KEY

    per_field_key = "fields.{}.aggregate-function".format(field_name)
    chosen = options_map.get(per_field_key)
    if not chosen:
        chosen = options_map.get("fields.default-aggregate-function")
    if not chosen:
        chosen = NAME_LAST_NON_NULL_VALUE
    return chosen
+
+
def build_field_aggregators(
    value_fields: List[DataField],
    primary_keys: List[str],
    core_options,
) -> List[FieldAggregator]:
    """Create one aggregator per entry of ``value_fields``, in the same
    order.

    For each field, :func:`resolve_agg_func_name` chooses the identifier
    and :func:`create_field_aggregator` produces the instance. Factories
    that validate types (``sum`` needs a numeric column, ``bool_or`` /
    ``bool_and`` need ``BOOLEAN``) raise ``ValueError`` here. A
    misconfigured table therefore fails while the merge function is being
    built, not on the first row it merges.
    """
    options_map = core_options.options.to_map()
    # Set for O(1) membership checks in resolve_agg_func_name.
    pk_set = set(primary_keys)
    return [
        create_field_aggregator(
            field.type,
            field.name,
            resolve_agg_func_name(field.name, pk_set, options_map),
            core_options,
        )
        for field in value_fields
    ]
+
+
+# ---------------------------------------------------------------------------
+# Merge function
+# ---------------------------------------------------------------------------
+
+
class AggregateMergeFunction:
    """Merge function for the ``aggregation`` engine.

    Rows are grouped by primary key (one unique key per group). Within a
    group, every value column is folded by the :class:`FieldAggregator`
    at the same position in ``field_aggregators``.

    It follows the ``MergeFunction`` protocol used by
    :class:`SortMergeReaderWithMinHeap`:

    1. Call :meth:`reset` before each group of same-key rows.
    2. Call :meth:`add` once per row, oldest to newest.
    3. Call :meth:`get_result` once the group is exhausted.
    """

    def __init__(self,
                 key_arity: int,
                 value_arity: int,
                 field_aggregators: List[FieldAggregator]):
        aggregator_count = len(field_aggregators)
        # Each value column needs exactly one aggregator.
        if aggregator_count != value_arity:
            raise ValueError(
                "field_aggregators length {} does not match value_arity "
                "{}".format(aggregator_count, value_arity)
            )
        self._key_arity = key_arity
        self._value_arity = value_arity
        self._field_aggregators = field_aggregators
        # Running accumulator per value column, aligned with
        # ``field_aggregators``; cleared by reset() and updated in place
        # by add().
        self._accumulators: List[Any] = [None] * value_arity
        # Last row passed to add(). get_result() only reads its key and
        # sequence number, and copies them into a new tuple, so the
        # result does not alias a KeyValue that upstream readers reuse.
        self._latest_kv: Optional[KeyValue] = None

    def reset(self) -> None:
        """Prepare for a new key group.

        Clears the accumulators, resets each aggregator's per-group state
        and forgets the last row.
        """
        self._accumulators = [None] * self._value_arity
        for aggregator in self._field_aggregators:
            aggregator.reset()
        self._latest_kv = None

    def add(self, kv: KeyValue) -> None:
        """Fold one row of the current key group into the accumulators.

        Raises ``NotImplementedError`` for row kinds that
        ``RowKind.is_add_byte`` does not accept (DELETE / UPDATE_BEFORE).
        Retract handling is not implemented, and silently dropping those
        rows would let the aggregates diverge from the stored data.
        """
        row_kind_byte = kv.value_row_kind_byte
        if not RowKind.is_add_byte(row_kind_byte):
            raise NotImplementedError(
                "AggregateMergeFunction received a {} row; the "
                "aggregation merge engine does not yet implement "
                "retract (DELETE / UPDATE_BEFORE) handling. Tables "
                "producing such rows are not yet supported."
                .format(RowKind(row_kind_byte).to_string())
            )

        accumulators = self._accumulators
        value_row = kv.value
        for index, aggregator in enumerate(self._field_aggregators):
            accumulators[index] = aggregator.agg(
                accumulators[index], value_row.get_field(index)
            )
        self._latest_kv = kv

    def get_result(self) -> Optional[KeyValue]:
        """Build the merged row for the current key group.

        Returns ``None`` if no row was added since the last
        :meth:`reset`. Otherwise returns a new ``KeyValue`` made of:

        - the key fields and sequence number of the most recently added
          row;
        - row kind INSERT;
        - the accumulators as value columns.
        """
        latest = self._latest_kv
        if latest is None:
            return None

        # Copy the key fields into a fresh tuple now. Upstream readers
        # (e.g. KeyValueWrapReader) reuse one KeyValue and mutate its
        # underlying row tuple between calls, so keeping a reference
        # would let later iteration change this result.
        key_row = latest.key
        key_part = tuple(
            key_row.get_field(pos) for pos in range(self._key_arity)
        )
        row = (
            key_part
            + (latest.sequence_number, RowKind.INSERT.value)
            + tuple(self._accumulators)
        )

        merged = KeyValue(self._key_arity, self._value_arity)
        merged.replace(row)
        return merged
diff --git a/paimon-python/pypaimon/read/split_read.py
b/paimon-python/pypaimon/read/split_read.py
index 6256bdd859..9b46a377c6 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -54,6 +54,8 @@ from pypaimon.read.reader.key_value_unwrap_reader import \
KeyValueUnwrapRecordReader
from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
from pypaimon.read.reader.shard_batch_reader import ShardBatchReader
+from pypaimon.read.reader.aggregation_merge_function import (
+ AggregateMergeFunction, build_field_aggregators)
from pypaimon.read.reader.partial_update_merge_function import \
PartialUpdateMergeFunction
from pypaimon.read.reader.first_row_merge_function import \
@@ -695,6 +697,23 @@ class MergeFileSplitRead(SplitRead):
return FirstRowMergeFunction(
ignore_delete=self.table.options.ignore_delete(),
)
+ if engine == MergeEngine.AGGREGATE:
+ # Use the full primary-key list, not ``trimmed_primary_key``:
+ # ``value_fields`` still carries partition columns, so any PK
+ # column that is also a partition column must be recognised
+ # as PK here. Otherwise a table with
+ # ``fields.default-aggregate-function`` would apply the
+ # default aggregator to that partition-PK column.
+ field_aggregators = build_field_aggregators(
+ self.value_fields,
+ self.table.primary_keys,
+ self.table.options,
+ )
+ return AggregateMergeFunction(
+ key_arity=len(self.trimmed_primary_key),
+ value_arity=self.value_arity,
+ field_aggregators=field_aggregators,
+ )
# check_supported() rejects everything else at TableRead.__init__.
raise AssertionError(
"unreachable: merge-engine '{}' should have been rejected by "
diff --git a/paimon-python/pypaimon/tests/test_aggregation_e2e.py
b/paimon-python/pypaimon/tests/test_aggregation_e2e.py
new file mode 100644
index 0000000000..8eed935371
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_aggregation_e2e.py
@@ -0,0 +1,350 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""End-to-end tests for the ``aggregation`` merge engine.
+
+Each test creates a PK table with ``merge-engine=aggregation`` plus
+per-field aggregator configuration, writes two or more commits against
+the same PK, and reads back. The aggregation engine must reduce each
+non-PK column independently using the configured aggregator (sum / max
+/ last_value / ...). Disjoint PKs must remain unmerged. Default
+behaviour when no aggregator is configured is ``last_non_null_value``.
+
+The second half of the file exercises the merge-engine-support guard:
+tables that configure aggregation with options pypaimon does not yet
+implement (retract opt-ins, sequence fields, out-of-scope aggregator
+identifiers) must raise ``NotImplementedError`` at TableRead
+construction rather than silently fall back to a wrong answer.
+"""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+
+
class AggregationMergeEngineE2ETest(unittest.TestCase):
    """End-to-end tests for PK tables using ``merge-engine=aggregation``.

    All tests share a single class-scoped warehouse directory, and each
    test creates its own uniquely named table inside it.
    """

    @classmethod
    def setUpClass(cls):
        cls.tempdir = tempfile.mkdtemp()
        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
        cls.catalog.create_database('default', True)

        # Default layout: non-nullable BIGINT key ``id`` plus three value
        # columns.
        cls.pa_schema = pa.schema([
            pa.field('id', pa.int64(), nullable=False),
            ('total', pa.int64()),
            ('max_score', pa.int64()),
            ('label', pa.string()),
        ])

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(cls.tempdir, ignore_errors=True)

    # -- helpers ---------------------------------------------------------

    def _create_pk_table(self, table_name, field_aggs=None,
                         default_agg=None, extra_options=None):
        """Create ``default.<table_name>`` with the default schema and
        primary key ``id``, configured for the aggregation merge engine.

        :param field_aggs: optional mapping of column name to aggregator
            identifier, written as ``fields.<col>.aggregate-function``.
        :param default_agg: optional value for
            ``fields.default-aggregate-function``.
        :param extra_options: extra table options, applied last so they
            can override anything set above.
        :return: the table as loaded back from the catalog.
        """
        # bucket=1 forces all rows for a given PK to land in the same
        # bucket, which routes reads through SortMergeReader where the
        # aggregation merge function lives. Without it, fresh
        # single-snapshot tables take the raw_convertible fast path and
        # bypass the merge function entirely.
        options = {
            'bucket': '1',
            'merge-engine': 'aggregation',
        }
        if field_aggs:
            for field_name, agg_func in field_aggs.items():
                key = 'fields.{}.aggregate-function'.format(field_name)
                options[key] = agg_func
        if default_agg:
            options['fields.default-aggregate-function'] = default_agg
        if extra_options:
            options.update(extra_options)
        schema = Schema.from_pyarrow_schema(
            self.pa_schema,
            primary_keys=['id'],
            options=options,
        )
        full = 'default.{}'.format(table_name)
        self.catalog.create_table(full, schema, False)
        return self.catalog.get_table(full)

    def _write(self, table, rows, pa_schema=None):
        """Write ``rows`` to ``table`` and commit them as one batch.

        :param pa_schema: Arrow schema for ``rows``. Defaults to the
            class-wide schema. Tests with a different layout pass their
            own.
        """
        schema = self.pa_schema if pa_schema is None else pa_schema
        wb = table.new_batch_write_builder()
        w = wb.new_write()
        c = wb.new_commit()
        try:
            w.write_arrow(pa.Table.from_pylist(rows, schema=schema))
            c.commit(w.prepare_commit())
        finally:
            # Nested so the commit handle is still closed when closing
            # the writer itself raises.
            try:
                w.close()
            finally:
                c.close()

    def _read(self, table):
        """Read every split of ``table`` and return the rows as dicts,
        sorted by ``id``. Returns ``[]`` when the scan plans no splits.
        """
        rb = table.new_read_builder()
        splits = rb.new_scan().plan().splits()
        if not splits:
            return []
        return sorted(
            rb.new_read().to_arrow(splits).to_pylist(),
            key=lambda r: r['id'],
        )

    # -- aggregation happy path -----------------------------------------

    def test_sum_aggregator_across_commits(self):
        table = self._create_pk_table(
            'agg_sum',
            field_aggs={'total': 'sum'},
        )
        self._write(table, [{'id': 1, 'total': 10, 'max_score': 5, 'label': 'a'}])
        self._write(table, [{'id': 1, 'total': 20, 'max_score': 3, 'label': 'b'}])
        self._write(table, [{'id': 1, 'total': 30, 'max_score': 8, 'label': 'c'}])

        rows = self._read(table)
        self.assertEqual(len(rows), 1)
        row = rows[0]
        self.assertEqual(row['id'], 1)
        # total: 10 + 20 + 30 = 60
        self.assertEqual(row['total'], 60)
        # max_score and label have no aggregator configured, so they use
        # the default last_non_null_value: the latest non-null wins.
        self.assertEqual(row['max_score'], 8)
        self.assertEqual(row['label'], 'c')

    def test_multiple_aggregators_compose(self):
        table = self._create_pk_table(
            'agg_multi',
            field_aggs={
                'total': 'sum',
                'max_score': 'max',
                'label': 'last_value',
            },
        )
        self._write(table, [{'id': 1, 'total': 10, 'max_score': 5, 'label': 'a'}])
        self._write(table, [{'id': 1, 'total': 7, 'max_score': 12, 'label': 'b'}])
        self._write(table, [{'id': 1, 'total': 3, 'max_score': 1, 'label': 'c'}])

        row = self._read(table)[0]
        self.assertEqual(row['total'], 20)  # sum: 10+7+3
        self.assertEqual(row['max_score'], 12)  # max: max(5,12,1)
        self.assertEqual(row['label'], 'c')  # last_value

    def test_null_inputs_follow_aggregator_semantics(self):
        table = self._create_pk_table(
            'agg_nulls',
            field_aggs={
                'total': 'sum',
                'max_score': 'last_value',
            },
        )
        self._write(table, [{'id': 1, 'total': 5, 'max_score': 7, 'label': 'x'}])
        # A null total is absorbed by sum. A null max_score replaces the
        # value under last_value, which keeps the last input verbatim,
        # including None.
        self._write(table, [{'id': 1, 'total': None, 'max_score': None,
                             'label': None}])
        self._write(table, [{'id': 1, 'total': 4, 'max_score': 9, 'label': 'y'}])

        row = self._read(table)[0]
        self.assertEqual(row['total'], 9)  # 5 + 4 (None absorbed)
        self.assertEqual(row['max_score'], 9)  # last_value's last input
        # label: default last_non_null_value ignores the intermediate
        # None, so the final 'y' wins.
        self.assertEqual(row['label'], 'y')

    def test_disjoint_keys_remain_unmerged(self):
        table = self._create_pk_table(
            'agg_disjoint',
            field_aggs={'total': 'sum'},
        )
        self._write(table, [
            {'id': 1, 'total': 10, 'max_score': 1, 'label': 'a'},
            {'id': 2, 'total': 20, 'max_score': 2, 'label': 'b'},
            {'id': 3, 'total': 30, 'max_score': 3, 'label': 'c'},
        ])
        # The second commit only touches id=2.
        self._write(table, [{'id': 2, 'total': 5, 'max_score': 7, 'label': 'B'}])

        rows = self._read(table)
        self.assertEqual(rows, [
            {'id': 1, 'total': 10, 'max_score': 1, 'label': 'a'},
            {'id': 2, 'total': 25, 'max_score': 7, 'label': 'B'},
            {'id': 3, 'total': 30, 'max_score': 3, 'label': 'c'},
        ])

    def test_default_aggregator_applies_to_unconfigured_fields(self):
        table = self._create_pk_table(
            'agg_default',
            default_agg='max',
        )
        self._write(table, [{'id': 1, 'total': 3, 'max_score': 5, 'label': 'm'}])
        self._write(table, [{'id': 1, 'total': 7, 'max_score': 2, 'label': 'a'}])
        self._write(table, [{'id': 1, 'total': 1, 'max_score': 9, 'label': 'z'}])

        row = self._read(table)[0]
        # All non-PK fields fall through to
        # fields.default-aggregate-function=max.
        self.assertEqual(row['total'], 7)
        self.assertEqual(row['max_score'], 9)
        self.assertEqual(row['label'], 'z')  # 'z' > 'm' > 'a' lexicographically

    def test_default_behavior_is_last_non_null_value(self):
        # With no field-level or default aggregator configured, every
        # non-PK field uses the system default last_non_null_value.
        table = self._create_pk_table('agg_implicit_default')
        self._write(table, [{'id': 1, 'total': 5, 'max_score': 9, 'label': 'a'}])
        self._write(table, [{'id': 1, 'total': None, 'max_score': 3,
                             'label': None}])
        self._write(table, [{'id': 1, 'total': 7, 'max_score': None,
                             'label': 'b'}])

        row = self._read(table)[0]
        self.assertEqual(row['total'], 7)  # latest non-null
        self.assertEqual(row['max_score'], 3)  # latest non-null
        self.assertEqual(row['label'], 'b')  # latest non-null

    # -- unsupported-option guards --------------------------------------
    #
    # Tables that opt into behaviour AggregateMergeFunction doesn't
    # implement must raise NotImplementedError when the TableRead is
    # built, rather than silently produce wrong results.

    def _create_and_expect_unsupported(self, table_name, extra_options,
                                       expected_substring):
        """Create a table with ``extra_options``, write one row, and
        assert that building a reader raises NotImplementedError whose
        message mentions 'aggregation' and ``expected_substring``.
        """
        table = self._create_pk_table(
            table_name, extra_options=extra_options
        )
        # Writing is fine; the guard only fires when a reader is built.
        self._write(table, [{'id': 1, 'total': 1, 'max_score': 1, 'label': 'a'}])
        rb = table.new_read_builder()
        with self.assertRaises(NotImplementedError) as cm:
            rb.new_read()
        msg = str(cm.exception)
        self.assertIn('aggregation', msg)
        self.assertIn(expected_substring, msg)

    def test_remove_record_on_delete_rejected(self):
        self._create_and_expect_unsupported(
            'agg_reject_remove_on_delete',
            {'aggregation.remove-record-on-delete': 'true'},
            'aggregation.remove-record-on-delete',
        )

    def test_field_ignore_retract_rejected(self):
        self._create_and_expect_unsupported(
            'agg_reject_ignore_retract',
            {'fields.total.ignore-retract': 'true'},
            'fields.total.ignore-retract',
        )

    def test_sequence_field_rejected(self):
        self._create_and_expect_unsupported(
            'agg_reject_sequence_field',
            {'sequence.field': 'total'},
            'sequence.field',
        )

    def test_field_sequence_group_rejected(self):
        self._create_and_expect_unsupported(
            'agg_reject_sequence_group',
            {'fields.max_score.sequence-group': 'label'},
            'fields.max_score.sequence-group',
        )

    def test_out_of_scope_field_aggregator_rejected(self):
        # collect is one of the aggregator identifiers this engine does
        # not support yet. The guard must reject the config rather than
        # let the per-field factory build a (silently wrong) fallback.
        self._create_and_expect_unsupported(
            'agg_reject_collect',
            {'fields.label.aggregate-function': 'collect'},
            'fields.label.aggregate-function',
        )

    def test_out_of_scope_default_aggregator_rejected(self):
        self._create_and_expect_unsupported(
            'agg_reject_default_collect',
            {'fields.default-aggregate-function': 'product'},
            'fields.default-aggregate-function',
        )

    def test_supported_field_aggregator_passes_guard(self):
        # Sanity check: configuring one of the supported aggregators must
        # NOT trip the guard for out-of-scope identifiers.
        table = self._create_pk_table(
            'agg_supported_passes',
            field_aggs={'total': 'sum'},
        )
        self._write(table, [{'id': 1, 'total': 1, 'max_score': 1, 'label': 'a'}])
        # If the guard wrongly flagged 'sum', new_read() would raise.
        # Call it explicitly so the test fails loudly in that case.
        table.new_read_builder().new_read()

    # -- partition column that is also part of the primary key ----------

    def test_partition_pk_overlap_not_aggregated_by_default(self):
        # When a partition column is also part of the primary key and a
        # table-wide ``fields.default-aggregate-function`` is configured,
        # the partition-PK column must be treated as PK (identity) and
        # not run through the default aggregator. Regression test for the
        # split_read bug where the trimmed PK list (which drops partition
        # columns) was passed to ``build_field_aggregators``.
        pa_schema = pa.schema([
            pa.field('p', pa.int64(), nullable=False),
            pa.field('id', pa.int64(), nullable=False),
            pa.field('v', pa.int64()),
        ])
        schema = Schema.from_pyarrow_schema(
            pa_schema,
            primary_keys=['p', 'id'],
            partition_keys=['p'],
            options={
                'bucket': '1',
                'merge-engine': 'aggregation',
                'fields.default-aggregate-function': 'sum',
            },
        )
        self.catalog.create_table(
            'default.agg_partition_pk_overlap', schema, False)
        table = self.catalog.get_table('default.agg_partition_pk_overlap')

        self._write(table, [{'p': 1, 'id': 1, 'v': 10}], pa_schema=pa_schema)
        self._write(table, [{'p': 1, 'id': 1, 'v': 20}], pa_schema=pa_schema)

        self.assertEqual(self._read(table), [{'p': 1, 'id': 1, 'v': 30}])
+
+
if __name__ == "__main__":
    # Direct execution entry point: hand control to the unittest runner.
    unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_aggregation_merge_function.py
b/paimon-python/pypaimon/tests/test_aggregation_merge_function.py
new file mode 100644
index 0000000000..7ff00fbb9b
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_aggregation_merge_function.py
@@ -0,0 +1,300 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Direct unit tests for :class:`AggregateMergeFunction` and its
+helper functions.
+
+Drives the merge function with synthetic :class:`KeyValue` instances
+so the contract is pinned down without going through the full read
+pipeline. The end-to-end behaviour on real PK tables is exercised
+separately in ``test_aggregation_e2e.py``.
+"""
+
+import unittest
+
+from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.common.options.options import Options
+from pypaimon.read.reader.aggregate import create_field_aggregator
+from pypaimon.read.reader.aggregation_merge_function import (
+ AggregateMergeFunction,
+ build_field_aggregators,
+ resolve_agg_func_name,
+)
+from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.table.row.key_value import KeyValue
+from pypaimon.table.row.row_kind import RowKind
+
+
def _kv(key, seq, row_kind, value):
    """Return a new KeyValue for the given key, sequence number, row kind
    and value fields.

    The flat row layout is ``key fields + (seq, row_kind byte) + value
    fields``. This matches the helper in
    ``test_partial_update_merge_function.py`` so both test files read the
    same way.
    """
    built = KeyValue(key_arity=len(key), value_arity=len(value))
    flat_row = (*key, seq, row_kind.value, *value)
    built.replace(flat_row)
    return built
+
+
+def _result_value(kv):
+ return tuple(kv.value.get_field(i) for i in range(kv.value_arity))
+
+
+def _result_key(kv):
+ return tuple(kv.key.get_field(i) for i in range(kv.key_arity))
+
+
def _make_agg(identifier, sql_type, field_name="f"):
    """Build a FieldAggregator for the atomic ``sql_type`` through the
    public registry, with no extra options."""
    data_type = AtomicType(sql_type)
    return create_field_aggregator(
        data_type, field_name, identifier, options=None
    )
+
+
class AggregateMergeFunctionTest(unittest.TestCase):
    """Contract tests for AggregateMergeFunction driven by hand-built rows."""

    @staticmethod
    def _reset_merge_function(aggregators, value_arity=None):
        """Construct a merge function with ``key_arity=1`` and reset it.

        ``value_arity`` defaults to one slot per aggregator. The
        arity-mismatch test passes it explicitly, in which case the
        constructor raises before reset() is reached.
        """
        if value_arity is None:
            value_arity = len(aggregators)
        merge_fn = AggregateMergeFunction(
            key_arity=1,
            value_arity=value_arity,
            field_aggregators=aggregators,
        )
        merge_fn.reset()
        return merge_fn

    def test_single_insert_returns_aggregated_value(self):
        merge_fn = self._reset_merge_function([_make_agg("sum", "BIGINT")])
        merge_fn.add(_kv((1,), 100, RowKind.INSERT, (10,)))
        merged = merge_fn.get_result()

        self.assertIsNotNone(merged)
        self.assertEqual(_result_key(merged), (1,))
        self.assertEqual(_result_value(merged), (10,))
        self.assertEqual(merged.sequence_number, 100)
        self.assertEqual(merged.value_row_kind_byte, RowKind.INSERT.value)

    def test_multi_row_aggregation_across_fields(self):
        # Slot 0 is summed, slot 1 keeps the max, slot 2 keeps the last.
        merge_fn = self._reset_merge_function([
            _make_agg("sum", "BIGINT", "v_sum"),
            _make_agg("max", "INT", "v_max"),
            _make_agg("last_value", "VARCHAR", "v_last"),
        ])
        feed = [
            (100, (10, 5, "a")),
            (101, (20, 3, "b")),
            (102, (30, 9, "c")),
        ]
        for seq, values in feed:
            merge_fn.add(_kv((1,), seq, RowKind.INSERT, values))

        merged = merge_fn.get_result()
        self.assertEqual(_result_value(merged), (60, 9, "c"))
        # The newest sequence number is carried into the result.
        self.assertEqual(merged.sequence_number, 102)

    def test_null_inputs_follow_aggregator_semantics(self):
        # sum and last_non_null_value skip nulls; last_value keeps them.
        merge_fn = self._reset_merge_function([
            _make_agg("sum", "BIGINT", "v_sum"),
            _make_agg("last_non_null_value", "VARCHAR", "v_lnn"),
            _make_agg("last_value", "VARCHAR", "v_last"),
        ])
        feed = [
            (100, (10, "x", "x")),
            (101, (None, None, None)),
            (102, (5, None, "z")),
        ]
        for seq, values in feed:
            merge_fn.add(_kv((1,), seq, RowKind.INSERT, values))

        merged = merge_fn.get_result()
        # sum: 10 + 5 = 15, with the nulls absorbed.
        # last_non_null_value: 'x' survives the later nulls.
        # last_value: 'z', simply the final input.
        self.assertEqual(_result_value(merged), (15, "x", "z"))

    def test_update_after_treated_as_insert(self):
        merge_fn = self._reset_merge_function([_make_agg("sum", "BIGINT")])
        merge_fn.add(_kv((1,), 100, RowKind.UPDATE_AFTER, (7,)))
        self.assertEqual(_result_value(merge_fn.get_result()), (7,))

    def test_delete_row_raises_not_implemented(self):
        merge_fn = self._reset_merge_function([_make_agg("sum", "BIGINT")])
        with self.assertRaises(NotImplementedError) as ctx:
            merge_fn.add(_kv((1,), 100, RowKind.DELETE, (5,)))
        message = str(ctx.exception)
        self.assertIn("retract", message)
        self.assertIn("-D", message)

    def test_update_before_row_raises_not_implemented(self):
        merge_fn = self._reset_merge_function([_make_agg("sum", "BIGINT")])
        with self.assertRaises(NotImplementedError) as ctx:
            merge_fn.add(_kv((1,), 100, RowKind.UPDATE_BEFORE, (5,)))
        self.assertIn("-U", str(ctx.exception))

    def test_reset_between_keys_clears_state(self):
        # The helper performs the reset that opens key group 1.
        merge_fn = self._reset_merge_function([
            _make_agg("sum", "BIGINT"),
            _make_agg("first_value", "VARCHAR"),
        ])
        merge_fn.add(_kv((1,), 100, RowKind.INSERT, (5, "a")))
        merge_fn.add(_kv((1,), 101, RowKind.INSERT, (3, "b")))
        first_group = merge_fn.get_result()
        self.assertEqual(_result_value(first_group), (8, "a"))

        # Key group 2: sum restarts from nothing and first_value re-arms,
        # so the first row of the new group wins.
        merge_fn.reset()
        merge_fn.add(_kv((2,), 200, RowKind.INSERT, (10, "x")))
        merge_fn.add(_kv((2,), 201, RowKind.INSERT, (20, "y")))
        second_group = merge_fn.get_result()
        self.assertEqual(_result_key(second_group), (2,))
        self.assertEqual(_result_value(second_group), (30, "x"))

    def test_get_result_before_any_add_returns_none(self):
        merge_fn = self._reset_merge_function([_make_agg("sum", "BIGINT")])
        self.assertIsNone(merge_fn.get_result())

    def test_result_is_decoupled_from_input_kv(self):
        """Upstream KeyValueWrapReader reuses one KeyValue and rebinds its
        row_tuple between iterations. A result returned earlier must not
        change when that shared instance is rebound afterwards.
        """
        merge_fn = self._reset_merge_function([_make_agg("sum", "BIGINT")])
        # A single reusable KeyValue, rebound per row like upstream does.
        reused = KeyValue(key_arity=1, value_arity=1)
        for seq, amount in ((100, 5), (101, 7)):
            reused.replace((1, seq, RowKind.INSERT.value, amount))
            merge_fn.add(reused)
        merged = merge_fn.get_result()

        # Rebind the source once more; the captured result must not move.
        reused.replace((999, 999, RowKind.INSERT.value, 99999))

        self.assertEqual(_result_key(merged), (1,))
        self.assertEqual(_result_value(merged), (12,))  # 5 + 7
        self.assertEqual(merged.sequence_number, 101)

    def test_value_arity_mismatch_at_construction_raises(self):
        with self.assertRaises(ValueError):
            self._reset_merge_function(
                [_make_agg("sum", "BIGINT")], value_arity=2
            )
+
+
class ResolveAggFuncNameTest(unittest.TestCase):
    """Precedence: primary key > per-field option > table default >
    system default."""

    def test_primary_key_takes_precedence(self):
        # A PK column resolves to the identity aggregator even when an
        # explicit per-field function is configured for it.
        opts = {"fields.id.aggregate-function": "sum"}
        resolved = resolve_agg_func_name(
            "id", primary_keys={"id"}, options_map=opts
        )
        self.assertEqual(resolved, "primary_key")

    def test_field_level_override_wins_over_default(self):
        opts = {
            "fields.v.aggregate-function": "max",
            "fields.default-aggregate-function": "sum",
        }
        resolved = resolve_agg_func_name(
            "v", primary_keys=set(), options_map=opts
        )
        self.assertEqual(resolved, "max")

    def test_table_default_used_when_no_field_override(self):
        opts = {"fields.default-aggregate-function": "sum"}
        resolved = resolve_agg_func_name(
            "v", primary_keys=set(), options_map=opts
        )
        self.assertEqual(resolved, "sum")

    def test_system_default_when_nothing_configured(self):
        resolved = resolve_agg_func_name(
            "v", primary_keys=set(), options_map={}
        )
        self.assertEqual(resolved, "last_non_null_value")
+
+
class BuildFieldAggregatorsTest(unittest.TestCase):
    """build_field_aggregators: one aggregator per value field, in order."""

    def _make_options(self, raw):
        return CoreOptions(Options(raw))

    @staticmethod
    def _fields(*specs):
        """Turn ``(name, sql_type)`` pairs into DataFields with sequential
        ids starting at 0."""
        return [
            DataField(idx, name, AtomicType(sql_type))
            for idx, (name, sql_type) in enumerate(specs)
        ]

    def test_builds_aggregators_aligned_with_value_fields(self):
        fields = self._fields(
            ("id", "BIGINT"),
            ("amount", "BIGINT"),
            ("name", "VARCHAR"),
        )
        options = self._make_options(
            {"fields.amount.aggregate-function": "sum"}
        )
        aggs = build_field_aggregators(
            value_fields=fields,
            primary_keys=["id"],
            core_options=options,
        )
        self.assertEqual(len(aggs), 3)
        # "name" has no override and no fields.default-aggregate-function
        # is set, so it falls through to the system default.
        expected_names = ["primary_key", "sum", "last_non_null_value"]
        for agg, expected in zip(aggs, expected_names):
            self.assertEqual(agg.name, expected)

    def test_unknown_aggregator_identifier_raises(self):
        fields = self._fields(("id", "BIGINT"), ("v", "BIGINT"))
        options = self._make_options(
            {"fields.v.aggregate-function": "no_such_aggregator"}
        )
        with self.assertRaises(ValueError):
            build_field_aggregators(
                value_fields=fields,
                primary_keys=["id"],
                core_options=options,
            )
+
+
if __name__ == "__main__":
    # Direct execution entry point: hand control to the unittest runner.
    unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_field_aggregator_registry.py
b/paimon-python/pypaimon/tests/test_field_aggregator_registry.py
new file mode 100644
index 0000000000..2927f35b7b
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_field_aggregator_registry.py
@@ -0,0 +1,103 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Unit tests for the FieldAggregator registry contract.
+
+Drives :func:`register_aggregator` / :func:`create_field_aggregator`
+without touching the read pipeline so the wiring is pinned down before
+any concrete aggregators land in :mod:`aggregators`.
+"""
+
+import unittest
+
+from pypaimon.read.reader.aggregate import (
+ create_field_aggregator,
+ register_aggregator,
+)
+from pypaimon.read.reader.aggregate.field_aggregator import FieldAggregator
+from pypaimon.schema.data_types import AtomicType
+
+
class _DummyAgg(FieldAggregator):
    """Test-only FieldAggregator whose merged value is always the newest
    input."""

    def agg(self, accumulator, input_field):
        # The running accumulator is deliberately ignored.
        newest = input_field
        return newest
+
+
class FieldAggregatorRegistryTest(unittest.TestCase):
    """Contract tests for register_aggregator / create_field_aggregator and
    the FieldAggregator base-class defaults."""

    def test_register_and_create_returns_instance(self):
        ident = "_dummy_for_registry_test"

        def factory(field_type, field_name, options):
            return _DummyAgg(ident, field_type)

        register_aggregator(ident, factory)
        created = create_field_aggregator(
            AtomicType("INT"),
            "field0",
            ident,
            options=None,
        )
        self.assertIsInstance(created, _DummyAgg)
        self.assertEqual(created.name, ident)
        self.assertEqual(created.field_type, AtomicType("INT"))

    def test_re_register_replaces_existing_factory(self):
        ident = "_dummy_replaceable"

        def factory_named(label):
            # Each registered factory stamps its own label as the name.
            return lambda ft, fn, opts: _DummyAgg(label, ft)

        register_aggregator(ident, factory_named("first"))
        register_aggregator(ident, factory_named("second"))
        created = create_field_aggregator(
            AtomicType("BIGINT"), "f", ident, options=None
        )
        self.assertEqual(created.name, "second")

    def test_unknown_identifier_raises_value_error(self):
        missing = "this_aggregator_does_not_exist"
        with self.assertRaises(ValueError) as ctx:
            create_field_aggregator(
                AtomicType("INT"),
                "field0",
                missing,
                options=None,
            )
        message = str(ctx.exception)
        self.assertIn("unsupported aggregation", message)
        self.assertIn(missing, message)

    def test_default_retract_raises_not_implemented(self):
        dummy = _DummyAgg("dummy", AtomicType("INT"))
        with self.assertRaises(NotImplementedError) as ctx:
            dummy.retract(1, 2)
        message = str(ctx.exception)
        self.assertIn("does not support retract", message)
        self.assertIn("dummy", message)

    def test_default_reset_is_noop(self):
        # The base-class reset() must be callable without error, so
        # subclasses that keep no per-group state can skip overriding it.
        _DummyAgg("dummy", AtomicType("INT")).reset()
+
+
if __name__ == "__main__":
    # Direct execution entry point: hand control to the unittest runner.
    unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_field_aggregators.py
b/paimon-python/pypaimon/tests/test_field_aggregators.py
new file mode 100644
index 0000000000..f54a67cd95
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_field_aggregators.py
@@ -0,0 +1,274 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Unit tests for the built-in :class:`FieldAggregator` subclasses.
+
+Drives each aggregator directly to pin down the value semantics
+(reset behaviour, null handling, type validation) without going
+through the merge function or the read pipeline. End-to-end coverage
+on real PK tables lives in ``test_aggregation_e2e.py``.
+"""
+
+import datetime
+import unittest
+from decimal import Decimal
+
+from pypaimon.read.reader.aggregate import create_field_aggregator
+from pypaimon.read.reader.aggregate.aggregators import (
+ FieldBoolAndAgg,
+ FieldBoolOrAgg,
+ FieldFirstNonNullValueAgg,
+ FieldFirstValueAgg,
+ FieldLastNonNullValueAgg,
+ FieldLastValueAgg,
+ FieldMaxAgg,
+ FieldMinAgg,
+ FieldPrimaryKeyAgg,
+ FieldSumAgg,
+)
+from pypaimon.schema.data_types import AtomicType
+
+
+def _make(identifier, sql_type):
+ """Create the aggregator registered as ``identifier`` for ``sql_type``.
+
+ Goes through the public ``create_field_aggregator`` entry point
+ instead of instantiating classes directly, so every test also covers
+ the registered factory and its type validation.
+ """
+ field_type = AtomicType(sql_type)
+ return create_field_aggregator(field_type, "field0", identifier, options=None)
+
+
+class FieldPrimaryKeyAggTest(unittest.TestCase):
+ """``primary_key`` always yields the incoming value."""
+
+ def test_returns_input_field(self):
+ agg = _make("primary_key", "BIGINT")
+ self.assertIsInstance(agg, FieldPrimaryKeyAgg)
+ # (accumulator, incoming, expected)
+ for acc, incoming, expected in ((None, 5, 5), (99, 5, 5)):
+ self.assertEqual(agg.agg(acc, incoming), expected)
+ self.assertIsNone(agg.agg(5, None))
+
+
+class FieldLastValueAggTest(unittest.TestCase):
+ """``last_value`` takes whatever arrives last, null included."""
+
+ def test_last_value_wins_including_null(self):
+ agg = _make("last_value", "VARCHAR")
+ self.assertIsInstance(agg, FieldLastValueAgg)
+ self.assertEqual("a", agg.agg(None, "a"))
+ self.assertEqual("b", agg.agg("a", "b"))
+ # A trailing null overwrites the accumulator; this is exactly what
+ # distinguishes it from last_non_null_value.
+ self.assertIsNone(agg.agg("a", None))
+
+
+class FieldLastNonNullValueAggTest(unittest.TestCase):
+ """``last_non_null_value`` ignores null inputs."""
+
+ def test_null_inputs_are_absorbed(self):
+ agg = _make("last_non_null_value", "INT")
+ self.assertIsInstance(agg, FieldLastNonNullValueAgg)
+ # (accumulator, incoming, expected)
+ for acc, incoming, expected in ((None, 1, 1), (1, 2, 2), (2, None, 2)):
+ self.assertEqual(agg.agg(acc, incoming), expected)
+ # Both sides null: the result stays null.
+ self.assertIsNone(agg.agg(None, None))
+
+
+class FieldFirstValueAggTest(unittest.TestCase):
+ """``first_value`` locks on the very first add until reset()."""
+
+ def test_first_value_locks_after_first_add(self):
+ agg = _make("first_value", "VARCHAR")
+ self.assertIsInstance(agg, FieldFirstValueAgg)
+ # The first add is taken verbatim; even a null input counts.
+ self.assertIsNone(agg.agg(None, None))
+ # Once locked, later inputs leave the (null) accumulator alone.
+ self.assertIsNone(agg.agg(None, "later"))
+
+ def test_reset_re_arms_first_value(self):
+ agg = _make("first_value", "INT")
+ first = agg.agg(None, 5)
+ self.assertEqual(first, 5)
+ locked = agg.agg(5, 9)
+ self.assertEqual(locked, 5)
+ agg.reset()
+ # reset() clears the lock, so the next add counts as the first again.
+ self.assertEqual(agg.agg(None, 42), 42)
+
+
+class FieldFirstNonNullValueAggTest(unittest.TestCase):
+ """``first_non_null_value`` locks on the first non-null input."""
+
+ def test_first_non_null_skips_nulls(self):
+ agg = _make("first_non_null_value", "INT")
+ self.assertIsInstance(agg, FieldFirstNonNullValueAgg)
+ # A leading null does not lock; the accumulator remains null.
+ self.assertIsNone(agg.agg(None, None))
+ # The first non-null value locks ...
+ self.assertEqual(agg.agg(None, 7), 7)
+ # ... and neither later values nor nulls displace it.
+ for later in (99, None):
+ self.assertEqual(agg.agg(7, later), 7)
+
+ def test_reset_re_arms_first_non_null(self):
+ agg = _make("first_non_null_value", "INT")
+ self.assertEqual(agg.agg(None, 1), 1)
+ self.assertEqual(agg.agg(1, 2), 1)
+ agg.reset()
+ # After reset the next non-null input is accepted again.
+ self.assertEqual(agg.agg(None, 9), 9)
+
+
+class FieldSumAggTest(unittest.TestCase):
+ """``sum`` adds numeric values; a null operand contributes nothing."""
+
+ def test_int_sum(self):
+ agg = _make("sum", "BIGINT")
+ self.assertIsInstance(agg, FieldSumAgg)
+ self.assertEqual(agg.agg(None, 5), 5)
+ self.assertEqual(agg.agg(5, 7), 12)
+
+ def test_float_sum(self):
+ total = _make("sum", "DOUBLE").agg(1.5, 2.25)
+ self.assertAlmostEqual(total, 3.75)
+
+ def test_decimal_sum(self):
+ agg = _make("sum", "DECIMAL(10,2)")
+ self.assertEqual(
+ agg.agg(Decimal("1.23"), Decimal("4.56")), Decimal("5.79")
+ )
+
+ def test_null_inputs_return_non_null_operand(self):
+ agg = _make("sum", "INT")
+ for acc, incoming in ((None, 5), (5, None)):
+ self.assertEqual(agg.agg(acc, incoming), 5)
+ self.assertIsNone(agg.agg(None, None))
+
+ def test_non_numeric_type_rejected_at_construction(self):
+ with self.assertRaises(ValueError) as raised:
+ _make("sum", "VARCHAR")
+ # The error must mention that a numeric type is required.
+ self.assertIn("numeric", str(raised.exception))
+
+
+class FieldMaxAggTest(unittest.TestCase):
+ """``max`` keeps the larger of two comparable values."""
+
+ def test_numeric_max(self):
+ agg = _make("max", "INT")
+ self.assertIsInstance(agg, FieldMaxAgg)
+ # (accumulator, incoming, expected)
+ for acc, incoming, expected in ((3, 7, 7), (7, 3, 7), (5, 5, 5)):
+ self.assertEqual(agg.agg(acc, incoming), expected)
+
+ def test_string_max(self):
+ agg = _make("max", "VARCHAR")
+ # "banana" wins regardless of argument order.
+ for acc, incoming in (("apple", "banana"), ("banana", "apple")):
+ self.assertEqual(agg.agg(acc, incoming), "banana")
+
+ def test_date_max(self):
+ agg = _make("max", "DATE")
+ early = datetime.date(2020, 1, 1)
+ late = datetime.date(2025, 6, 15)
+ self.assertEqual(agg.agg(early, late), late)
+ self.assertEqual(agg.agg(late, early), late)
+
+ def test_null_inputs_return_non_null_operand(self):
+ agg = _make("max", "INT")
+ for acc, incoming in ((None, 5), (5, None)):
+ self.assertEqual(agg.agg(acc, incoming), 5)
+ self.assertIsNone(agg.agg(None, None))
+
+
+class FieldMinAggTest(unittest.TestCase):
+ """``min`` keeps the smaller of two comparable values."""
+
+ def test_numeric_min(self):
+ agg = _make("min", "INT")
+ self.assertIsInstance(agg, FieldMinAgg)
+ # (accumulator, incoming, expected)
+ for acc, incoming, expected in ((3, 7, 3), (7, 3, 3), (5, 5, 5)):
+ self.assertEqual(agg.agg(acc, incoming), expected)
+
+ def test_string_min(self):
+ smallest = _make("min", "VARCHAR").agg("apple", "banana")
+ self.assertEqual(smallest, "apple")
+
+ def test_null_inputs_return_non_null_operand(self):
+ agg = _make("min", "INT")
+ for acc, incoming in ((None, 5), (5, None)):
+ self.assertEqual(agg.agg(acc, incoming), 5)
+ self.assertIsNone(agg.agg(None, None))
+
+
+class FieldBoolOrAggTest(unittest.TestCase):
+ """``bool_or`` is logical OR over BOOLEAN values; nulls are skipped."""
+
+ def test_truth_table(self):
+ agg = _make("bool_or", "BOOLEAN")
+ self.assertIsInstance(agg, FieldBoolOrAgg)
+ # assertIs instead of assertTrue/assertFalse: the latter would also
+ # accept non-bool truthy/falsy results such as None or 0.
+ self.assertIs(agg.agg(True, True), True)
+ self.assertIs(agg.agg(True, False), True)
+ self.assertIs(agg.agg(False, True), True)
+ self.assertIs(agg.agg(False, False), False)
+
+ def test_null_inputs_return_non_null_operand(self):
+ agg = _make("bool_or", "BOOLEAN")
+ self.assertIs(agg.agg(None, True), True)
+ # assertFalse would let a None result slip through here, which is
+ # exactly the regression this test is meant to catch.
+ self.assertIs(agg.agg(False, None), False)
+ self.assertIsNone(agg.agg(None, None))
+
+ def test_non_boolean_type_rejected_at_construction(self):
+ with self.assertRaises(ValueError) as ctx:
+ _make("bool_or", "INT")
+ self.assertIn("BOOLEAN", str(ctx.exception))
+
+
+class FieldBoolAndAggTest(unittest.TestCase):
+ """``bool_and`` is logical AND over BOOLEAN values; nulls are skipped."""
+
+ def test_truth_table(self):
+ agg = _make("bool_and", "BOOLEAN")
+ self.assertIsInstance(agg, FieldBoolAndAgg)
+ # assertIs instead of assertTrue/assertFalse: the latter would also
+ # accept non-bool truthy/falsy results such as None or 0.
+ self.assertIs(agg.agg(True, True), True)
+ self.assertIs(agg.agg(True, False), False)
+ self.assertIs(agg.agg(False, True), False)
+ self.assertIs(agg.agg(False, False), False)
+
+ def test_null_inputs_return_non_null_operand(self):
+ agg = _make("bool_and", "BOOLEAN")
+ self.assertIs(agg.agg(None, True), True)
+ # assertFalse would let a None result slip through here, which is
+ # exactly the regression this test is meant to catch.
+ self.assertIs(agg.agg(False, None), False)
+ self.assertIsNone(agg.agg(None, None))
+
+ def test_non_boolean_type_rejected_at_construction(self):
+ with self.assertRaises(ValueError) as ctx:
+ _make("bool_and", "VARCHAR")
+ self.assertIn("BOOLEAN", str(ctx.exception))
+
+
+class RegistrationTest(unittest.TestCase):
+ """Guard that every built-in aggregator is registered on import.
+
+ Ten identifiers are expected: the ``primary_key`` placeholder plus
+ nine value aggregators. A refactor that silently drops one of the
+ registrations makes this test fail.
+ """
+
+ EXPECTED = frozenset({
+ "primary_key",
+ "last_value",
+ "last_non_null_value",
+ "first_value",
+ "first_non_null_value",
+ "sum",
+ "max",
+ "min",
+ "bool_or",
+ "bool_and",
+ })
+
+ def test_all_expected_aggregators_registered(self):
+ from pypaimon.read.reader.aggregate import _FACTORIES
+ missing = self.EXPECTED.difference(_FACTORIES.keys())
+ self.assertEqual(missing, set(),
+ "Missing built-in aggregators: {}".format(missing))
+
+
+# Allow running this test module directly as a script.
+if __name__ == "__main__":
+ unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_partial_update_e2e.py
b/paimon-python/pypaimon/tests/test_partial_update_e2e.py
index 9d2b81530b..8606d6c239 100644
--- a/paimon-python/pypaimon/tests/test_partial_update_e2e.py
+++ b/paimon-python/pypaimon/tests/test_partial_update_e2e.py
@@ -21,10 +21,9 @@
Each test creates a PK table with ``merge-engine`` set to a particular
value, writes one or more batches, and reads back. Partial-update reads
must merge non-null fields across batches; ``deduplicate`` must keep
-the latest row only; ``first-row`` must keep the earliest row;
-``aggregation`` must raise ``NotImplementedError`` (until it is
-ported), since silently treating it as deduplicate would corrupt the
-user's data.
+the latest row only; ``first-row`` must keep the earliest row.
+``aggregation`` has its own engine-specific e2e coverage in
+:mod:`test_aggregation_e2e`.
"""
import os
@@ -189,21 +188,7 @@ class PartialUpdateMergeEngineE2ETest(unittest.TestCase):
[{'id': 1, 'a': 'new', 'b': None, 'c': None}],
)
- # -- engines we don't support yet ------------------------------------
-
- def test_aggregation_engine_raises_not_implemented(self):
- """Until ``aggregation`` is ported, reading an aggregation table
- must raise rather than silently produce dedupe results."""
- table = self._create_pk_table('agg_unsupported',
- merge_engine='aggregation')
- self._write(table, [{'id': 1, 'a': 'x', 'b': None, 'c': None}])
- self._write(table, [{'id': 1, 'a': 'y', 'b': None, 'c': None}])
-
- rb = table.new_read_builder()
- splits = rb.new_scan().plan().splits()
- with self.assertRaises(NotImplementedError) as cm:
- rb.new_read().to_arrow(splits)
- self.assertIn('aggregation', str(cm.exception))
+ # -- other supported engines (smoke) ---------------------------------
def test_first_row_engine_keeps_first(self):
"""The ``first-row`` engine must keep the earliest row per PK."""