This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b557d427a [python] Fix BlockHandle varlen decoding in 
SstFileIterator.read_batch (#7597)
2b557d427a is described below

commit 2b557d427ad7bfcaf411f0dd89de3cc4e7f1e138
Author: Jiajia Li <[email protected]>
AuthorDate: Tue Apr 7 09:58:38 2026 +0800

    [python] Fix BlockHandle varlen decoding in SstFileIterator.read_batch 
(#7597)
    
    read_batch() incorrectly used fixed-width struct.unpack to decode
    BlockHandle, while the SST format uses variable-length encoding. Extract
    shared _parse_block_handle() to ensure both seek_to() and read_batch()
    use consistent varlen decoding.
---
 .../pypaimon/globalindex/btree/sst_file_reader.py  |  33 ++-
 .../pypaimon/tests/e2e/java_py_read_write_test.py  |   7 +
 .../pypaimon/tests/sst_file_iterator_test.py       | 221 +++++++++++++++++++++
 3 files changed, 242 insertions(+), 19 deletions(-)

diff --git a/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py 
b/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py
index 9d7f2eafc2..c7a28850ca 100644
--- a/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py
+++ b/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py
@@ -30,7 +30,7 @@ import zlib
 from typing import Optional, Callable
 from typing import BinaryIO
 
-from pypaimon.globalindex.btree.btree_file_footer import BlockHandle
+from pypaimon.globalindex.btree.block_handle import BlockHandle
 from pypaimon.globalindex.btree.block_entry import BlockEntry
 from pypaimon.globalindex.btree.block_reader import BlockReader, BlockIterator
 from pypaimon.globalindex.btree.memory_slice_input import MemorySliceInput
@@ -48,27 +48,28 @@ class SstFileIterator:
         self.index_iterator = index_block_iterator
         self.sought_data_block: Optional[BlockIterator] = None
     
+    @staticmethod
+    def _parse_block_handle(block_handle_bytes: bytes) -> BlockHandle:
+        handle_input = MemorySliceInput(block_handle_bytes)
+        return BlockHandle(
+            handle_input.read_var_len_long(),
+            handle_input.read_var_len_int()
+        )
+
     def seek_to(self, key: bytes) -> None:
         """
         Seek to the position of the record whose key is exactly equal to or
         greater than the specified key.
-        
+
         Args:
             key: The key to seek to
         """
         self.index_iterator.seek_to(key)
-        
+
         if self.index_iterator.has_next():
             index_entry: BlockEntry = self.index_iterator.__next__()
-            block_handle_bytes = index_entry.__getattribute__("value")
-            handle_input = MemorySliceInput(block_handle_bytes)
-
-            # Parse block handle
-            block_handle = BlockHandle(
-                handle_input.read_var_len_long(),
-                handle_input.read_var_len_int()
-            )
-            
+            block_handle = self._parse_block_handle(index_entry.value)
+
             # Create data block reader and seek
             data_block_reader = self.read_block(block_handle)
             self.sought_data_block = data_block_reader.iterator()
@@ -93,13 +94,7 @@ class SstFileIterator:
             return None
         
         index_entry = self.index_iterator.__next__()
-        block_handle_bytes = index_entry.value
-        
-        # Parse block handle
-        block_handle = BlockHandle(
-            struct.unpack('<Q', block_handle_bytes[0:8])[0],
-            struct.unpack('<I', block_handle_bytes[8:12])[0]
-        )
+        block_handle = self._parse_block_handle(index_entry.value)
         
         # Create data block reader
         data_block_reader = self.read_block(block_handle)
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py 
b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 61cd48ca67..58805983c5 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -452,6 +452,13 @@ class JavaPyReadWriteTest(unittest.TestCase):
         })
         self.assertEqual(expected, actual)
 
+        # read is_not_null index (full scan across all data blocks)
+        read_builder.with_filter(predicate_builder.is_not_null('k'))
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+        actual = table_read.to_arrow(splits)
+        self.assertEqual(len(actual), 2000)
+
     def _test_read_btree_index_null(self):
         table = self.catalog.get_table('default.test_btree_index_null')
 
diff --git a/paimon-python/pypaimon/tests/sst_file_iterator_test.py 
b/paimon-python/pypaimon/tests/sst_file_iterator_test.py
new file mode 100644
index 0000000000..c7c0a9b3a9
--- /dev/null
+++ b/paimon-python/pypaimon/tests/sst_file_iterator_test.py
@@ -0,0 +1,221 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+"""Tests for SstFileIterator BlockHandle varlen decoding."""
+
+import unittest
+from unittest.mock import MagicMock
+
+from pypaimon.globalindex.btree.block_handle import BlockHandle
+from pypaimon.globalindex.btree.block_entry import BlockEntry
+from pypaimon.globalindex.btree.memory_slice_input import MemorySliceInput
+from pypaimon.globalindex.btree.sst_file_reader import SstFileIterator
+
+
+def _encode_var_len(value):
+    result = bytearray()
+    while value > 0x7F:
+        result.append((value & 0x7F) | 0x80)
+        value >>= 7
+    result.append(value & 0x7F)
+    return bytes(result)
+
+
+def _encode_block_handle(offset, size):
+    return _encode_var_len(offset) + _encode_var_len(size)
+
+
+def _mock_block_iterator(entries):
+    """Mock a BlockIterator with has_next/next/seek_to over a list of 
BlockEntry."""
+    state = {'pos': 0}
+    entry_list = list(entries)
+
+    mock = MagicMock()
+    mock.has_next = lambda: state['pos'] < len(entry_list)
+
+    def next_entry(_self=None):
+        if state['pos'] >= len(entry_list):
+            raise StopIteration
+        entry = entry_list[state['pos']]
+        state['pos'] += 1
+        return entry
+    mock.__next__ = next_entry
+    mock.__iter__ = lambda _self=None: mock
+
+    def seek_to(target_key):
+        for i, entry in enumerate(entry_list):
+            if entry.key >= target_key:
+                state['pos'] = i
+                return entry.key == target_key
+        state['pos'] = len(entry_list)
+        return False
+    mock.seek_to = seek_to
+
+    return mock
+
+
+class SstFileIteratorTest(unittest.TestCase):
+
+    def _make_iterator(self, index_entries, data_blocks):
+        mock_index_entries = []
+        for key, handle in index_entries:
+            value = _encode_block_handle(handle.offset, handle.size)
+            mock_index_entries.append(BlockEntry(key, value))
+
+        index_iter = _mock_block_iterator(mock_index_entries)
+
+        def read_block(block_handle):
+            entries = data_blocks.get((block_handle.offset, block_handle.size))
+            if entries is None:
+                raise ValueError(
+                    "Unexpected BlockHandle(offset={}, size={})".format(
+                        block_handle.offset, block_handle.size))
+            reader = MagicMock()
+            reader.iterator = lambda e=entries: _mock_block_iterator(e)
+            return reader
+
+        return SstFileIterator(read_block, index_iter)
+
+    def test_read_batch_varlen_small_values(self):
+        handle = BlockHandle(100, 50)
+        data = [BlockEntry(b"k1", b"v1"), BlockEntry(b"k2", b"v2")]
+
+        it = self._make_iterator(
+            [(b"k2", handle)],
+            {(100, 50): data}
+        )
+
+        batch = it.read_batch()
+        self.assertIsNotNone(batch)
+        entries = [batch.__next__() for _ in range(2)]
+        self.assertEqual(len(entries), 2)
+        self.assertEqual(entries[0].key, b"k1")
+        self.assertEqual(entries[1].key, b"k2")
+        self.assertIsNone(it.read_batch())
+
+    def test_read_batch_varlen_large_offset(self):
+        handle = BlockHandle(300, 200)
+        data = [BlockEntry(b"a", b"1")]
+
+        it = self._make_iterator(
+            [(b"a", handle)],
+            {(300, 200): data}
+        )
+
+        batch = it.read_batch()
+        self.assertIsNotNone(batch)
+        entry = batch.__next__()
+        self.assertEqual(entry.key, b"a")
+
+    def test_read_batch_varlen_very_large_offset(self):
+        handle = BlockHandle(1000000, 65535)
+        data = [BlockEntry(b"big", b"val")]
+
+        it = self._make_iterator(
+            [(b"big", handle)],
+            {(1000000, 65535): data}
+        )
+
+        batch = it.read_batch()
+        self.assertIsNotNone(batch)
+        entry = batch.__next__()
+        self.assertEqual(entry.key, b"big")
+
+    def test_read_batch_multiple_blocks(self):
+        h1 = BlockHandle(0, 100)
+        h2 = BlockHandle(200, 150)
+        h3 = BlockHandle(500, 80)
+
+        it = self._make_iterator(
+            [(b"b", h1), (b"d", h2), (b"f", h3)],
+            {
+                (0, 100): [BlockEntry(b"a", b"1"), BlockEntry(b"b", b"2")],
+                (200, 150): [BlockEntry(b"c", b"3"), BlockEntry(b"d", b"4")],
+                (500, 80): [BlockEntry(b"e", b"5"), BlockEntry(b"f", b"6")],
+            }
+        )
+
+        all_entries = []
+        while True:
+            batch = it.read_batch()
+            if batch is None:
+                break
+            while batch.has_next():
+                all_entries.append(batch.__next__())
+
+        self.assertEqual(len(all_entries), 6)
+        keys = [e.key for e in all_entries]
+        self.assertEqual(keys, [b"a", b"b", b"c", b"d", b"e", b"f"])
+
+    def test_seek_then_read_batch_crosses_blocks(self):
+        h1 = BlockHandle(0, 100)
+        h2 = BlockHandle(256, 128)
+
+        it = self._make_iterator(
+            [(b"b", h1), (b"d", h2)],
+            {
+                (0, 100): [BlockEntry(b"a", b"1"), BlockEntry(b"b", b"2")],
+                (256, 128): [BlockEntry(b"c", b"3"), BlockEntry(b"d", b"4")],
+            }
+        )
+
+        it.seek_to(b"a")
+        self.assertIsNotNone(it.sought_data_block)
+
+        batch1 = it.read_batch()
+        self.assertIsNotNone(batch1)
+        self.assertEqual(batch1.__next__().key, b"a")
+
+        batch2 = it.read_batch()
+        self.assertIsNotNone(batch2)
+        entries2 = []
+        while batch2.has_next():
+            entries2.append(batch2.__next__())
+        self.assertEqual(len(entries2), 2)
+        self.assertEqual(entries2[0].key, b"c")
+        self.assertEqual(entries2[1].key, b"d")
+
+        self.assertIsNone(it.read_batch())
+
+    def test_read_batch_empty_index(self):
+        it = self._make_iterator([], {})
+        self.assertIsNone(it.read_batch())
+
+    def test_varlen_encoding_roundtrip(self):
+        test_cases = [
+            (0, 0),
+            (127, 127),
+            (128, 128),
+            (300, 200),
+            (16384, 255),
+            (1000000, 65535),
+            (2**31 - 1, 2**31 - 1),
+        ]
+        for offset, size in test_cases:
+            encoded = _encode_block_handle(offset, size)
+            inp = MemorySliceInput(encoded)
+            decoded_offset = inp.read_var_len_long()
+            decoded_size = inp.read_var_len_int()
+            self.assertEqual(decoded_offset, offset,
+                             "offset mismatch for ({}, {})".format(offset, 
size))
+            self.assertEqual(decoded_size, size,
+                             "size mismatch for ({}, {})".format(offset, size))
+
+
+if __name__ == '__main__':
+    unittest.main()

Reply via email to