This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c864884bc9 [python] Push limit down to the reader layer for append table (#8102)
c864884bc9 is described below

commit c864884bc92bab09e2b39bddaeab5e61483ba7ad
Author: umi <[email protected]>
AuthorDate: Wed Jun 3 21:17:20 2026 +0800

    [python] Push limit down to the reader layer for append table (#8102)
---
 .../pypaimon/read/reader/limited_record_reader.py  | 38 ++++++++++++++++
 paimon-python/pypaimon/read/split_read.py          | 29 ++++++++----
 paimon-python/pypaimon/read/table_read.py          |  2 +
 paimon-python/pypaimon/tests/blob_table_test.py    | 16 +++++++
 .../pypaimon/tests/test_limit_pushdown.py          | 52 +++++++++++++++++++++-
 5 files changed, 126 insertions(+), 11 deletions(-)

diff --git a/paimon-python/pypaimon/read/reader/limited_record_reader.py b/paimon-python/pypaimon/read/reader/limited_record_reader.py
index 74f2612ebd..f78221d3f8 100644
--- a/paimon-python/pypaimon/read/reader/limited_record_reader.py
+++ b/paimon-python/pypaimon/read/reader/limited_record_reader.py
@@ -26,6 +26,9 @@ reused on other reader chains.
 
 from typing import Optional
 
+from pyarrow import RecordBatch
+
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
 from pypaimon.read.reader.iface.record_iterator import RecordIterator
 from pypaimon.read.reader.iface.record_reader import RecordReader
 
@@ -68,3 +71,38 @@ class _LimitedRecordIterator(RecordIterator):
             return None
         self._limiter.count += 1
         return row
+
+
+class LimitedRecordBatchReader(RecordBatchReader):
+    """Stop emitting rows once ``limit`` rows have been delivered.
+
+    Unlike ``LimitedRecordReader`` (which inherits ``RecordReader``),
+    this class inherits ``RecordBatchReader`` so that the
+    ``isinstance(..., RecordBatchReader)`` gate in TableRead picks the
+    arrow-batch code path.
+    """
+
+    def __init__(self, inner: RecordBatchReader, limit: int):
+        if limit < 0:
+            raise ValueError("limit must be non-negative, got %d" % limit)
+        self._inner = inner
+        self._limit = limit
+        self.count = 0
+        self.file_io = inner.file_io
+        self.blob_field_indices = inner.blob_field_indices
+        self.vector_field_indices = inner.vector_field_indices
+
+    def read_arrow_batch(self) -> Optional[RecordBatch]:
+        if self.count >= self._limit:
+            return None
+        batch = self._inner.read_arrow_batch()
+        if batch is None:
+            return None
+        remaining = self._limit - self.count
+        if batch.num_rows > remaining:
+            batch = batch.slice(0, remaining)
+        self.count += batch.num_rows
+        return batch
+
+    def close(self) -> None:
+        self._inner.close()
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 643c40c86a..7d20359dfc 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -42,6 +42,7 @@ from pypaimon.read.reader.filter_record_reader import FilterRecordReader
 from pypaimon.read.reader.format_avro_reader import FormatAvroReader
 from pypaimon.read.reader.blob_descriptor_convert_reader import BlobDescriptorConvertReader
 from pypaimon.read.reader.filter_record_batch_reader import FilterRecordBatchReader
+from pypaimon.read.reader.limited_record_reader import LimitedRecordBatchReader, LimitedRecordReader
 from pypaimon.read.reader.row_range_filter_record_reader import RowIdFilterRecordBatchReader
 from pypaimon.read.reader.format_blob_reader import FormatBlobReader
 from pypaimon.read.reader.format_lance_reader import FormatLanceReader
@@ -110,7 +111,8 @@ class SplitRead(ABC):
             read_type: List[DataField],
             split: Split,
             row_tracking_enabled: bool,
-            nested_name_paths: Optional[List[List[str]]] = None):
+            nested_name_paths: Optional[List[List[str]]] = None,
+            limit: Optional[int] = None):
         from pypaimon.table.file_store_table import FileStoreTable
 
         self.table: FileStoreTable = table
@@ -120,6 +122,7 @@ class SplitRead(ABC):
         self.row_tracking_enabled = row_tracking_enabled
         self.value_arity = len(read_type)
         self.nested_name_paths = nested_name_paths
+        self.limit = limit
         # Snapshot the raw value-side schema before _create_key_value_fields
         # wraps it, so MergeFileSplitRead can hand per-value-field nullable
         # flags to merge functions that mirror Java's NOT-NULL check.
@@ -145,8 +148,8 @@ class SplitRead(ABC):
         # the space FilterRecordReader actually evaluates against.
         read_type_names = {f.name for f in read_type}
         if (
-            self.predicate is not None
-            and _get_all_fields(self.predicate).issubset(read_type_names)
+                self.predicate is not None
+                and _get_all_fields(self.predicate).issubset(read_type_names)
         ):
             self.predicate_for_reader = rewrite_predicate_indices(
                 self.predicate, read_type
@@ -628,9 +631,14 @@ class RawFileSplitRead(SplitRead):
             vector_field_indices=_vector_field_indices(self.read_fields))
         # if the table is appendonly table, we don't need extra filter, all predicates has pushed down
         if self.table.is_primary_key_table and self.predicate_for_reader:
-            return FilterRecordReader(concat_reader, self.predicate_for_reader)
+            reader = FilterRecordReader(concat_reader, self.predicate_for_reader)
+            if self.limit is not None:
+                reader = LimitedRecordReader(reader, self.limit)
         else:
-            return concat_reader
+            reader = concat_reader
+            if self.limit is not None:
+                reader = LimitedRecordBatchReader(reader, self.limit)
+        return reader
 
     def _get_all_data_fields(self):
         if self.row_tracking_enabled:
@@ -658,9 +666,9 @@ class MergeFileSplitRead(SplitRead):
             split=split,
             row_tracking_enabled=row_tracking_enabled,
             nested_name_paths=None,
+            limit=limit,
         )
         self.outer_extract_name_paths = outer_extract_name_paths
-        self.limit = limit
         # Built once per split-read (value_fields and options are constant
         # for the object's life), not per section. ``None`` when
         # ``sequence.field`` is unset, in which case the heap falls back to
@@ -765,8 +773,6 @@ class MergeFileSplitRead(SplitRead):
                 blob_field_indices=_blob_field_indices(inner_value_fields),
                 vector_field_indices=_vector_field_indices(inner_value_fields))
         if self.limit is not None:
-            from pypaimon.read.reader.limited_record_reader import \
-                LimitedRecordReader
             reader = LimitedRecordReader(reader, self.limit)
         return reader
 
@@ -783,7 +789,8 @@ class DataEvolutionSplitRead(SplitRead):
             read_type: List[DataField],
             split: Split,
             row_tracking_enabled: bool,
-            nested_name_paths: Optional[List[List[str]]] = None):
+            nested_name_paths: Optional[List[List[str]]] = None,
+            limit: Optional[int] = None):
         self.row_ranges = None
         actual_split = split
         if isinstance(split, IndexedSplit):
@@ -792,6 +799,7 @@ class DataEvolutionSplitRead(SplitRead):
         super().__init__(
             table, predicate, read_type, actual_split, row_tracking_enabled,
             nested_name_paths=nested_name_paths,
+            limit=limit,
         )
 
     def _push_down_predicate(self) -> Optional[Predicate]:
@@ -835,6 +843,9 @@ class DataEvolutionSplitRead(SplitRead):
                 and CoreOptions.blob_descriptor_fields(self.table.options)):
             reader = BlobDescriptorConvertReader(reader, self.table)
 
+        if self.limit is not None:
+            reader = LimitedRecordBatchReader(reader, self.limit)
+
         return reader
 
     def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index 7c717df24d..826b2b4024 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -580,6 +580,7 @@ class TableRead:
                 split=split,
                 row_tracking_enabled=True,
                 nested_name_paths=self.nested_name_paths,
+                limit=self.limit,
             )
         else:
             return RawFileSplitRead(
@@ -589,6 +590,7 @@ class TableRead:
                 split=split,
                 row_tracking_enabled=self.table.options.row_tracking_enabled(),
                 nested_name_paths=self.nested_name_paths,
+                limit=self.limit,
             )
 
     def _widen_to_top_level_for_merge(self) -> List[DataField]:
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index 295182d43e..b6a12fe973 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -3369,6 +3369,22 @@ class GetBlobTest(unittest.TestCase):
         self.assertEqual(results[1], (2, b'img_data_2'))
         self.assertEqual(results[2], (3, b'img_data_3'))
 
+    def test_get_blob_access_with_limit(self):
+        read_builder = self.table.new_read_builder().with_limit(2)
+        splits = read_builder.new_scan().plan().splits()
+        read = read_builder.new_read()
+
+        results = []
+        for row in read.to_iterator(splits):
+            blob = row.get_blob(2)
+            self.assertIsNotNone(blob)
+            results.append((row.get_field(0), blob.to_data()))
+
+        self.assertEqual(len(results), 2)
+        for row_id, data in results:
+            self.assertIn(row_id, (1, 2, 3))
+            self.assertIn(data, (b'img_data_1', b'img_data_2', b'img_data_3'))
+
     def test_get_blob_streaming(self):
         read_builder = self.table.new_read_builder()
         splits = read_builder.new_scan().plan().splits()
diff --git a/paimon-python/pypaimon/tests/test_limit_pushdown.py b/paimon-python/pypaimon/tests/test_limit_pushdown.py
index 2e717c28c7..a4b51ee3c0 100644
--- a/paimon-python/pypaimon/tests/test_limit_pushdown.py
+++ b/paimon-python/pypaimon/tests/test_limit_pushdown.py
@@ -110,8 +110,8 @@ class LimitPushdownTest(unittest.TestCase):
         exactly 3 rows — even though each partition split has 5 rows."""
         table = self._create_ao_table('limit_ao_within_split')
         self._write_ao_partitions(table, [
-            ('p1', list(range(5))),       # 5 rows
-            ('p2', list(range(5, 10))),   # 5 rows
+            ('p1', list(range(5))),  # 5 rows
+            ('p2', list(range(5, 10))),  # 5 rows
         ])
         rb = table.new_read_builder().with_limit(3)
         result = rb.new_read().to_arrow(rb.new_scan().plan().splits())
@@ -207,6 +207,54 @@ class LimitPushdownTest(unittest.TestCase):
         rows = list(it)
         self.assertEqual(len(rows), 7)
 
+    # ---- SplitRead-level limit pushdown verification ---------------------
+
+    def test_append_only_split_read_creates_limited_batch_reader(self):
+        """Verify that RawFileSplitRead.create_reader() returns a
+        LimitedRecordBatchReader (inherits RecordBatchReader) when limit
+        is set, so the arrow-batch read path is taken."""
+        from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+        from pypaimon.read.reader.limited_record_reader import LimitedRecordBatchReader
+
+        table = self._create_ao_table('limit_ao_split_read')
+        self._write_ao_partitions(table, [('p1', list(range(10)))])
+        rb = table.new_read_builder().with_limit(3)
+        table_read = rb.new_read()
+        splits = rb.new_scan().plan().splits()
+        self.assertGreater(len(splits), 0)
+        for split in splits:
+            split_read = table_read._create_split_read(split)
+            self.assertEqual(split_read.limit, 3)
+            reader = split_read.create_reader()
+            self.assertIsInstance(reader, LimitedRecordBatchReader,
+                                  "RawFileSplitRead.create_reader() should wrap with LimitedRecordBatchReader")
+            self.assertIsInstance(reader, RecordBatchReader,
+                                  "LimitedRecordBatchReader should be a RecordBatchReader")
+            reader.close()
+
+    def test_append_only_split_read_limit_truncates_within_split(self):
+        """Directly read from a single split's reader with limit and verify
+        the reader itself stops at the limit boundary, not relying on
+        TableRead-level truncation."""
+        table = self._create_ao_table('limit_ao_split_truncate')
+        self._write_ao_partitions(table, [('p1', list(range(20)))])
+        rb = table.new_read_builder().with_limit(5)
+        table_read = rb.new_read()
+        splits = rb.new_scan().plan().splits()
+        self.assertEqual(len(splits), 1)
+        split_read = table_read._create_split_read(splits[0])
+        reader = split_read.create_reader()
+        # Drain the reader directly, bypassing TableRead-level control
+        total_rows = 0
+        while True:
+            batch = reader.read_arrow_batch()
+            if batch is None:
+                break
+            total_rows += batch.num_rows
+        reader.close()
+        self.assertEqual(total_rows, 5,
+                         "SplitRead-level reader should stop at limit=5, got %d" % total_rows)
+
 
 if __name__ == '__main__':
     unittest.main()

Reply via email to