This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6948f854bc [python] Python BTree index reader supports INT and BIGINT (#7163)
6948f854bc is described below
commit 6948f854bcdd7a2eb1c4376836f99aa5fd80ac6b
Author: Jingsong Lee <[email protected]>
AuthorDate: Sun Feb 1 15:03:44 2026 +0800
[python] Python BTree index reader supports INT and BIGINT (#7163)
---
.../test/java/org/apache/paimon/JavaPyE2ETest.java | 48 ++++++---
.../pypaimon/globalindex/btree/block_reader.py | 2 +-
.../globalindex/btree/btree_index_reader.py | 81 +++------------
.../pypaimon/globalindex/btree/key_serializer.py | 114 +++------------------
.../pypaimon/globalindex/btree/sst_file_reader.py | 16 +--
.../pypaimon/globalindex/global_index_evaluator.py | 28 +++--
.../globalindex/global_index_scan_builder.py | 13 +--
.../pypaimon/tests/e2e/java_py_read_write_test.py | 26 +++--
8 files changed, 102 insertions(+), 226 deletions(-)
diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index 238f4c08e4..c397cef388 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -411,13 +411,37 @@ public class JavaPyE2ETest {
@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
public void testBtreeIndexWrite() throws Exception {
+ testBtreeIndexWriteString();
+ testBtreeIndexWriteInt();
+ testBtreeIndexWriteBigInt();
+ }
+
+ private void testBtreeIndexWriteString() throws Exception {
+ testBtreeIndexWriteGeneric(
+ DataTypes.STRING(),
+ "test_btree_index_string",
+ BinaryString.fromString("k1"),
+ BinaryString.fromString("k2"),
+ BinaryString.fromString("k3"));
+ }
+
+ private void testBtreeIndexWriteInt() throws Exception {
+ testBtreeIndexWriteGeneric(DataTypes.INT(), "test_btree_index_int",
100, 200, 300);
+ }
+
+ private void testBtreeIndexWriteBigInt() throws Exception {
+ testBtreeIndexWriteGeneric(
+ DataTypes.BIGINT(), "test_btree_index_bigint", 1000L, 2000L,
3000L);
+ }
+
+ private <T> void testBtreeIndexWriteGeneric(
+ DataType keyType, String tableName, Object key1, Object key2,
Object key3)
+ throws Exception {
// create table
RowType rowType =
- RowType.of(
- new DataType[] {DataTypes.STRING(),
DataTypes.STRING()},
- new String[] {"k", "v"});
+ RowType.of(new DataType[] {keyType, DataTypes.STRING()}, new
String[] {"k", "v"});
Options options = new Options();
- Path tablePath = new Path(warehouse.toString() +
"/default.db/test_btree_index");
+ Path tablePath = new Path(warehouse.toString() + "/default.db/" +
tableName);
options.set(PATH, tablePath.toString());
options.set(ROW_TRACKING_ENABLED, true);
options.set(DATA_EVOLUTION_ENABLED, true);
@@ -442,12 +466,9 @@ public class JavaPyE2ETest {
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
try (BatchTableWrite write = writeBuilder.newWrite();
BatchTableCommit commit = writeBuilder.newCommit()) {
- write.write(
- GenericRow.of(BinaryString.fromString("k1"),
BinaryString.fromString("v1")));
- write.write(
- GenericRow.of(BinaryString.fromString("k2"),
BinaryString.fromString("v2")));
- write.write(
- GenericRow.of(BinaryString.fromString("k3"),
BinaryString.fromString("v3")));
+ write.write(GenericRow.of(key1, BinaryString.fromString("v1")));
+ write.write(GenericRow.of(key2, BinaryString.fromString("v2")));
+ write.write(GenericRow.of(key3, BinaryString.fromString("v3")));
commit.commit(write.prepareCommit());
}
@@ -468,14 +489,13 @@ public class JavaPyE2ETest {
// read index
PredicateBuilder predicateBuilder = new
PredicateBuilder(table.rowType());
ReadBuilder readBuilder =
- table.newReadBuilder()
- .withFilter(predicateBuilder.equal(0,
BinaryString.fromString("k2")));
+ table.newReadBuilder().withFilter(predicateBuilder.equal(0,
key2));
List<String> result = new ArrayList<>();
readBuilder
.newRead()
.createReader(readBuilder.newScan().plan())
- .forEachRemaining(r -> result.add(r.getString(0) + ":" +
r.getString(1)));
- assertThat(result).containsOnly("k2:v2");
+ .forEachRemaining(r -> result.add(r.getString(1).toString()));
+ assertThat(result).containsOnly("v2");
}
// Helper method from TableTestBase
diff --git a/paimon-python/pypaimon/globalindex/btree/block_reader.py b/paimon-python/pypaimon/globalindex/btree/block_reader.py
index f0091d0231..cff6c99794 100644
--- a/paimon-python/pypaimon/globalindex/btree/block_reader.py
+++ b/paimon-python/pypaimon/globalindex/btree/block_reader.py
@@ -223,7 +223,7 @@ class BlockIterator:
self.input.set_position(self.reader.seek_to(mid))
mid_entry = self.read_entry()
- compare = self.reader.comparator(mid_entry.key, target_key) if
self.reader.comparator else -1
+ compare = self.reader.comparator(mid_entry.key, target_key)
if compare == 0:
self.polled = mid_entry
diff --git a/paimon-python/pypaimon/globalindex/btree/btree_index_reader.py b/paimon-python/pypaimon/globalindex/btree/btree_index_reader.py
index 83d6194d0d..f5172b167a 100644
--- a/paimon-python/pypaimon/globalindex/btree/btree_index_reader.py
+++ b/paimon-python/pypaimon/globalindex/btree/btree_index_reader.py
@@ -53,56 +53,6 @@ def _deserialize_row_ids(data: bytes) -> List[int]:
return row_ids
-def _compare_bytes(a: bytes, b: bytes) -> int:
- """
- Compare two byte arrays.
-
- Args:
- a: First byte array
- b: Second byte array
-
- Returns:
- -1 if a < b, 0 if a == b, 1 if a > b
- """
- if a < b:
- return -1
- elif a > b:
- return 1
- return 0
-
-
-def _is_in_range_bytes(
- key_bytes: bytes,
- from_bytes: bytes,
- to_bytes: bytes,
- from_inclusive: bool,
- to_inclusive: bool
-) -> bool:
- """
- Check if a key (as bytes) falls within the specified range.
-
- Args:
- key_bytes: The key bytes to check
- from_bytes: Lower bound bytes
- to_bytes: Upper bound bytes
- from_inclusive: Whether lower bound is inclusive
- to_inclusive: Whether upper bound is inclusive
-
- Returns:
- True if key is in range, False otherwise
- """
- if not from_inclusive and _compare_bytes(key_bytes, from_bytes) == 0:
- return False
-
- cmp_to = _compare_bytes(key_bytes, to_bytes)
- if cmp_to > 0:
- return False
- if not to_inclusive and cmp_to == 0:
- return False
-
- return True
-
-
class BTreeIndexReader(GlobalIndexReader):
"""
The GlobalIndexReader implementation for btree index.
@@ -225,25 +175,21 @@ class BTreeIndexReader(GlobalIndexReader):
) -> RoaringBitmap64:
"""
Range query on underlying SST File.
-
+
Args:
from_key: Lower bound key
to_key: Upper bound key
from_inclusive: Whether to include lower bound
to_inclusive: Whether to include upper bound
-
+
Returns:
RoaringBitmap64 containing all qualified row IDs
"""
result = RoaringBitmap64()
-
- # Use SST reader to efficiently scan the data blocks
- serialized_from = self.key_serializer.serialize(from_key)
- serialized_to = self.key_serializer.serialize(to_key)
# Create iterator and seek to start key
file_iter = self.reader.create_iterator()
- file_iter.seek_to(serialized_from)
+ file_iter.seek_to(self.key_serializer.serialize(from_key))
# Iterate through data blocks
while True:
@@ -260,15 +206,22 @@ class BTreeIndexReader(GlobalIndexReader):
key_bytes = entry.key
value_bytes = entry.value
- # Check if key is within range using byte comparison
- if _is_in_range_bytes(key_bytes, serialized_from,
serialized_to, from_inclusive, to_inclusive):
- row_ids = _deserialize_row_ids(value_bytes)
- for row_id in row_ids:
- result.add(row_id)
- elif _compare_bytes(key_bytes, serialized_to) > 0:
- # Key is beyond the range, stop processing
+ key = self.key_serializer.deserialize(key_bytes)
+
+ # Skip if key equals from_key and from_inclusive is False
+ if not from_inclusive and self.comparator(key, from_key) == 0:
+ continue
+
+ # Check if key is beyond the range
+ difference = self.comparator(key, to_key)
+ if difference > 0 or (not to_inclusive and difference == 0):
return result
+ # Add all row IDs for this key
+ row_ids = _deserialize_row_ids(value_bytes)
+ for row_id in row_ids:
+ result.add(row_id)
+
return result
def _is_in_range(
diff --git a/paimon-python/pypaimon/globalindex/btree/key_serializer.py b/paimon-python/pypaimon/globalindex/btree/key_serializer.py
index 79cb32e5e6..f303b2634e 100644
--- a/paimon-python/pypaimon/globalindex/btree/key_serializer.py
+++ b/paimon-python/pypaimon/globalindex/btree/key_serializer.py
@@ -22,6 +22,9 @@ from abc import ABC, abstractmethod
from typing import Callable
import struct
+from pypaimon.schema.data_types import DataType
+from pypaimon.schema.data_types import AtomicType
+
class KeySerializer(ABC):
"""
@@ -85,11 +88,11 @@ class LongSerializer(KeySerializer):
def serialize(self, key: object) -> bytes:
"""Serialize a long key to bytes."""
- return struct.pack('>q', int(key))
+ return struct.pack('<q', int(key))
def deserialize(self, data: bytes) -> object:
"""Deserialize bytes to a long key."""
- return struct.unpack('>q', data)[0]
+ return struct.unpack('<q', data)[0]
def create_comparator(self) -> Callable[[object, object], int]:
"""Create a comparator for long keys."""
@@ -109,11 +112,11 @@ class IntSerializer(KeySerializer):
def serialize(self, key: object) -> bytes:
"""Serialize an int key to bytes."""
- return struct.pack('>i', int(key))
+ return struct.pack('<i', int(key))
def deserialize(self, data: bytes) -> object:
"""Deserialize bytes to an int key."""
- return struct.unpack('>i', data)[0]
+ return struct.unpack('<i', data)[0]
def create_comparator(self) -> Callable[[object, object], int]:
"""Create a comparator for int keys."""
@@ -128,104 +131,15 @@ class IntSerializer(KeySerializer):
return compare
-class FloatSerializer(KeySerializer):
- """Serializer for FLOAT type."""
-
- def serialize(self, key: object) -> bytes:
- """Serialize a float key to bytes."""
- return struct.pack('>f', float(key))
-
- def deserialize(self, data: bytes) -> object:
- """Deserialize bytes to a float key."""
- return struct.unpack('>f', data)[0]
-
- def create_comparator(self) -> Callable[[object, object], int]:
- """Create a comparator for float keys."""
- def compare(a: object, b: object) -> int:
- float_a = float(a)
- float_b = float(b)
- if float_a < float_b:
- return -1
- elif float_a > float_b:
- return 1
- return 0
- return compare
-
-
-class DoubleSerializer(KeySerializer):
- """Serializer for DOUBLE type."""
-
- def serialize(self, key: object) -> bytes:
- """Serialize a double key to bytes."""
- return struct.pack('>d', float(key))
-
- def deserialize(self, data: bytes) -> object:
- """Deserialize bytes to a double key."""
- return struct.unpack('>d', data)[0]
-
- def create_comparator(self) -> Callable[[object, object], int]:
- """Create a comparator for double keys."""
- def compare(a: object, b: object) -> int:
- double_a = float(a)
- double_b = float(b)
- if double_a < double_b:
- return -1
- elif double_a > double_b:
- return 1
- return 0
- return compare
-
-
-class BooleanSerializer(KeySerializer):
- """Serializer for BOOLEAN type."""
-
- def serialize(self, key: object) -> bytes:
- """Serialize a boolean key to bytes."""
- return struct.pack('>B', 1 if key else 0)
-
- def deserialize(self, data: bytes) -> object:
- """Deserialize bytes to a boolean key."""
- return struct.unpack('>B', data)[0] == 1
-
- def create_comparator(self) -> Callable[[object, object], int]:
- """Create a comparator for boolean keys."""
- def compare(a: object, b: object) -> int:
- bool_a = bool(a)
- bool_b = bool(b)
- if bool_a < bool_b:
- return -1
- elif bool_a > bool_b:
- return 1
- return 0
- return compare
-
-
-def create_serializer(data_type: str) -> KeySerializer:
- """
- Factory method to create a KeySerializer based on data type.
-
- Args:
- data_type: String representation of the data type
-
- Returns:
- Appropriate KeySerializer instance
-
- Raises:
- ValueError: If the data type is not supported
- """
- data_type_lower = data_type.lower()
-
- if data_type_lower in ('string', 'varchar', 'char'):
+def create_serializer(data_type: DataType) -> KeySerializer:
+ if not isinstance(data_type, AtomicType):
+ raise ValueError(f"Key serializer only support AtomicType yet, meet
{data_type.__class__}")
+ type_name = data_type.type.upper()
+ if type_name in ('CHAR', 'VARCHAR', 'STRING'):
return StringSerializer()
- elif data_type_lower in ('bigint', 'long'):
+ elif type_name == 'BIGINT':
return LongSerializer()
- elif data_type_lower in ('int', 'integer'):
+ elif type_name == 'INT':
return IntSerializer()
- elif data_type_lower in ('float'):
- return FloatSerializer()
- elif data_type_lower in ('double'):
- return DoubleSerializer()
- elif data_type_lower in ('boolean', 'bool'):
- return BooleanSerializer()
else:
raise ValueError(f"DataType: {data_type} is not supported by btree
index now.")
diff --git a/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py b/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py
index 2ac19d07dc..8b346deecc 100644
--- a/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py
+++ b/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py
@@ -124,9 +124,9 @@ class SstFileReader:
):
self.comparator = comparator
self.input_stream = input_stream
- self.index_block = self._read_block(index_block_handle, True)
+ self.index_block = self._read_block(index_block_handle)
- def _read_block(self, block_handle: BlockHandle, index: bool) ->
BlockReader:
+ def _read_block(self, block_handle: BlockHandle) -> BlockReader:
self.input_stream.seek(block_handle.offset)
# Read block data + 5 bytes trailer (1 byte compression type + 4 bytes
CRC32)
block_data = self.input_stream.read(block_handle.size + 5)
@@ -149,19 +149,11 @@ class SstFileReader:
if actual_crc32 != crc32_value:
raise ValueError(f"CRC32 mismatch: expected {crc32_value}, got
{actual_crc32}")
- return BlockReader.create(block_bytes, self._slice_comparator if index
else self.comparator)
+ return BlockReader.create(block_bytes, self.comparator)
- @staticmethod
- def _slice_comparator(a: bytes, b: bytes) -> int:
- if a < b:
- return -1
- elif a > b:
- return 1
- return 0
-
def create_iterator(self) -> SstFileIterator:
def read_block(block: BlockHandle) -> BlockReader:
- return self._read_block(block, False)
+ return self._read_block(block)
return SstFileIterator(
read_block,
diff --git a/paimon-python/pypaimon/globalindex/global_index_evaluator.py b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
index dd0eb57235..46bb5cd67d 100644
--- a/paimon-python/pypaimon/globalindex/global_index_evaluator.py
+++ b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
@@ -18,15 +18,13 @@
"""Global index evaluator for filtering data using global indexes."""
-from typing import Callable, Collection, Dict, List, Optional, TYPE_CHECKING
+from typing import Callable, Collection, Dict, List, Optional
from pypaimon.globalindex.global_index_reader import GlobalIndexReader,
FieldRef
from pypaimon.globalindex.global_index_result import GlobalIndexResult
-
-if TYPE_CHECKING:
- from pypaimon.common.predicate import Predicate
- from pypaimon.globalindex.vector_search import VectorSearch
- from pypaimon.schema.data_types import DataField
+from pypaimon.common.predicate import Predicate
+from pypaimon.globalindex.vector_search import VectorSearch
+from pypaimon.schema.data_types import DataField
class GlobalIndexEvaluator:
@@ -36,8 +34,8 @@ class GlobalIndexEvaluator:
def __init__(
self,
- fields: List['DataField'],
- readers_function: Callable[[int], Collection[GlobalIndexReader]]
+ fields: List[DataField],
+ readers_function: Callable[[DataField], Collection[GlobalIndexReader]]
):
self._fields = fields
self._field_by_name = {f.name: f for f in fields}
@@ -46,8 +44,8 @@ class GlobalIndexEvaluator:
def evaluate(
self,
- predicate: Optional['Predicate'],
- vector_search: Optional['VectorSearch']
+ predicate: Optional[Predicate],
+ vector_search: Optional[VectorSearch]
) -> Optional[GlobalIndexResult]:
compound_result: Optional[GlobalIndexResult] = None
@@ -64,7 +62,7 @@ class GlobalIndexEvaluator:
field_id = field.id
readers = self._index_readers_cache.get(field_id)
if readers is None:
- readers = self._readers_function(field_id)
+ readers = self._readers_function(field)
self._index_readers_cache[field_id] = readers
# If we have a compound result from predicates, use it to filter
vector search
@@ -87,7 +85,7 @@ class GlobalIndexEvaluator:
return compound_result
- def _visit_predicate(self, predicate: 'Predicate') ->
Optional[GlobalIndexResult]:
+ def _visit_predicate(self, predicate: Predicate) ->
Optional[GlobalIndexResult]:
"""Visit a predicate and return the index result."""
if predicate.method == 'and':
compound_result: Optional[GlobalIndexResult] = None
@@ -121,7 +119,7 @@ class GlobalIndexEvaluator:
# Leaf predicate
return self._visit_leaf_predicate(predicate)
- def _visit_leaf_predicate(self, predicate: 'Predicate') ->
Optional[GlobalIndexResult]:
+ def _visit_leaf_predicate(self, predicate: Predicate) ->
Optional[GlobalIndexResult]:
"""Visit a leaf predicate and return the index result."""
field = self._field_by_name.get(predicate.field)
if field is None:
@@ -130,7 +128,7 @@ class GlobalIndexEvaluator:
field_id = field.id
readers = self._index_readers_cache.get(field_id)
if readers is None:
- readers = self._readers_function(field_id)
+ readers = self._readers_function(field)
self._index_readers_cache[field_id] = readers
field_ref = FieldRef(predicate.index, predicate.field, str(field.type))
@@ -155,7 +153,7 @@ class GlobalIndexEvaluator:
def _visit_function(
self,
reader: GlobalIndexReader,
- predicate: 'Predicate',
+ predicate: Predicate,
field_ref: FieldRef
) -> Optional[GlobalIndexResult]:
"""Visit a predicate function with the given reader."""
diff --git a/paimon-python/pypaimon/globalindex/global_index_scan_builder.py b/paimon-python/pypaimon/globalindex/global_index_scan_builder.py
index 3d449d1288..e14df8426b 100644
--- a/paimon-python/pypaimon/globalindex/global_index_scan_builder.py
+++ b/paimon-python/pypaimon/globalindex/global_index_scan_builder.py
@@ -25,6 +25,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
from pypaimon.globalindex import GlobalIndexIOMeta, GlobalIndexReader,
GlobalIndexEvaluator
from pypaimon.globalindex.range import Range
from pypaimon.globalindex.global_index_result import GlobalIndexResult
+from pypaimon.schema.data_types import DataField
class GlobalIndexScanBuilder(ABC):
@@ -160,12 +161,12 @@ class RowRangeGlobalIndexScanner:
)
index_metas[field_id][index_type].append(io_meta)
- def readers_function(field_id: int) -> Collection[GlobalIndexReader]:
+ def readers_function(field: DataField) ->
Collection[GlobalIndexReader]:
readers = []
- if field_id not in index_metas:
+ if field.id not in index_metas:
return readers
- for index_type, io_metas in index_metas[field_id].items():
+ for index_type, io_metas in index_metas[field.id].items():
if index_type == 'faiss-vector-ann':
# Lazy import to avoid requiring faiss when not used
from pypaimon.globalindex.faiss import (
@@ -181,11 +182,11 @@ class RowRangeGlobalIndexScanner:
)
readers.append(reader)
if index_type == 'btree':
- from pypaimon.globalindex.btree import (BTreeIndexReader)
- from pypaimon.globalindex.btree.key_serializer import
(StringSerializer)
+ from pypaimon.globalindex.btree import BTreeIndexReader
+ from pypaimon.globalindex.btree.key_serializer import
create_serializer
for metadata in io_metas:
reader = BTreeIndexReader(
- key_serializer=StringSerializer(), # TODO create
serializer from type
+ key_serializer=create_serializer(field.type),
file_io=file_io,
index_path=index_path,
io_meta=metadata
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 86c2866676..b4cdca1dfa 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -321,28 +321,26 @@ class JavaPyReadWriteTest(unittest.TestCase):
self.assertEqual(expected, actual)
def test_read_btree_index_table(self):
- table = self.catalog.get_table('default.test_btree_index')
- read_builder: ReadBuilder = table.new_read_builder()
+ self._test_read_btree_index_generic("test_btree_index_string", "k2",
pa.string())
+ self._test_read_btree_index_generic("test_btree_index_int", 200,
pa.int32())
+ self._test_read_btree_index_generic("test_btree_index_bigint", 2000,
pa.int64())
- # read all
- table_read = read_builder.new_read()
- splits = read_builder.new_scan().plan().splits()
- actual = table_sort_by(table_read.to_arrow(splits), 'k')
- expected = pa.Table.from_pydict({
- 'k': ["k1", "k2", "k3"],
- 'v': ["v1", "v2", "v3"]
- })
- self.assertEqual(expected, actual)
+ def _test_read_btree_index_generic(self, table_name: str, k, k_type):
+ table = self.catalog.get_table('default.' + table_name)
+ read_builder: ReadBuilder = table.new_read_builder()
# read using index
predicate_builder = read_builder.new_predicate_builder()
- predicate = predicate_builder.equal('k', 'k2')
+ predicate = predicate_builder.equal('k', k)
read_builder.with_filter(predicate)
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
actual = table_sort_by(table_read.to_arrow(splits), 'k')
expected = pa.Table.from_pydict({
- 'k': ["k2"],
+ 'k': [k],
'v': ["v2"]
- })
+ }, schema=pa.schema([
+ ("k", k_type),
+ ("v", pa.string())
+ ]))
self.assertEqual(expected, actual)