This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c3504e26d4 [python] Implement first-row merge engine for read path (#7968)
c3504e26d4 is described below

commit c3504e26d4b43afcfe8bbe2557661db276c850c8
Author: Junrui Lee <[email protected]>
AuthorDate: Sat May 30 09:38:32 2026 +0800

    [python] Implement first-row merge engine for read path (#7968)
    
    Add read-path support for the `first-row` merge engine in pypaimon. The
    first-row engine keeps only the earliest row per primary key, which is
    the opposite of the default `deduplicate` engine that keeps the latest.
    
    Previously, reading a table configured with `merge-engine: first-row`
    raised `NotImplementedError`. This PR implements the merge function and
    wires it into the read pipeline.
---
 .../pypaimon/common/options/core_options.py        |  22 +++
 .../pypaimon/read/merge_engine_support.py          |   4 +-
 .../read/reader/first_row_merge_function.py        |  50 +++++++
 paimon-python/pypaimon/read/split_read.py          |   6 +
 paimon-python/pypaimon/tests/test_first_row_e2e.py | 154 +++++++++++++++++++++
 .../tests/test_first_row_merge_function.py         | 130 +++++++++++++++++
 .../pypaimon/tests/test_partial_update_e2e.py      |  27 ++--
 7 files changed, 378 insertions(+), 15 deletions(-)

diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py
index dededad00c..1cb9e831d3 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -23,6 +23,7 @@ from typing import Dict, Optional
 from pypaimon.common.memory_size import MemorySize
 from pypaimon.common.options import Options
 from pypaimon.common.options.config_option import ConfigOption
+from pypaimon.common.options.options_utils import OptionsUtils
 from pypaimon.common.options.config_options import ConfigOptions
 
 
@@ -397,6 +398,14 @@ class CoreOptions:
         .with_description("Specify the merge engine for table with primary key. "
                           "Options: deduplicate, partial-update, aggregation, first-row.")
     )
+
+    IGNORE_DELETE: ConfigOption[bool] = (
+        ConfigOptions.key("ignore-delete")
+        .boolean_type()
+        .default_value(False)
+        .with_description("Whether to ignore delete records.")
+    )
+
     # Commit options
     COMMIT_USER_PREFIX: ConfigOption[str] = (
         ConfigOptions.key("commit.user-prefix")
@@ -785,6 +794,19 @@ class CoreOptions:
     def merge_engine(self, default=None):
         return self.options.get(CoreOptions.MERGE_ENGINE, default)
 
+    def ignore_delete(self) -> bool:
+        raw = self.options.to_map()
+        fallback_keys = (
+            "ignore-delete", "first-row.ignore-delete",
+            "deduplicate.ignore-delete",
+            "partial-update.ignore-delete",
+        )
+        for key in fallback_keys:
+            val = raw.get(key)
+            if val is not None:
+                return OptionsUtils.convert_to_boolean(val)
+        return False
+
     def data_file_external_paths(self, default=None):
         external_paths_str = self.options.get(CoreOptions.DATA_FILE_EXTERNAL_PATHS, default)
         if not external_paths_str:
diff --git a/paimon-python/pypaimon/read/merge_engine_support.py b/paimon-python/pypaimon/read/merge_engine_support.py
index d54cd8b0e4..2dde400465 100644
--- a/paimon-python/pypaimon/read/merge_engine_support.py
+++ b/paimon-python/pypaimon/read/merge_engine_support.py
@@ -62,6 +62,8 @@ def check_supported(table) -> None:
     engine = table.options.merge_engine()
     if engine == MergeEngine.DEDUPLICATE:
         return
+    if engine == MergeEngine.FIRST_ROW:
+        return
     if engine == MergeEngine.PARTIAL_UPDATE:
         unsupported = partial_update_unsupported_options(table)
         if unsupported:
@@ -79,7 +81,7 @@ def check_supported(table) -> None:
         return
     raise NotImplementedError(
         "merge-engine '{}' is not implemented in pypaimon yet "
-        "(supported: deduplicate, partial-update). Use the Java "
+        "(supported: deduplicate, first-row, partial-update). Use the Java "
         "client or open an issue to track support.".format(engine.value)
     )
 
diff --git a/paimon-python/pypaimon/read/reader/first_row_merge_function.py b/paimon-python/pypaimon/read/reader/first_row_merge_function.py
new file mode 100644
index 0000000000..162d608a64
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/first_row_merge_function.py
@@ -0,0 +1,50 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Optional
+
+from pypaimon.table.row.key_value import KeyValue
+
+
+class FirstRowMergeFunction:
+    """A MergeFunction where key is primary key (unique) and value is the
+    full record, only keep the first one."""
+
+    def __init__(self, ignore_delete: bool = False):
+        self.ignore_delete = ignore_delete
+        self.first: Optional[KeyValue] = None
+
+    def reset(self) -> None:
+        self.first = None
+
+    def add(self, kv: KeyValue) -> None:
+        if not kv.is_add():
+            if self.ignore_delete:
+                return
+            raise ValueError(
+                "By default, First row merge engine can not accept "
+                "DELETE/UPDATE_BEFORE records.\n"
+                "You can config 'ignore-delete' to ignore the "
+                "DELETE/UPDATE_BEFORE records."
+            )
+
+        if self.first is None:
+            self.first = kv
+
+    def get_result(self) -> Optional[KeyValue]:
+        return self.first
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 2a9a8f3db7..6256bdd859 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -56,6 +56,8 @@ from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
 from pypaimon.read.reader.shard_batch_reader import ShardBatchReader
 from pypaimon.read.reader.partial_update_merge_function import \
     PartialUpdateMergeFunction
+from pypaimon.read.reader.first_row_merge_function import \
+    FirstRowMergeFunction
 from pypaimon.read.reader.sort_merge_reader import (DeduplicateMergeFunction,
                                                     SortMergeReaderWithMinHeap)
 from pypaimon.read.push_down_utils import _get_all_fields
@@ -689,6 +691,10 @@ class MergeFileSplitRead(SplitRead):
                 value_arity=self.value_arity,
                 nullables=[f.type.nullable for f in self.value_fields],
             )
+        if engine == MergeEngine.FIRST_ROW:
+            return FirstRowMergeFunction(
+                ignore_delete=self.table.options.ignore_delete(),
+            )
         # check_supported() rejects everything else at TableRead.__init__.
         raise AssertionError(
             "unreachable: merge-engine '{}' should have been rejected by "
diff --git a/paimon-python/pypaimon/tests/test_first_row_e2e.py b/paimon-python/pypaimon/tests/test_first_row_e2e.py
new file mode 100644
index 0000000000..17a13c9aae
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_first_row_e2e.py
@@ -0,0 +1,154 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""End-to-end tests for the ``first-row`` merge engine.
+
+Each test creates a PK table with ``merge-engine`` set to ``first-row``,
+writes one or more batches, and reads back. The first-row engine keeps
+only the earliest row per primary key.
+"""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+
+
+class FirstRowMergeEngineE2ETest(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', True)
+
+        cls.pa_schema = pa.schema([
+            pa.field('id', pa.int64(), nullable=False),
+            ('a', pa.string()),
+            ('b', pa.string()),
+        ])
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def _create_pk_table(self, table_name, extra_options=None):
+        options = {
+            'bucket': '1',
+            'merge-engine': 'first-row',
+        }
+        if extra_options:
+            options.update(extra_options)
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            primary_keys=['id'],
+            options=options,
+        )
+        full = 'default.{}'.format(table_name)
+        self.catalog.create_table(full, schema, False)
+        return self.catalog.get_table(full)
+
+    def _write(self, table, rows):
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        c = wb.new_commit()
+        try:
+            w.write_arrow(pa.Table.from_pylist(rows, schema=self.pa_schema))
+            c.commit(w.prepare_commit())
+        finally:
+            w.close()
+            c.close()
+
+    def _read(self, table):
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        if not splits:
+            return []
+        return sorted(
+            rb.new_read().to_arrow(splits).to_pylist(),
+            key=lambda r: r['id'],
+        )
+
+    def test_first_row_keeps_earliest(self):
+        """Two writes with the same PK — first-row keeps the first one."""
+        table = self._create_pk_table('first_row_basic')
+        self._write(table, [{'id': 1, 'a': 'first', 'b': 'B1'}])
+        self._write(table, [{'id': 1, 'a': 'second', 'b': 'B2'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'first', 'b': 'B1'}],
+        )
+
+    def test_first_row_multiple_keys(self):
+        """Multiple PKs across two writes — each key keeps its first row."""
+        table = self._create_pk_table('first_row_multi_key')
+        self._write(table, [
+            {'id': 1, 'a': 'A1', 'b': 'B1'},
+            {'id': 2, 'a': 'A2', 'b': 'B2'},
+        ])
+        self._write(table, [
+            {'id': 1, 'a': 'A1-new', 'b': 'B1-new'},
+            {'id': 3, 'a': 'A3', 'b': 'B3'},
+        ])
+
+        self.assertEqual(
+            self._read(table),
+            [
+                {'id': 1, 'a': 'A1', 'b': 'B1'},
+                {'id': 2, 'a': 'A2', 'b': 'B2'},
+                {'id': 3, 'a': 'A3', 'b': 'B3'},
+            ],
+        )
+
+    def test_first_row_three_writes(self):
+        """Three writes for the same PK — always the first one wins."""
+        table = self._create_pk_table('first_row_three')
+        self._write(table, [{'id': 1, 'a': 'first', 'b': None}])
+        self._write(table, [{'id': 1, 'a': 'second', 'b': 'B'}])
+        self._write(table, [{'id': 1, 'a': 'third', 'b': 'C'}])
+
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'first', 'b': None}],
+        )
+
+    def test_first_row_single_write(self):
+        """A single write should read back unchanged."""
+        table = self._create_pk_table('first_row_single')
+        self._write(table, [
+            {'id': 1, 'a': 'A', 'b': 'B'},
+            {'id': 2, 'a': 'C', 'b': 'D'},
+        ])
+
+        self.assertEqual(
+            self._read(table),
+            [
+                {'id': 1, 'a': 'A', 'b': 'B'},
+                {'id': 2, 'a': 'C', 'b': 'D'},
+            ],
+        )
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_first_row_merge_function.py b/paimon-python/pypaimon/tests/test_first_row_merge_function.py
new file mode 100644
index 0000000000..7f411f85ff
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_first_row_merge_function.py
@@ -0,0 +1,130 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Direct unit tests for ``FirstRowMergeFunction``.
+
+Drives the merge function with synthetic ``KeyValue`` instances so the
+contract is pinned down without going through the full read pipeline.
+"""
+
+import unittest
+
+from pypaimon.read.reader.first_row_merge_function import \
+    FirstRowMergeFunction
+from pypaimon.table.row.key_value import KeyValue
+from pypaimon.table.row.row_kind import RowKind
+
+
+def _kv(key, seq, row_kind, value):
+    kv = KeyValue(key_arity=len(key), value_arity=len(value))
+    kv.replace(tuple(key) + (seq, row_kind.value) + tuple(value))
+    return kv
+
+
+def _result_value(kv):
+    return tuple(kv.value.get_field(i) for i in range(kv.value_arity))
+
+
+def _result_key(kv):
+    return tuple(kv.key.get_field(i) for i in range(kv.key_arity))
+
+
+class FirstRowMergeFunctionTest(unittest.TestCase):
+
+    def test_single_insert_returns_value(self):
+        mf = FirstRowMergeFunction()
+        mf.reset()
+        mf.add(_kv((1,), 1, RowKind.INSERT, (10, "a")))
+        result = mf.get_result()
+        self.assertIsNotNone(result)
+        self.assertEqual(_result_key(result), (1,))
+        self.assertEqual(_result_value(result), (10, "a"))
+
+    def test_keeps_first_row_not_latest(self):
+        mf = FirstRowMergeFunction()
+        mf.reset()
+        mf.add(_kv((1,), 1, RowKind.INSERT, (10, "first")))
+        mf.add(_kv((1,), 2, RowKind.INSERT, (20, "second")))
+        mf.add(_kv((1,), 3, RowKind.UPDATE_AFTER, (30, "third")))
+        result = mf.get_result()
+        self.assertEqual(_result_value(result), (10, "first"))
+
+    def test_reset_clears_state(self):
+        mf = FirstRowMergeFunction()
+        mf.reset()
+        mf.add(_kv((1,), 1, RowKind.INSERT, (10,)))
+        self.assertIsNotNone(mf.get_result())
+
+        mf.reset()
+        self.assertIsNone(mf.get_result())
+
+        mf.add(_kv((2,), 2, RowKind.INSERT, (20,)))
+        result = mf.get_result()
+        self.assertEqual(_result_key(result), (2,))
+        self.assertEqual(_result_value(result), (20,))
+
+    def test_empty_returns_none(self):
+        mf = FirstRowMergeFunction()
+        mf.reset()
+        self.assertIsNone(mf.get_result())
+
+    def test_delete_raises_by_default(self):
+        mf = FirstRowMergeFunction(ignore_delete=False)
+        mf.reset()
+        with self.assertRaises(ValueError):
+            mf.add(_kv((1,), 1, RowKind.DELETE, (10,)))
+
+    def test_update_before_raises_by_default(self):
+        mf = FirstRowMergeFunction(ignore_delete=False)
+        mf.reset()
+        with self.assertRaises(ValueError):
+            mf.add(_kv((1,), 1, RowKind.UPDATE_BEFORE, (10,)))
+
+    def test_ignore_delete_skips_retract(self):
+        mf = FirstRowMergeFunction(ignore_delete=True)
+        mf.reset()
+        mf.add(_kv((1,), 1, RowKind.DELETE, (10,)))
+        mf.add(_kv((1,), 2, RowKind.INSERT, (20,)))
+        result = mf.get_result()
+        self.assertIsNotNone(result)
+        self.assertEqual(_result_value(result), (20,))
+
+    def test_ignore_delete_skips_update_before(self):
+        mf = FirstRowMergeFunction(ignore_delete=True)
+        mf.reset()
+        mf.add(_kv((1,), 1, RowKind.UPDATE_BEFORE, (10,)))
+        self.assertIsNone(mf.get_result())
+
+    def test_ignore_delete_only_retract_returns_none(self):
+        mf = FirstRowMergeFunction(ignore_delete=True)
+        mf.reset()
+        mf.add(_kv((1,), 1, RowKind.DELETE, (10,)))
+        mf.add(_kv((1,), 2, RowKind.UPDATE_BEFORE, (20,)))
+        self.assertIsNone(mf.get_result())
+
+    def test_update_after_accepted_as_first(self):
+        mf = FirstRowMergeFunction()
+        mf.reset()
+        mf.add(_kv((1,), 1, RowKind.UPDATE_AFTER, (10,)))
+        result = mf.get_result()
+        self.assertIsNotNone(result)
+        self.assertEqual(_result_value(result), (10,))
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_partial_update_e2e.py b/paimon-python/pypaimon/tests/test_partial_update_e2e.py
index af5f61e98f..9d2b81530b 100644
--- a/paimon-python/pypaimon/tests/test_partial_update_e2e.py
+++ b/paimon-python/pypaimon/tests/test_partial_update_e2e.py
@@ -21,9 +21,10 @@
 Each test creates a PK table with ``merge-engine`` set to a particular
 value, writes one or more batches, and reads back. Partial-update reads
 must merge non-null fields across batches; ``deduplicate`` must keep
-the latest row only; ``aggregation`` and ``first-row`` must raise
-``NotImplementedError`` (until they are ported), since silently
-treating them as deduplicate would corrupt the user's data.
+the latest row only; ``first-row`` must keep the earliest row;
+``aggregation`` must raise ``NotImplementedError`` (until it is
+ported), since silently treating it as deduplicate would corrupt the
+user's data.
 """
 
 import os
@@ -204,19 +205,17 @@ class PartialUpdateMergeEngineE2ETest(unittest.TestCase):
             rb.new_read().to_arrow(splits)
         self.assertIn('aggregation', str(cm.exception))
 
-    def test_first_row_engine_raises_not_implemented(self):
-        """Until ``first-row`` is ported, reading a first-row table must
-        raise rather than silently produce dedupe results."""
-        table = self._create_pk_table('first_row_unsupported',
+    def test_first_row_engine_keeps_first(self):
+        """The ``first-row`` engine must keep the earliest row per PK."""
+        table = self._create_pk_table('first_row_supported',
                                       merge_engine='first-row')
-        self._write(table, [{'id': 1, 'a': 'x', 'b': None, 'c': None}])
-        self._write(table, [{'id': 1, 'a': 'y', 'b': None, 'c': None}])
+        self._write(table, [{'id': 1, 'a': 'first', 'b': None, 'c': None}])
+        self._write(table, [{'id': 1, 'a': 'second', 'b': 'B', 'c': 'C'}])
 
-        rb = table.new_read_builder()
-        splits = rb.new_scan().plan().splits()
-        with self.assertRaises(NotImplementedError) as cm:
-            rb.new_read().to_arrow(splits)
-        self.assertIn('first-row', str(cm.exception))
+        self.assertEqual(
+            self._read(table),
+            [{'id': 1, 'a': 'first', 'b': None, 'c': None}],
+        )
 
     # -- partial-update + out-of-scope option combinations ---------------
     #

Reply via email to