This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2080559eb8 [Python] Blob read supports with_shard (#6465)
2080559eb8 is described below
commit 2080559eb857f754691e89531300366399990084
Author: umi <[email protected]>
AuthorDate: Fri Oct 24 10:39:07 2025 +0800
[Python] Blob read supports with_shard (#6465)
---
.../pypaimon/read/scanner/full_starting_scanner.py | 62 +++++++++++++++++++++-
paimon-python/pypaimon/read/split_read.py | 6 ++-
paimon-python/pypaimon/tests/blob_table_test.py | 62 ++++++++++++++++++++++
.../pypaimon/tests/data_evolution_test.py | 52 ++++++++++++++++++
4 files changed, 179 insertions(+), 3 deletions(-)
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index 36dba3bdd1..b3c18a7bb6 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -157,6 +157,66 @@ class FullStartingScanner(StartingScanner):
return filtered_partitioned_files, plan_start_row, plan_end_row
+ def _data_evolution_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
+ total_row = 0
+ first_row_id_set = set()
+ # Count the total row count, deduplicating files that share the same first_row_id
+ for key, file_entries in partitioned_files.items():
+ for entry in file_entries:
+ if entry.file.first_row_id is None:
+ total_row += entry.file.row_count
+ elif entry.file.first_row_id not in first_row_id_set:
+ first_row_id_set.add(entry.file.first_row_id)
+ total_row += entry.file.row_count
+
+ # Calculate number of rows this shard should process
+ # The last shard takes all remaining rows (covers the non-divisible case)
+ if self.idx_of_this_subtask == self.number_of_para_subtasks - 1:
+ num_row = total_row - total_row // self.number_of_para_subtasks * self.idx_of_this_subtask
+ else:
+ num_row = total_row // self.number_of_para_subtasks
+ # Calculate start row and end row position for current shard in all data
+ start_row = self.idx_of_this_subtask * (total_row // self.number_of_para_subtasks)
+ end_row = start_row + num_row
+
+ plan_start_row = 0
+ plan_end_row = 0
+ entry_end_row = 0 # end row position of current file in all data
+ splits_start_row = 0
+ filtered_partitioned_files = defaultdict(list)
+ # Iterate through all file entries to find files that overlap with current shard range
+ for key, file_entries in partitioned_files.items():
+ filtered_entries = []
+ first_row_id_set = set()
+ for entry in file_entries:
+ if entry.file.first_row_id is not None:
+ if entry.file.first_row_id in first_row_id_set:
+ filtered_entries.append(entry)
+ continue
+ else:
+ first_row_id_set.add(entry.file.first_row_id)
+ entry_begin_row = entry_end_row # Starting row position of current file in all data
+ entry_end_row += entry.file.row_count # Update to row position after current file
+
+ # If current file is completely after shard range, stop iteration
+ if entry_begin_row >= end_row:
+ break
+ # If current file is completely before shard range, skip it
+ if entry_end_row <= start_row:
+ continue
+ if entry_begin_row <= start_row < entry_end_row:
+ splits_start_row = entry_begin_row
+ plan_start_row = start_row - entry_begin_row
+ # If shard end position is within current file, record relative end position
+ if entry_begin_row < end_row <= entry_end_row:
+ plan_end_row = end_row - splits_start_row
+ # Add files that overlap with shard range to result
+ filtered_entries.append(entry)
+ if filtered_entries:
+ filtered_partitioned_files[key] = filtered_entries
+
+ return filtered_partitioned_files, plan_start_row, plan_end_row
+
def _compute_split_start_end_row(self, splits: List[Split],
plan_start_row, plan_end_row):
file_end_row = 0 # end row position of current file in all data
for split in splits:
@@ -356,7 +416,7 @@ class FullStartingScanner(StartingScanner):
partitioned_files[(tuple(entry.partition.values),
entry.bucket)].append(entry)
if self.idx_of_this_subtask is not None:
- partitioned_files, plan_start_row, plan_end_row = self._append_only_filter_by_shard(partitioned_files)
+ partitioned_files, plan_start_row, plan_end_row = self._data_evolution_filter_by_shard(partitioned_files)
def weight_func(file_list: List[DataFileMeta]) -> int:
return max(sum(f.file_size for f in file_list),
self.open_file_cost)
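Note on the sharding math in _data_evolution_filter_by_shard: the shard range is derived only from the deduplicated total row count, and the last subtask absorbs the remainder when rows do not divide evenly. A minimal standalone sketch of that arithmetic (hypothetical helper, not part of pypaimon), using the same formulas as the method above:

    def shard_row_range(total_row, idx_of_this_subtask, number_of_para_subtasks):
        # Base rows per shard, rounded down; the last shard takes whatever is left.
        base = total_row // number_of_para_subtasks
        start_row = idx_of_this_subtask * base
        if idx_of_this_subtask == number_of_para_subtasks - 1:
            num_row = total_row - base * idx_of_this_subtask
        else:
            num_row = base
        return start_row, start_row + num_row

    # With 5 deduplicated rows and 2 subtasks (as in the blob test below):
    # shard_row_range(5, 0, 2) -> (0, 2), shard_row_range(5, 1, 2) -> (2, 5)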
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 5d59f942bd..a744fbc4f0 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -362,8 +362,10 @@ class DataEvolutionSplitRead(SplitRead):
suppliers.append(
lambda files=need_merge_files:
self._create_union_reader(files)
)
-
- return ConcatBatchReader(suppliers)
+ if self.split.split_start_row is not None:
+ return ShardBatchReader(suppliers, self.split.split_start_row, self.split.split_end_row)
+ else:
+ return ConcatBatchReader(suppliers)
def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
"""Split files by firstRowId for data evolution."""
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index d24b3f0d5a..fe987ea0a6 100644
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -1272,6 +1272,68 @@ class DataBlobWriterTest(unittest.TestCase):
print(f" - Total data size: {total_blob_size:,} bytes
({total_blob_size / (1024*1024*1024):.2f} GB)") # noqa: E501
print(" - All blob content verified as correct")
+ def test_data_blob_writer_with_shard(self):
+ """Test DataBlobWriter with mixed data types in blob column."""
+ from pypaimon import Schema
+
+ # Create schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('type', pa.string()),
+ ('data', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.with_shard_test', schema, False)
+ table = self.catalog.get_table('test_db.with_shard_test')
+
+ # Use proper table API to create writer
+ write_builder = table.new_batch_write_builder()
+ blob_writer = write_builder.new_write()
+
+ # Test data with different types of blob content
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2, 3, 4, 5],
+ 'type': ['text', 'json', 'binary', 'image', 'pdf'],
+ 'data': [
+ b'This is text content',
+ b'{"key": "value", "number": 42}',
+ b'\x00\x01\x02\x03\xff\xfe\xfd',
+ b'PNG_IMAGE_DATA_PLACEHOLDER',
+ b'%PDF-1.4\nPDF_CONTENT_PLACEHOLDER'
+ ]
+ }, schema=pa_schema)
+
+ # Write mixed data
+ total_rows = 0
+ for batch in test_data.to_batches():
+ blob_writer.write_arrow_batch(batch)
+ total_rows += batch.num_rows
+
+ # Test prepare commit
+ commit_messages = blob_writer.prepare_commit()
+ # Create commit and commit the data
+ commit = write_builder.new_commit()
+ commit.commit(commit_messages)
+ blob_writer.close()
+
+ # Read data back using table API
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan().with_shard(1, 2)
+ table_read = read_builder.new_read()
+ splits = table_scan.plan().splits()
+ result = table_read.to_arrow(splits)
+
+ # Verify the data was read back correctly
+ self.assertEqual(result.num_rows, 3, "Shard 1 of 2 should read 3 of the 5 rows")
+ self.assertEqual(result.num_columns, 3, "Should have 3 columns")
+
if __name__ == '__main__':
unittest.main()
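As a sanity check on the expectation above (with_shard(1, 2) returning 3 of the 5 written rows), the two shards together should cover the whole table. A hedged usage sketch built only from the API calls already exercised in this test:

    read_builder = table.new_read_builder()
    total_rows_read = 0
    for shard_idx in range(2):
        table_scan = read_builder.new_scan().with_shard(shard_idx, 2)
        table_read = read_builder.new_read()
        total_rows_read += table_read.to_arrow(table_scan.plan().splits()).num_rows
    # Expected: shard 0 reads 2 rows, shard 1 reads 3 rows, 5 in total.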
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py
index 90abd2f916..1d6402b327 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -84,6 +84,58 @@ class DataEvolutionTest(unittest.TestCase):
]))
self.assertEqual(actual_data, expect_data)
+ def test_with_shard(self):
+ simple_pa_schema = pa.schema([
+ ('f0', pa.int8()),
+ ('f1', pa.int16()),
+ ])
+ schema = Schema.from_pyarrow_schema(simple_pa_schema,
+ options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'})
+ self.catalog.create_table('default.test_with_shard', schema, False)
+ table = self.catalog.get_table('default.test_with_shard')
+
+ # write 1
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ expect_data = pa.Table.from_pydict({
+ 'f0': [-1, 2],
+ 'f1': [-1001, 1002]
+ }, schema=simple_pa_schema)
+ table_write.write_arrow(expect_data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # write 2
+ table_write = write_builder.new_write().with_write_type(['f0'])
+ table_commit = write_builder.new_commit()
+ data2 = pa.Table.from_pydict({
+ 'f0': [3, 4],
+ }, schema=pa.schema([
+ ('f0', pa.int8()),
+ ]))
+ table_write.write_arrow(data2)
+ cmts = table_write.prepare_commit()
+ cmts[0].new_files[0].first_row_id = 0
+ table_commit.commit(cmts)
+ table_write.close()
+ table_commit.close()
+
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan().with_shard(0, 2)
+ table_read = read_builder.new_read()
+ splits = table_scan.plan().splits()
+ actual_data = table_read.to_arrow(splits)
+ expect_data = pa.Table.from_pydict({
+ 'f0': [3],
+ 'f1': [-1001]
+ }, schema=pa.schema([
+ ('f0', pa.int8()),
+ ('f1', pa.int16()),
+ ]))
+ self.assertEqual(actual_data, expect_data)
+
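Why the expected result above is a single row (f0=3, f1=-1001): the test pins write 2's new file to first_row_id 0 so it lines up with the rows from write 1, leaving 2 deduplicated rows in total; shard 0 of 2 therefore covers only row 0, whose f0 is taken from write 2 (rewritten to 3) while f1 keeps -1001 from write 1. The arithmetic, following the scanner formula above:

    total_row, number_of_para_subtasks, idx_of_this_subtask = 2, 2, 0
    start_row = idx_of_this_subtask * (total_row // number_of_para_subtasks)  # 0
    num_row = total_row // number_of_para_subtasks                            # 1 (not the last shard)
    assert (start_row, start_row + num_row) == (0, 1)  # shard 0 reads only row 0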
def test_multiple_appends(self):
simple_pa_schema = pa.schema([
('f0', pa.int32()),