This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4a71298bcb [python] introduce BlobConsumer mirroring Java module (#8105)
4a71298bcb is described below
commit 4a71298bcbc4bec42ba404d114161b8440ea2a95
Author: Faiz <[email protected]>
AuthorDate: Wed Jun 3 19:42:35 2026 +0800
[python] introduce BlobConsumer mirroring Java module (#8105)
---
paimon-python/pypaimon/table/row/blob.py | 5 +-
paimon-python/pypaimon/tests/blob_table_test.py | 236 +++++++++++++++++++++
paimon-python/pypaimon/write/blob_format_writer.py | 23 +-
paimon-python/pypaimon/write/file_store_write.py | 2 +
paimon-python/pypaimon/write/table_write.py | 9 +
.../pypaimon/write/writer/blob_file_writer.py | 17 +-
paimon-python/pypaimon/write/writer/blob_writer.py | 12 +-
.../write/writer/dedicated_format_writer.py | 6 +-
8 files changed, 295 insertions(+), 15 deletions(-)
diff --git a/paimon-python/pypaimon/table/row/blob.py b/paimon-python/pypaimon/table/row/blob.py
index 43391775bd..056316d55f 100644
--- a/paimon-python/pypaimon/table/row/blob.py
+++ b/paimon-python/pypaimon/table/row/blob.py
@@ -18,7 +18,7 @@
import io
import struct
from abc import ABC, abstractmethod
-from typing import BinaryIO, Optional, Union
+from typing import BinaryIO, Callable, Optional, Union
from urllib.parse import urlparse
from pypaimon.common.uri_reader import UriReader, FileUriReader
@@ -382,3 +382,6 @@ class BlobRef(Blob):
def __hash__(self) -> int:
return hash(self._descriptor)
+
+
+BlobConsumer = Callable[[str, Optional[BlobDescriptor]], bool]
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index ca9161bd14..295182d43e 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -3568,5 +3568,241 @@ class GetBlobNonBlobColumnSecurityTest(unittest.TestCase):
mock_create.assert_not_called()
+class BlobConsumerTest(unittest.TestCase):
+ """Tests for BlobConsumer callback functionality."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.temp_dir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.temp_dir, 'warehouse')
+ cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+ cls.catalog.create_database('test_db', False)
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.temp_dir, ignore_errors=True)
+
+ def test_blob_consumer_basic(self):
+ """Consumer receives one BlobDescriptor per blob written, None for
nulls."""
+ from pypaimon.table.row.blob import Blob, BlobDescriptor
+ from pypaimon.common.uri_reader import FileUriReader
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('blob_data', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ self.catalog.create_table('test_db.blob_consumer_basic', schema, False)
+ table = self.catalog.get_table('test_db.blob_consumer_basic')
+
+ blob_bytes = b'hello_blob_consumer'
+ received = []
+
+ def my_consumer(field_name, descriptor):
+ received.append((field_name, descriptor))
+ return True
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.with_blob_consumer(my_consumer)
+
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'name': ['a', 'b', 'c'],
+ 'blob_data': [blob_bytes, blob_bytes, None],
+ }, schema=pa_schema)
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
+ self.assertEqual(len(received), 3)
+
+ for field_name, desc in received[:2]:
+ self.assertEqual(field_name, 'blob_data')
+ self.assertIsInstance(desc, BlobDescriptor)
+ uri_reader = FileUriReader(table.file_io)
+ blob = Blob.from_descriptor(uri_reader, desc)
+ self.assertEqual(blob.to_data(), blob_bytes)
+
+ self.assertEqual(received[2][0], 'blob_data')
+ self.assertIsNone(received[2][1])
+
+ def test_blob_consumer_flush_behavior(self):
+ """Consumer return value controls flush; verify flush count."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('blob_data', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ self.catalog.create_table('test_db.blob_consumer_flush', schema, False)
+ table = self.catalog.get_table('test_db.blob_consumer_flush')
+
+ blob_bytes = b'flush_test_blob'
+ descriptors = []
+ flush_count = [0]
+
+ def my_consumer(field_name, descriptor):
+ descriptors.append(descriptor)
+ should_flush = len(descriptors) % 2 == 0
+ if should_flush:
+ flush_count[0] += 1
+ return should_flush
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.with_blob_consumer(my_consumer)
+
+ test_data = pa.Table.from_pydict({
+ 'id': list(range(5)),
+ 'name': [f'row{i}' for i in range(5)],
+ 'blob_data': [blob_bytes] * 5,
+ }, schema=pa_schema)
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
+ self.assertEqual(len(descriptors), 5)
+ self.assertEqual(flush_count[0], 2)
+
+ from pypaimon.table.row.blob import Blob
+ from pypaimon.common.uri_reader import FileUriReader
+ uri_reader = FileUriReader(table.file_io)
+ for desc in descriptors:
+ self.assertIsNotNone(desc)
+ blob = Blob.from_descriptor(uri_reader, desc)
+ self.assertEqual(blob.to_data(), blob_bytes)
+
+ def test_blob_consumer_no_consumer_set(self):
+ """Without consumer, writing still works normally."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('blob_data', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ self.catalog.create_table('test_db.blob_no_consumer', schema, False)
+ table = self.catalog.get_table('test_db.blob_no_consumer')
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2],
+ 'blob_data': [b'data1', b'data2'],
+ }, schema=pa_schema)
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
+ result = table.new_read_builder().new_read().to_arrow(
+ table.new_read_builder().new_scan().plan().splits())
+ self.assertEqual(result.column('blob_data').to_pylist(), [b'data1',
b'data2'])
+
+ def test_blob_consumer_chain_call(self):
+ """with_blob_consumer returns self for chaining."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('blob_data', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ self.catalog.create_table('test_db.blob_consumer_chain', schema, False)
+ table = self.catalog.get_table('test_db.blob_consumer_chain')
+
+ write_builder = table.new_batch_write_builder()
+ result = write_builder.new_write().with_blob_consumer(lambda f, d:
False)
+ self.assertIsNotNone(result)
+ result.close()
+
+ def test_blob_consumer_abort_preserves_files(self):
+ """Abort with consumer must not delete blob files that descriptors
point to."""
+ from pypaimon.table.row.blob import Blob
+ from pypaimon.common.uri_reader import FileUriReader
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('blob_data', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob.target-file-size': '1KB',
+ })
+ self.catalog.create_table('test_db.blob_consumer_abort', schema, False)
+ table = self.catalog.get_table('test_db.blob_consumer_abort')
+
+ blob_bytes = b'X' * 2048
+ received = []
+
+ def my_consumer(field_name, descriptor):
+ received.append(descriptor)
+ return False
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.with_blob_consumer(my_consumer)
+
+ test_data = pa.Table.from_pydict({
+ 'id': list(range(5)),
+ 'blob_data': [blob_bytes] * 5,
+ }, schema=pa_schema)
+ writer.write_arrow(test_data)
+
+ self.assertGreater(len(received), 0)
+
+ # Capture data writers before close() clears them, then abort each one.
+ data_writers = list(writer.file_store_write.data_writers.values())
+ self.assertGreater(len(data_writers), 0)
+ for dw in data_writers:
+ dw.abort()
+
+ # Every descriptor returned to the consumer must still be readable.
+ uri_reader = FileUriReader(table.file_io)
+ for desc in received:
+ self.assertIsNotNone(desc)
+ data = Blob.from_descriptor(uri_reader, desc).to_data()
+ self.assertEqual(data, blob_bytes)
+
+ def test_blob_consumer_after_write_raises(self):
+ """Setting consumer after data has been written must raise."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('blob_data', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ self.catalog.create_table('test_db.blob_consumer_late', schema, False)
+ table = self.catalog.get_table('test_db.blob_consumer_late')
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+
+ writer.write_arrow(pa.Table.from_pydict({
+ 'id': [1],
+ 'blob_data': [b'data'],
+ }, schema=pa_schema))
+
+ with self.assertRaises(RuntimeError):
+ writer.with_blob_consumer(lambda f, d: False)
+ writer.close()
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/paimon-python/pypaimon/write/blob_format_writer.py b/paimon-python/pypaimon/write/blob_format_writer.py
index 92257f4ca9..6004425b71 100644
--- a/paimon-python/pypaimon/write/blob_format_writer.py
+++ b/paimon-python/pypaimon/write/blob_format_writer.py
@@ -17,9 +17,9 @@
import struct
import zlib
-from typing import BinaryIO, List
+from typing import BinaryIO, List, Optional
-from pypaimon.table.row.blob import Blob, BlobData, BlobDescriptor
+from pypaimon.table.row.blob import Blob, BlobData, BlobDescriptor,
BlobConsumer
from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
@@ -31,8 +31,12 @@ class BlobFormatWriter:
BUFFER_SIZE = 4096
METADATA_SIZE = 12 # 8-byte length + 4-byte CRC
- def __init__(self, output_stream: BinaryIO):
+ def __init__(self, output_stream: BinaryIO,
+ blob_consumer: Optional[BlobConsumer] = None,
+ file_path: Optional[str] = None):
self.output_stream = output_stream
+ self._blob_consumer = blob_consumer
+ self._file_path = file_path
self.lengths: List[int] = []
self.position = 0
@@ -40,9 +44,12 @@ class BlobFormatWriter:
if not hasattr(row, 'values') or len(row.values) != 1:
raise ValueError("BlobFormatWriter only supports one field")
+ blob_field_name = row.fields[0].name
blob_value = row.values[0]
if blob_value is None:
self.lengths.append(self.NULL_LENGTH)
+ if self._blob_consumer is not None:
+ self._blob_consumer(blob_field_name, None)
return
if not isinstance(blob_value, Blob):
@@ -59,6 +66,8 @@ class BlobFormatWriter:
magic_bytes = struct.pack('<I', self.MAGIC_NUMBER) # Little endian
crc32 = self._write_with_crc(magic_bytes, crc32)
+ blob_pos = self.position
+
# Write blob data
if isinstance(blob_value, BlobData):
data = blob_value.to_data()
@@ -74,6 +83,8 @@ class BlobFormatWriter:
finally:
stream.close()
+ blob_length = self.position - blob_pos
+
# Calculate total length including magic + data + metadata (length +
CRC)
bin_length = self.position - previous_pos + self.METADATA_SIZE
self.lengths.append(bin_length)
@@ -88,6 +99,12 @@ class BlobFormatWriter:
self.output_stream.write(crc_bytes)
self.position += 4
+ if self._blob_consumer is not None:
+ descriptor = BlobDescriptor(self._file_path, blob_pos, blob_length)
+ flush = self._blob_consumer(blob_field_name, descriptor)
+ if flush:
+ self.output_stream.flush()
+
def _write_with_crc(self, data: bytes, crc32: int) -> int:
crc32 = zlib.crc32(data, crc32)
self.output_stream.write(data)
diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py
index 6a20708a14..f1374797e1 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -40,6 +40,7 @@ class FileStoreWrite:
self.data_writers: Dict[Tuple, DataWriter] = {}
self.max_seq_numbers: dict = {}
self.write_cols = None
+ self.blob_consumer = None
self.commit_identifier = 0
self.options = CoreOptions.copy(table.options)
if self.table.bucket_mode() == BucketMode.POSTPONE_MODE:
@@ -72,6 +73,7 @@ class FileStoreWrite:
max_seq_number=0,
options=options,
write_cols=self.write_cols,
+ blob_consumer=self.blob_consumer,
)
elif self._has_vector_columns() and options.with_vector_format():
return DataVectorWriter(
diff --git a/paimon-python/pypaimon/write/table_write.py b/paimon-python/pypaimon/write/table_write.py
index 1eeeb5e846..411ddd9ceb 100644
--- a/paimon-python/pypaimon/write/table_write.py
+++ b/paimon-python/pypaimon/write/table_write.py
@@ -22,6 +22,7 @@ import pyarrow as pa
from pypaimon.schema.data_types import PyarrowFieldParser
from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
+from pypaimon.table.row.blob import BlobConsumer
from pypaimon.write.commit_message import CommitMessage
from pypaimon.write.file_store_write import FileStoreWrite
@@ -72,6 +73,14 @@ class TableWrite:
self.file_store_write.write_cols = write_cols
return self
+ def with_blob_consumer(self, blob_consumer: BlobConsumer):
+ if self.file_store_write.data_writers:
+ raise RuntimeError(
+ "with_blob_consumer must be called before any write operation."
+ )
+ self.file_store_write.blob_consumer = blob_consumer
+ return self
+
def write_ray(
self,
dataset: "Dataset",
diff --git a/paimon-python/pypaimon/write/writer/blob_file_writer.py b/paimon-python/pypaimon/write/writer/blob_file_writer.py
index fe911586be..e4aa66a195 100644
--- a/paimon-python/pypaimon/write/writer/blob_file_writer.py
+++ b/paimon-python/pypaimon/write/writer/blob_file_writer.py
@@ -22,7 +22,7 @@ import pyarrow as pa
from pypaimon.write.blob_format_writer import BlobFormatWriter
from pypaimon.table.row.generic_row import GenericRow, RowKind
-from pypaimon.table.row.blob import Blob, BlobData, BlobDescriptor
+from pypaimon.table.row.blob import Blob, BlobConsumer, BlobData,
BlobDescriptor
from pypaimon.schema.data_types import DataField, PyarrowFieldParser
@@ -32,11 +32,16 @@ class BlobFileWriter:
Writes rows one by one and tracks file size.
"""
- def __init__(self, file_io, file_path: Path):
+ def __init__(self, file_io, file_path: Path, blob_consumer:
Optional[BlobConsumer] = None):
self.file_io = file_io
self.file_path = file_path
+ self._blob_consumer = blob_consumer
self.output_stream = file_io.new_output_stream(file_path)
- self.writer = BlobFormatWriter(self.output_stream)
+ self.writer = BlobFormatWriter(
+ self.output_stream,
+ blob_consumer=blob_consumer,
+ file_path=str(file_path),
+ )
self.row_count = 0
self.closed = False
@@ -118,7 +123,7 @@ class BlobFileWriter:
return file_size
def abort(self):
- """Abort the writer and delete the file."""
+ """Abort the writer and delete the file (unless a blob consumer holds
references)."""
if not self.closed:
try:
if hasattr(self.output_stream, 'close'):
@@ -127,5 +132,5 @@ class BlobFileWriter:
pass
self.closed = True
- # Delete the file
- self.file_io.delete_quietly(self.file_path)
+ if self._blob_consumer is None:
+ self.file_io.delete_quietly(self.file_path)
diff --git a/paimon-python/pypaimon/write/writer/blob_writer.py b/paimon-python/pypaimon/write/writer/blob_writer.py
index adc86e8486..24f64e4dff 100644
--- a/paimon-python/pypaimon/write/writer/blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/blob_writer.py
@@ -22,6 +22,7 @@ from typing import Optional, Tuple, Dict
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.data.timestamp import Timestamp
+from pypaimon.table.row.blob import BlobConsumer
from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
from pypaimon.write.writer.blob_file_writer import BlobFileWriter
@@ -31,7 +32,7 @@ logger = logging.getLogger(__name__)
class BlobWriter(AppendOnlyDataWriter):
def __init__(self, table, partition: Tuple, bucket: int, max_seq_number:
int, blob_column: str,
- options: Dict[str, str] = None):
+ options: Dict[str, str] = None, blob_consumer:
Optional[BlobConsumer] = None):
super().__init__(table, partition, bucket, max_seq_number,
options, write_cols=[blob_column])
@@ -44,6 +45,7 @@ class BlobWriter(AppendOnlyDataWriter):
options = self.table.options
self.blob_target_file_size = CoreOptions.blob_target_file_size(options)
+ self._blob_consumer = blob_consumer
self.current_writer: Optional[BlobFileWriter] = None
self.current_file_path: Optional[str] = None
self.record_count = 0
@@ -98,7 +100,7 @@ class BlobWriter(AppendOnlyDataWriter):
self.file_count += 1 # Increment counter for next file
file_path = self._generate_file_path(file_name)
self.current_file_path = file_path
- self.current_writer = BlobFileWriter(self.file_io, file_path)
+ self.current_writer = BlobFileWriter(self.file_io, file_path,
blob_consumer=self._blob_consumer)
def rolling_file(self) -> bool:
if self.current_writer is None:
@@ -236,7 +238,11 @@ class BlobWriter(AppendOnlyDataWriter):
logger.warning(f"Error aborting blob writer: {e}", exc_info=e)
self.current_writer = None
self.current_file_path = None
- super().abort()
+ if self._blob_consumer is not None:
+ self.pending_data = None
+ self.committed_files.clear()
+ else:
+ super().abort()
@staticmethod
def _get_column_stats(data_or_batch, column_name: str):
diff --git a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
index a682da864b..2fd0ec878e 100644
--- a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
+++ b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
@@ -26,6 +26,7 @@ from pypaimon.data.timestamp import Timestamp
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.schema.data_types import VectorType
+from pypaimon.table.row.blob import BlobConsumer
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.write.writer.data_writer import DataWriter
@@ -50,7 +51,7 @@ class DedicatedFormatWriter(DataWriter):
CHECK_ROLLING_RECORD_CNT = 1000
def __init__(self, table, partition: Tuple, bucket: int, max_seq_number:
int, options: CoreOptions = None,
- write_cols: Optional[List[str]] = None):
+ write_cols: Optional[List[str]] = None, blob_consumer:
Optional[BlobConsumer] = None):
super().__init__(table, partition, bucket, max_seq_number, options,
write_cols=write_cols)
# Determine blob columns from table schema
@@ -123,7 +124,8 @@ class DedicatedFormatWriter(DataWriter):
bucket=self.bucket,
max_seq_number=max_seq_number,
blob_column=blob_column,
- options=options
+ options=options,
+ blob_consumer=blob_consumer,
)
# Initialize vector writer when vector.file.format is configured.