This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c3504e26d4 [python] Implement first-row merge engine for read path (#7968)
c3504e26d4 is described below
commit c3504e26d4b43afcfe8bbe2557661db276c850c8
Author: Junrui Lee <[email protected]>
AuthorDate: Sat May 30 09:38:32 2026 +0800
[python] Implement first-row merge engine for read path (#7968)
Add read-path support for the `first-row` merge engine in pypaimon. The
first-row engine keeps only the earliest row per primary key, which is
the opposite of the default `deduplicate` engine that keeps the latest.
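Previously, reading a table configured with `merge-engine: first-row`
raised `NotImplementedError`. This PR implements the merge function and
wires it into the read pipeline. It also adds the `ignore-delete` option
(`first-row.ignore-delete`, `deduplicate.ignore-delete` and
`partial-update.ignore-delete` are accepted as aliases). Unless it is
enabled, a first-row read that encounters DELETE/UPDATE_BEFORE records
fails with an error; when enabled, those records are skipped.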
---
.../pypaimon/common/options/core_options.py | 22 +++
.../pypaimon/read/merge_engine_support.py | 4 +-
.../read/reader/first_row_merge_function.py | 50 +++++++
paimon-python/pypaimon/read/split_read.py | 6 +
paimon-python/pypaimon/tests/test_first_row_e2e.py | 154 +++++++++++++++++++++
.../tests/test_first_row_merge_function.py | 130 +++++++++++++++++
.../pypaimon/tests/test_partial_update_e2e.py | 27 ++--
7 files changed, 378 insertions(+), 15 deletions(-)
diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py
index dededad00c..1cb9e831d3 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -23,6 +23,7 @@ from typing import Dict, Optional
from pypaimon.common.memory_size import MemorySize
from pypaimon.common.options import Options
from pypaimon.common.options.config_option import ConfigOption
+from pypaimon.common.options.options_utils import OptionsUtils
from pypaimon.common.options.config_options import ConfigOptions
@@ -397,6 +398,14 @@ class CoreOptions:
.with_description("Specify the merge engine for table with primary
key. "
"Options: deduplicate, partial-update, aggregation,
first-row.")
)
+
+ IGNORE_DELETE: ConfigOption[bool] = (  # boolean 'ignore-delete' option, default False
+ ConfigOptions.key("ignore-delete")
+ .boolean_type()
+ .default_value(False)
+ .with_description("Whether to ignore delete records.")
+ )  # NOTE(review): CoreOptions.ignore_delete() reads the raw option map (plus engine-prefixed aliases) rather than this option object
+
# Commit options
COMMIT_USER_PREFIX: ConfigOption[str] = (
ConfigOptions.key("commit.user-prefix")
@@ -785,6 +794,19 @@ class CoreOptions:
def merge_engine(self, default=None):
return self.options.get(CoreOptions.MERGE_ENGINE, default)
+ def ignore_delete(self) -> bool:  # whether 'ignore-delete' (or an engine-prefixed alias) is set to true
+ raw = self.options.to_map()  # raw key -> value map, so alias keys can be probed directly
+ fallback_keys = (  # probed in this order; the first key that is present wins
+ "ignore-delete", "first-row.ignore-delete",
+ "deduplicate.ignore-delete",
+ "partial-update.ignore-delete",
+ )
+ for key in fallback_keys:
+ val = raw.get(key)
+ if val is not None:
+ return OptionsUtils.convert_to_boolean(val)  # parsed with OptionsUtils.convert_to_boolean
+ return False  # no key present: same as IGNORE_DELETE's default_value(False)
+
def data_file_external_paths(self, default=None):
external_paths_str =
self.options.get(CoreOptions.DATA_FILE_EXTERNAL_PATHS, default)
if not external_paths_str:
diff --git a/paimon-python/pypaimon/read/merge_engine_support.py b/paimon-python/pypaimon/read/merge_engine_support.py
index d54cd8b0e4..2dde400465 100644
--- a/paimon-python/pypaimon/read/merge_engine_support.py
+++ b/paimon-python/pypaimon/read/merge_engine_support.py
@@ -62,6 +62,8 @@ def check_supported(table) -> None:
engine = table.options.merge_engine()
if engine == MergeEngine.DEDUPLICATE:
return
+ if engine == MergeEngine.FIRST_ROW:
+ return
if engine == MergeEngine.PARTIAL_UPDATE:
unsupported = partial_update_unsupported_options(table)
if unsupported:
@@ -79,7 +81,7 @@ def check_supported(table) -> None:
return
raise NotImplementedError(
"merge-engine '{}' is not implemented in pypaimon yet "
- "(supported: deduplicate, partial-update). Use the Java "
+ "(supported: deduplicate, first-row, partial-update). Use the Java "
"client or open an issue to track support.".format(engine.value)
)
diff --git a/paimon-python/pypaimon/read/reader/first_row_merge_function.py b/paimon-python/pypaimon/read/reader/first_row_merge_function.py
new file mode 100644
index 0000000000..162d608a64
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/first_row_merge_function.py
@@ -0,0 +1,50 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Optional
+
+from pypaimon.table.row.key_value import KeyValue
+
+
+class FirstRowMergeFunction:
+ """A MergeFunction where key is primary key (unique) and value is the
+ full record, only keep the first one.
+
+ Used for tables with ``merge-engine: first-row``. Between ``reset()``
+ and ``get_result()``, the first record accepted by ``add()`` is kept
+ and every later record is ignored. "First" means the order in which
+ the caller feeds records to ``add()``.
+
+ Retract records (``kv.is_add()`` is false, i.e. DELETE/UPDATE_BEFORE)
+ are skipped when ``ignore_delete`` is true and rejected with
+ ``ValueError`` otherwise. UPDATE_AFTER is treated like INSERT.
+
+ NOTE(review): ``add()`` stores a reference to ``kv``, not a copy. This
+ is only safe if the caller does not reuse or mutate that KeyValue
+ before the result is consumed -- confirm against the merge reader.
+ """
+
+ def __init__(self, ignore_delete: bool = False):
+ self.ignore_delete = ignore_delete  # True: skip retract records; False: raise on them
+ self.first: Optional[KeyValue] = None  # first accepted record since the last reset()
+
+ def reset(self) -> None:  # forget the previously kept record
+ self.first = None
+
+ def add(self, kv: KeyValue) -> None:
+ if not kv.is_add():  # retract record (DELETE/UPDATE_BEFORE)
+ if self.ignore_delete:
+ return  # dropped; does not affect self.first
+ raise ValueError(
+ "By default, First row merge engine can not accept "
+ "DELETE/UPDATE_BEFORE records.\n"
+ "You can config 'ignore-delete' to ignore the "
+ "DELETE/UPDATE_BEFORE records."
+ )
+
+ if self.first is None:  # keep only the first accepted record; later ones are ignored
+ self.first = kv  # reference, not a copy (see class NOTE)
+
+ def get_result(self) -> Optional[KeyValue]:  # None if no record was accepted since reset()
+ return self.first
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 2a9a8f3db7..6256bdd859 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -56,6 +56,8 @@ from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
from pypaimon.read.reader.shard_batch_reader import ShardBatchReader
from pypaimon.read.reader.partial_update_merge_function import \
PartialUpdateMergeFunction
+from pypaimon.read.reader.first_row_merge_function import \
+ FirstRowMergeFunction
from pypaimon.read.reader.sort_merge_reader import (DeduplicateMergeFunction,
SortMergeReaderWithMinHeap)
from pypaimon.read.push_down_utils import _get_all_fields
@@ -689,6 +691,10 @@ class MergeFileSplitRead(SplitRead):
value_arity=self.value_arity,
nullables=[f.type.nullable for f in self.value_fields],
)
+ if engine == MergeEngine.FIRST_ROW:
+ return FirstRowMergeFunction(
+ ignore_delete=self.table.options.ignore_delete(),
+ )
# check_supported() rejects everything else at TableRead.__init__.
raise AssertionError(
"unreachable: merge-engine '{}' should have been rejected by "
diff --git a/paimon-python/pypaimon/tests/test_first_row_e2e.py b/paimon-python/pypaimon/tests/test_first_row_e2e.py
new file mode 100644
index 0000000000..17a13c9aae
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_first_row_e2e.py
@@ -0,0 +1,154 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""End-to-end tests for the ``first-row`` merge engine.
+
+Each test creates a PK table with ``merge-engine`` set to ``first-row``,
+writes one or more batches, and reads back. The first-row engine keeps
+only the earliest row per primary key.
+"""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+
+
+class FirstRowMergeEngineE2ETest(unittest.TestCase):  # NOTE(review): no e2e case yet for DELETE records / 'ignore-delete'
+
+ @classmethod
+ def setUpClass(cls):  # one temp warehouse and catalog shared by all tests; each test uses its own table name
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+ cls.catalog.create_database('default', True)
+
+ cls.pa_schema = pa.schema([
+ pa.field('id', pa.int64(), nullable=False),  # primary-key column, so declared non-nullable
+ ('a', pa.string()),
+ ('b', pa.string()),
+ ])
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+ def _create_pk_table(self, table_name, extra_options=None):  # PK table on 'id' with merge-engine first-row
+ options = {
+ 'bucket': '1',  # single bucket so all rows of a key land in the same bucket
+ 'merge-engine': 'first-row',
+ }
+ if extra_options:
+ options.update(extra_options)  # caller-supplied options override the defaults above
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ primary_keys=['id'],
+ options=options,
+ )
+ full = 'default.{}'.format(table_name)
+ self.catalog.create_table(full, schema, False)
+ return self.catalog.get_table(full)
+
+ def _write(self, table, rows):  # one batch write + commit per call, i.e. one snapshot per call
+ wb = table.new_batch_write_builder()
+ w = wb.new_write()
+ c = wb.new_commit()
+ try:
+ w.write_arrow(pa.Table.from_pylist(rows, schema=self.pa_schema))
+ c.commit(w.prepare_commit())
+ finally:
+ w.close()
+ c.close()
+
+ def _read(self, table):  # full scan, rows sorted by 'id' for stable comparison
+ rb = table.new_read_builder()
+ splits = rb.new_scan().plan().splits()
+ if not splits:
+ return []
+ return sorted(
+ rb.new_read().to_arrow(splits).to_pylist(),
+ key=lambda r: r['id'],
+ )
+
+ def test_first_row_keeps_earliest(self):
+ """Two writes with the same PK — first-row keeps the first one."""
+ table = self._create_pk_table('first_row_basic')
+ self._write(table, [{'id': 1, 'a': 'first', 'b': 'B1'}])
+ self._write(table, [{'id': 1, 'a': 'second', 'b': 'B2'}])
+
+ self.assertEqual(
+ self._read(table),
+ [{'id': 1, 'a': 'first', 'b': 'B1'}],
+ )
+
+ def test_first_row_multiple_keys(self):
+ """Multiple PKs across two writes — each key keeps its first row."""
+ table = self._create_pk_table('first_row_multi_key')
+ self._write(table, [
+ {'id': 1, 'a': 'A1', 'b': 'B1'},
+ {'id': 2, 'a': 'A2', 'b': 'B2'},
+ ])
+ self._write(table, [
+ {'id': 1, 'a': 'A1-new', 'b': 'B1-new'},  # duplicate key: must be ignored
+ {'id': 3, 'a': 'A3', 'b': 'B3'},  # new key: must appear
+ ])
+
+ self.assertEqual(
+ self._read(table),
+ [
+ {'id': 1, 'a': 'A1', 'b': 'B1'},
+ {'id': 2, 'a': 'A2', 'b': 'B2'},
+ {'id': 3, 'a': 'A3', 'b': 'B3'},
+ ],
+ )
+
+ def test_first_row_three_writes(self):
+ """Three writes for the same PK — always the first one wins; later
+ non-null values for 'b' are not merged in (unlike partial-update)."""
+ table = self._create_pk_table('first_row_three')
+ self._write(table, [{'id': 1, 'a': 'first', 'b': None}])
+ self._write(table, [{'id': 1, 'a': 'second', 'b': 'B'}])
+ self._write(table, [{'id': 1, 'a': 'third', 'b': 'C'}])
+
+ self.assertEqual(
+ self._read(table),
+ [{'id': 1, 'a': 'first', 'b': None}],  # 'b' stays None from the first row
+ )
+
+ def test_first_row_single_write(self):
+ """A single write should read back unchanged."""
+ table = self._create_pk_table('first_row_single')
+ self._write(table, [
+ {'id': 1, 'a': 'A', 'b': 'B'},
+ {'id': 2, 'a': 'C', 'b': 'D'},
+ ])
+
+ self.assertEqual(
+ self._read(table),
+ [
+ {'id': 1, 'a': 'A', 'b': 'B'},
+ {'id': 2, 'a': 'C', 'b': 'D'},
+ ],
+ )
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_first_row_merge_function.py b/paimon-python/pypaimon/tests/test_first_row_merge_function.py
new file mode 100644
index 0000000000..7f411f85ff
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_first_row_merge_function.py
@@ -0,0 +1,130 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Direct unit tests for ``FirstRowMergeFunction``.
+
+Drives the merge function with synthetic ``KeyValue`` instances so the
+contract is pinned down without going through the full read pipeline.
+"""
+
+import unittest
+
+from pypaimon.read.reader.first_row_merge_function import \
+ FirstRowMergeFunction
+from pypaimon.table.row.key_value import KeyValue
+from pypaimon.table.row.row_kind import RowKind
+
+
+def _kv(key, seq, row_kind, value):  # build a KeyValue from key fields, seq, row kind and value fields
+ kv = KeyValue(key_arity=len(key), value_arity=len(value))
+ kv.replace(tuple(key) + (seq, row_kind.value) + tuple(value))  # flat layout: key..., seq, row-kind value, value...
+ return kv
+
+
+def _result_value(kv):  # value fields of a KeyValue as a plain tuple
+ return tuple(kv.value.get_field(i) for i in range(kv.value_arity))
+
+
+def _result_key(kv):  # key fields of a KeyValue as a plain tuple
+ return tuple(kv.key.get_field(i) for i in range(kv.key_arity))
+
+
+class FirstRowMergeFunctionTest(unittest.TestCase):  # drives FirstRowMergeFunction directly via _kv()
+
+ def test_single_insert_returns_value(self):  # one INSERT: key and value come back unchanged
+ mf = FirstRowMergeFunction()
+ mf.reset()
+ mf.add(_kv((1,), 1, RowKind.INSERT, (10, "a")))
+ result = mf.get_result()
+ self.assertIsNotNone(result)
+ self.assertEqual(_result_key(result), (1,))
+ self.assertEqual(_result_value(result), (10, "a"))
+
+ def test_keeps_first_row_not_latest(self):  # later INSERT/UPDATE_AFTER for the key are ignored
+ mf = FirstRowMergeFunction()
+ mf.reset()
+ mf.add(_kv((1,), 1, RowKind.INSERT, (10, "first")))
+ mf.add(_kv((1,), 2, RowKind.INSERT, (20, "second")))
+ mf.add(_kv((1,), 3, RowKind.UPDATE_AFTER, (30, "third")))
+ result = mf.get_result()
+ self.assertEqual(_result_value(result), (10, "first"))
+
+ def test_reset_clears_state(self):  # reset() drops the kept row so the next add() is "first" again
+ mf = FirstRowMergeFunction()
+ mf.reset()
+ mf.add(_kv((1,), 1, RowKind.INSERT, (10,)))
+ self.assertIsNotNone(mf.get_result())
+
+ mf.reset()
+ self.assertIsNone(mf.get_result())
+
+ mf.add(_kv((2,), 2, RowKind.INSERT, (20,)))
+ result = mf.get_result()
+ self.assertEqual(_result_key(result), (2,))
+ self.assertEqual(_result_value(result), (20,))
+
+ def test_empty_returns_none(self):  # no add() after reset() -> None
+ mf = FirstRowMergeFunction()
+ mf.reset()
+ self.assertIsNone(mf.get_result())
+
+ def test_delete_raises_by_default(self):  # DELETE rejected when ignore_delete is False
+ mf = FirstRowMergeFunction(ignore_delete=False)
+ mf.reset()
+ with self.assertRaises(ValueError):
+ mf.add(_kv((1,), 1, RowKind.DELETE, (10,)))
+
+ def test_update_before_raises_by_default(self):  # UPDATE_BEFORE rejected when ignore_delete is False
+ mf = FirstRowMergeFunction(ignore_delete=False)
+ mf.reset()
+ with self.assertRaises(ValueError):
+ mf.add(_kv((1,), 1, RowKind.UPDATE_BEFORE, (10,)))
+
+ def test_ignore_delete_skips_retract(self):  # skipped DELETE does not count as the first row
+ mf = FirstRowMergeFunction(ignore_delete=True)
+ mf.reset()
+ mf.add(_kv((1,), 1, RowKind.DELETE, (10,)))
+ mf.add(_kv((1,), 2, RowKind.INSERT, (20,)))
+ result = mf.get_result()
+ self.assertIsNotNone(result)
+ self.assertEqual(_result_value(result), (20,))
+
+ def test_ignore_delete_skips_update_before(self):  # skipped UPDATE_BEFORE leaves no result
+ mf = FirstRowMergeFunction(ignore_delete=True)
+ mf.reset()
+ mf.add(_kv((1,), 1, RowKind.UPDATE_BEFORE, (10,)))
+ self.assertIsNone(mf.get_result())
+
+ def test_ignore_delete_only_retract_returns_none(self):  # only retract records -> None
+ mf = FirstRowMergeFunction(ignore_delete=True)
+ mf.reset()
+ mf.add(_kv((1,), 1, RowKind.DELETE, (10,)))
+ mf.add(_kv((1,), 2, RowKind.UPDATE_BEFORE, (20,)))
+ self.assertIsNone(mf.get_result())
+
+ def test_update_after_accepted_as_first(self):  # UPDATE_AFTER is an add record, kept like INSERT
+ mf = FirstRowMergeFunction()
+ mf.reset()
+ mf.add(_kv((1,), 1, RowKind.UPDATE_AFTER, (10,)))
+ result = mf.get_result()
+ self.assertIsNotNone(result)
+ self.assertEqual(_result_value(result), (10,))
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_partial_update_e2e.py b/paimon-python/pypaimon/tests/test_partial_update_e2e.py
index af5f61e98f..9d2b81530b 100644
--- a/paimon-python/pypaimon/tests/test_partial_update_e2e.py
+++ b/paimon-python/pypaimon/tests/test_partial_update_e2e.py
@@ -21,9 +21,10 @@
Each test creates a PK table with ``merge-engine`` set to a particular
value, writes one or more batches, and reads back. Partial-update reads
must merge non-null fields across batches; ``deduplicate`` must keep
-the latest row only; ``aggregation`` and ``first-row`` must raise
-``NotImplementedError`` (until they are ported), since silently
-treating them as deduplicate would corrupt the user's data.
+the latest row only; ``first-row`` must keep the earliest row;
+``aggregation`` must raise ``NotImplementedError`` (until it is
+ported), since silently treating it as deduplicate would corrupt the
+user's data.
"""
import os
@@ -204,19 +205,17 @@ class PartialUpdateMergeEngineE2ETest(unittest.TestCase):
rb.new_read().to_arrow(splits)
self.assertIn('aggregation', str(cm.exception))
- def test_first_row_engine_raises_not_implemented(self):
- """Until ``first-row`` is ported, reading a first-row table must
- raise rather than silently produce dedupe results."""
- table = self._create_pk_table('first_row_unsupported',
+ def test_first_row_engine_keeps_first(self):
+ """The ``first-row`` engine must keep the earliest row per PK."""
+ table = self._create_pk_table('first_row_supported',
merge_engine='first-row')
- self._write(table, [{'id': 1, 'a': 'x', 'b': None, 'c': None}])
- self._write(table, [{'id': 1, 'a': 'y', 'b': None, 'c': None}])
+ self._write(table, [{'id': 1, 'a': 'first', 'b': None, 'c': None}])
+ self._write(table, [{'id': 1, 'a': 'second', 'b': 'B', 'c': 'C'}])
- rb = table.new_read_builder()
- splits = rb.new_scan().plan().splits()
- with self.assertRaises(NotImplementedError) as cm:
- rb.new_read().to_arrow(splits)
- self.assertIn('first-row', str(cm.exception))
+ self.assertEqual(
+ self._read(table),
+ [{'id': 1, 'a': 'first', 'b': None, 'c': None}],
+ )
# -- partial-update + out-of-scope option combinations ---------------
#