This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4955ef0dcd [python] support read.batch-size and fix default value (#7051)
4955ef0dcd is described below
commit 4955ef0dcdaec969d7630ed6884c638ce96b6d0a
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu Jan 15 15:21:49 2026 +0800
[python] support read.batch-size and fix default value (#7051)
---
.../pypaimon/common/options/core_options.py | 10 +++
.../pypaimon/read/reader/concat_batch_reader.py | 2 +-
.../pypaimon/read/reader/format_avro_reader.py | 2 +-
.../pypaimon/read/reader/format_blob_reader.py | 2 +-
.../pypaimon/read/reader/format_lance_reader.py | 2 +-
.../pypaimon/read/reader/format_pyarrow_reader.py | 2 +-
paimon-python/pypaimon/read/split_read.py | 16 +++--
paimon-python/pypaimon/tests/reader_base_test.py | 71 ++++++++++++++++++++++
8 files changed, 96 insertions(+), 11 deletions(-)
diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py
index be64278e76..43c7de8ff3 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -411,6 +411,13 @@ class CoreOptions:
         .with_description("Whether to L2 normalize vectors for cosine similarity.")
     )
+    READ_BATCH_SIZE: ConfigOption[int] = (
+        ConfigOptions.key("read.batch-size")
+        .int_type()
+        .default_value(1024)
+        .with_description("Read batch size for any file format that supports it.")
+    )
+
     def __init__(self, options: Options):
         self.options = options
@@ -586,3 +593,6 @@ class CoreOptions:
     def vector_normalize(self, default=None):
         return self.options.get(CoreOptions.VECTOR_NORMALIZE, default)
+
+    def read_batch_size(self, default=None) -> int:
+        return self.options.get(CoreOptions.READ_BATCH_SIZE, default or 1024)
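
For reference, a minimal sketch of how the new option resolves (it mirrors the assertions in the test added at the end of this patch; only APIs shown in this diff are used):

    from pypaimon.common.options import Options
    from pypaimon.common.options.core_options import CoreOptions

    # Unset: falls back to the option's 1024 default.
    assert CoreOptions(Options({})).read_batch_size() == 1024

    # Set via table options: the configured value wins (parsed as int).
    opts = Options({CoreOptions.READ_BATCH_SIZE.key(): '512'})
    assert CoreOptions(opts).read_batch_size() == 512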
diff --git a/paimon-python/pypaimon/read/reader/concat_batch_reader.py b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
index 3ce9db6f5e..b15a37f79f 100644
--- a/paimon-python/pypaimon/read/reader/concat_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
@@ -60,7 +60,7 @@ class MergeAllBatchReader(RecordBatchReader):
     into a single batch for processing.
     """
-    def __init__(self, reader_suppliers: List[Callable], batch_size: int = 4096):
+    def __init__(self, reader_suppliers: List[Callable], batch_size: int = 1024):
         self.reader_suppliers = reader_suppliers
         self.merged_batch: Optional[RecordBatch] = None
         self.reader = None
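
Note: only the default changes here; call sites can still pass an explicit value, as split_read.py now does. A hedged construction sketch (open_reader and paths are hypothetical placeholders):

    # suppliers: zero-argument callables that each produce a reader.
    suppliers = [lambda p=path: open_reader(p) for path in paths]
    merged = MergeAllBatchReader(suppliers, batch_size=512)  # omitted -> 1024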
diff --git a/paimon-python/pypaimon/read/reader/format_avro_reader.py b/paimon-python/pypaimon/read/reader/format_avro_reader.py
index 4114d8e93b..5dd2738944 100644
--- a/paimon-python/pypaimon/read/reader/format_avro_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_avro_reader.py
@@ -35,7 +35,7 @@ class FormatAvroReader(RecordBatchReader):
     """
     def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], full_fields: List[DataField],
-                 push_down_predicate: Any, batch_size: int = 4096):
+                 push_down_predicate: Any, batch_size: int = 1024):
         file_path_for_io = file_io.to_filesystem_path(file_path)
         self._file = file_io.filesystem.open_input_file(file_path_for_io)
         self._avro_reader = fastavro.reader(self._file)
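
Avro has no native Arrow batching: fastavro yields one record at a time, so batch_size here governs how many records get grouped per emitted batch. A hedged sketch of that grouping (not the actual implementation; next_batch is a hypothetical helper):

    import itertools
    import pyarrow as pa

    def next_batch(avro_reader, batch_size):
        # Pull up to batch_size records from fastavro's record iterator.
        records = list(itertools.islice(avro_reader, batch_size))
        if not records:
            return None  # iterator exhausted
        return pa.RecordBatch.from_pylist(records)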
diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py b/paimon-python/pypaimon/read/reader/format_blob_reader.py
index ecd740de4d..81bcae8a74 100644
--- a/paimon-python/pypaimon/read/reader/format_blob_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -35,7 +35,7 @@ class FormatBlobReader(RecordBatchReader):
     def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
                  full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool,
-                 batch_size: int = 4096):
+                 batch_size: int = 1024):
         self._file_io = file_io
         self._file_path = file_path
         self._push_down_predicate = push_down_predicate
diff --git a/paimon-python/pypaimon/read/reader/format_lance_reader.py b/paimon-python/pypaimon/read/reader/format_lance_reader.py
index a6b3277167..4be30a6f5d 100644
--- a/paimon-python/pypaimon/read/reader/format_lance_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_lance_reader.py
@@ -33,7 +33,7 @@ class FormatLanceReader(RecordBatchReader):
     """
     def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
-                 push_down_predicate: Any, batch_size: int = 4096):
+                 push_down_predicate: Any, batch_size: int = 1024):
         """Initialize Lance reader."""
         import lance
diff --git a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
index ed560d14a4..699ff48477 100644
--- a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
@@ -33,7 +33,7 @@ class FormatPyArrowReader(RecordBatchReader):
     """
    def __init__(self, file_io: FileIO, file_format: str, file_path: str, read_fields: List[str],
-                 push_down_predicate: Any, batch_size: int = 4096):
+                 push_down_predicate: Any, batch_size: int = 1024):
         file_path_for_pyarrow = file_io.to_filesystem_path(file_path)
         self.dataset = ds.dataset(file_path_for_pyarrow, format=file_format, filesystem=file_io.filesystem)
         self.read_fields = read_fields
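
For Parquet/ORC this knob maps onto standard PyArrow dataset scanning, roughly as below (the forwarding call itself is outside this hunk; file_path and read_fields are placeholders):

    import pyarrow.dataset as ds

    dataset = ds.dataset(file_path, format='parquet')
    # Each yielded RecordBatch holds at most batch_size rows.
    for batch in dataset.to_batches(columns=read_fields, batch_size=1024):
        pass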
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 47edf63d9a..eab279cf9c 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -110,20 +110,23 @@ class SplitRead(ABC):
         _, extension = os.path.splitext(file_path)
         file_format = extension[1:]
 
+        batch_size = self.table.options.read_batch_size()
+
         format_reader: RecordBatchReader
         if file_format == CoreOptions.FILE_FORMAT_AVRO:
             format_reader = FormatAvroReader(self.table.file_io, file_path, read_file_fields,
-                                             self.read_fields, read_arrow_predicate)
+                                             self.read_fields, read_arrow_predicate, batch_size=batch_size)
         elif file_format == CoreOptions.FILE_FORMAT_BLOB:
             blob_as_descriptor = CoreOptions.blob_as_descriptor(self.table.options)
             format_reader = FormatBlobReader(self.table.file_io, file_path, read_file_fields,
-                                             self.read_fields, read_arrow_predicate, blob_as_descriptor)
+                                             self.read_fields, read_arrow_predicate, blob_as_descriptor,
+                                             batch_size=batch_size)
         elif file_format == CoreOptions.FILE_FORMAT_LANCE:
             format_reader = FormatLanceReader(self.table.file_io, file_path, read_file_fields,
-                                              read_arrow_predicate)
+                                              read_arrow_predicate, batch_size=batch_size)
         elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
             format_reader = FormatPyArrowReader(self.table.file_io, file_format, file_path,
-                                                read_file_fields, read_arrow_predicate)
+                                                read_file_fields, read_arrow_predicate, batch_size=batch_size)
         else:
             raise ValueError(f"Unexpected file format: {file_format}")
@@ -546,19 +549,20 @@ class DataEvolutionSplitRead(SplitRead):
             read_field_names = self._remove_partition_fields(read_fields)
             table_fields = self.read_fields
             self.read_fields = read_fields  # create reader based on read_fields
+            batch_size = self.table.options.read_batch_size()
 
             # Create reader for this bunch
             if len(bunch.files()) == 1:
                 suppliers = [lambda r=self._create_file_reader(
                     bunch.files()[0], read_field_names
                 ): r]
-                file_record_readers[i] = MergeAllBatchReader(suppliers)
+                file_record_readers[i] = MergeAllBatchReader(suppliers, batch_size=batch_size)
             else:
                 # Create concatenated reader for multiple files
                 suppliers = [
                     partial(self._create_file_reader, file=file, read_fields=read_field_names) for file in bunch.files()
                 ]
-                file_record_readers[i] = MergeAllBatchReader(suppliers)
+                file_record_readers[i] = MergeAllBatchReader(suppliers, batch_size=batch_size)
 
             self.read_fields = table_fields
 
             # Validate that all required fields are found
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py
index a7b2abd516..3d9ed7f874 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -1320,3 +1320,74 @@ class ReaderBasicTest(unittest.TestCase):
         # Verify the error message contains the expected text
         self.assertIn("Table Type", str(context.exception))
+
+    def test_read_batch_size_config(self):
+        from pypaimon.common.options.core_options import CoreOptions
+        from pypaimon.common.options import Options
+
+        options = Options({})
+        core_options = CoreOptions(options)
+        self.assertEqual(core_options.read_batch_size(), 1024,
+                         "Default read_batch_size should be 1024")
+
+        options = Options({CoreOptions.READ_BATCH_SIZE.key(): '512'})
+        core_options = CoreOptions(options)
+        self.assertEqual(core_options.read_batch_size(), 512,
+                         "read_batch_size should read from options")
+
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('value', pa.string())
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={CoreOptions.READ_BATCH_SIZE.key(): '10'}
+        )
+        self.catalog.create_table('default.test_read_batch_size', schema, False)
+        table = self.catalog.get_table('default.test_read_batch_size')
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data = pa.Table.from_pydict({
+            'id': list(range(50)),
+            'value': [f'value_{i}' for i in range(50)]
+        }, schema=pa_schema)
+        table_write.write_arrow(data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        self.assertEqual(table.options.read_batch_size(), 10,
+                         "Table should have read_batch_size=10 from options")
+
+        read_builder = table.new_read_builder()
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+
+        if splits:
+            # Use _create_split_read to create reader
+            split_read = table_read._create_split_read(splits[0])
+            reader = split_read.create_reader()
+            batch_count = 0
+            total_rows = 0
+            max_batch_size = 0
+
+            try:
+                while True:
+                    batch = reader.read_arrow_batch()
+                    if batch is None:
+                        break
+                    batch_count += 1
+                    batch_rows = batch.num_rows
+                    total_rows += batch_rows
+                    max_batch_size = max(max_batch_size, batch_rows)
+            finally:
+                reader.close()
+
+            self.assertGreater(batch_count, 1,
+                               f"With batch_size=10, should get multiple batches, got {batch_count}")
+            self.assertEqual(total_rows, 50, "Should read all 50 rows")
+            self.assertLessEqual(max_batch_size, 20,
+                                 f"Max batch size should be close to configured 10, got {max_batch_size}")
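
Distilled from the test above, end-user configuration of the new option looks roughly like this (catalog construction is assumed; the Schema import path and '256' are illustrative):

    import pyarrow as pa
    from pypaimon import Schema  # top-level export assumed
    from pypaimon.common.options.core_options import CoreOptions

    pa_schema = pa.schema([('id', pa.int64()), ('value', pa.string())])
    schema = Schema.from_pyarrow_schema(
        pa_schema,
        options={CoreOptions.READ_BATCH_SIZE.key(): '256'}
    )
    catalog.create_table('default.my_table', schema, False)
    table = catalog.get_table('default.my_table')

    # Readers created from this table emit batches of roughly 256 rows at most.
    assert table.options.read_batch_size() == 256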