This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 82fe60a684bd592dc1d301338fe5ef3deb42cd13
Author: YeJunHao <[email protected]>
AuthorDate: Fri Oct 17 15:09:12 2025 +0800

    [python] Blob type more test for descriptor (#6422)
---
 paimon-python/pypaimon/common/core_options.py      |   4 +
 paimon-python/pypaimon/read/split_read.py          |   2 +-
 paimon-python/pypaimon/tests/blob_table_test.py    | 100 +++++++++++++++++++++
 paimon-python/pypaimon/write/writer/data_writer.py |   2 +-
 4 files changed, 106 insertions(+), 2 deletions(-)

diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py
index da1ad0674e..cbf35b33e4 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -51,3 +51,7 @@ class CoreOptions(str, Enum):
     COMMIT_USER_PREFIX = "commit.user-prefix"
     ROW_TRACKING_ENABLED = "row-tracking.enabled"
     DATA_EVOLUTION_ENABLED = "data-evolution.enabled"
+
+    @staticmethod
+    def get_blob_as_descriptor(options: dict) -> bool:
+        return options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, "false").lower() == 'true'
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 64a28ac63c..5a8bc4825b 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -82,7 +82,7 @@ class SplitRead(ABC):
             format_reader = FormatAvroReader(self.table.file_io, file_path, read_fields,
                                              self.read_fields, self.push_down_predicate)
         elif file_format == CoreOptions.FILE_FORMAT_BLOB:
-            blob_as_descriptor = self.table.options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False)
+            blob_as_descriptor = CoreOptions.get_blob_as_descriptor(self.table.options)
             format_reader = FormatBlobReader(self.table.file_io, file_path, read_fields,
                                              self.read_fields, self.push_down_predicate, blob_as_descriptor)
         elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index 3c8273bde4..d24b3f0d5a 100644
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -23,6 +23,7 @@ import unittest
 import pyarrow as pa
 
 from pypaimon import CatalogFactory
+from pypaimon.table.file_store_table import FileStoreTable
 from pypaimon.write.commit_message import CommitMessage
 
 
@@ -871,6 +872,105 @@ class DataBlobWriterTest(unittest.TestCase):
 
         print(f"✅ End-to-end blob write/read test passed: wrote and read back {len(blob_data)} blob records correctly")  # noqa: E501
 
+    def test_blob_write_read_end_to_end_with_descriptor(self):
+        """Test end-to-end blob functionality using blob descriptors."""
+        import random
+        from pypaimon import Schema
+        from pypaimon.table.row.blob import BlobDescriptor, Blob
+        from pypaimon.common.uri_reader import UriReaderFactory
+        from pypaimon.common.config import CatalogOptions
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('picture', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'blob-as-descriptor': 'true'
+            }
+        )
+
+        # Create table
+        self.catalog.create_table('test_db.blob_descriptor_test', schema, False)
+        table: FileStoreTable = self.catalog.get_table('test_db.blob_descriptor_test')
+
+        # Create test blob data (1MB)
+        blob_data = bytearray(1024 * 1024)
+        random.seed(42)  # For reproducible tests
+        for i in range(len(blob_data)):
+            blob_data[i] = random.randint(0, 255)
+        blob_data = bytes(blob_data)
+
+        # Create external blob file
+        external_blob_path = os.path.join(self.temp_dir, 'external_blob')
+        with open(external_blob_path, 'wb') as f:
+            f.write(blob_data)
+
+        # Create blob descriptor pointing to external file
+        blob_descriptor = BlobDescriptor(external_blob_path, 0, len(blob_data))
+
+        # Create test data with blob descriptor
+        test_data = pa.Table.from_pydict({
+            'id': [1],
+            'name': ['paimon'],
+            'picture': [blob_descriptor.serialize()]
+        }, schema=pa_schema)
+
+        # Write data using table API
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+
+        # Commit the data
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+
+        # Read data back
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result = table_read.to_arrow(table_scan.plan().splits())
+
+        # Verify the data was written and read correctly
+        self.assertEqual(result.num_rows, 1, "Should have 1 row")
+        self.assertEqual(result.column('id').to_pylist(), [1], "ID should match")
+        self.assertEqual(result.column('name').to_pylist(), ['paimon'], "Name should match")
+
+        # Get the blob descriptor bytes from the result
+        picture_bytes = result.column('picture').to_pylist()[0]
+        self.assertIsInstance(picture_bytes, bytes, "Picture should be bytes")
+
+        # Deserialize the blob descriptor
+        new_blob_descriptor = BlobDescriptor.deserialize(picture_bytes)
+
+        # The URI might be different if the blob was stored in the table's data directory,
+        # so verify the descriptor properties and try to read the data.
+        # Note: offset might be non-zero due to blob file format overhead
+        self.assertGreaterEqual(new_blob_descriptor.offset, 0, "Offset should be non-negative")
+        self.assertEqual(new_blob_descriptor.length, len(blob_data), "Length should match")
+
+        # Create URI reader factory and read the blob data
+        catalog_options = {CatalogOptions.WAREHOUSE: self.warehouse}
+        uri_reader_factory = UriReaderFactory(catalog_options)
+        uri_reader = uri_reader_factory.create(new_blob_descriptor.uri)
+        blob = Blob.from_descriptor(uri_reader, new_blob_descriptor)
+
+        # Verify the blob data matches the original
+        self.assertEqual(blob.to_data(), blob_data, "Blob data should match original")
+
+        print("✅ Blob descriptor end-to-end test passed:")
+        print("   - Created external blob file and descriptor")
+        print("   - Wrote and read blob descriptor successfully")
+        print("   - Verified blob data can be read from descriptor")
+        print("   - Tested blob-as-descriptor=true mode")
+
     def test_blob_write_read_large_data_end_to_end(self):
         """Test end-to-end blob functionality with large blob data (1MB per blob)."""
         from pypaimon import Schema
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 502d196ae6..6ee92082db 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -58,7 +58,7 @@ class DataWriter(ABC):
         self.pending_data: Optional[pa.Table] = None
         self.committed_files: List[DataFileMeta] = []
         self.write_cols = write_cols
-        self.blob_as_descriptor = options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False)
+        self.blob_as_descriptor = CoreOptions.get_blob_as_descriptor(options)
 
     def write(self, data: pa.RecordBatch):
         try:
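
Note on the helper introduced above: Paimon table options are stored as strings,
so the pre-patch pattern options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False)
returns the raw option value when the key is set, and the non-empty string
"false" is truthy, which would presumably enable descriptor mode by mistake.
Below is a minimal standalone sketch of the new parsing behavior, not the
pypaimon API itself; it assumes FILE_BLOB_AS_DESCRIPTOR maps to the
"blob-as-descriptor" key used in the test options.

    def get_blob_as_descriptor(options: dict) -> bool:
        # Mirrors CoreOptions.get_blob_as_descriptor from the diff:
        # a missing key defaults to "false"; the comparison is case-insensitive.
        return options.get("blob-as-descriptor", "false").lower() == "true"

    assert get_blob_as_descriptor({}) is False
    assert get_blob_as_descriptor({"blob-as-descriptor": "false"}) is False
    assert get_blob_as_descriptor({"blob-as-descriptor": "True"}) is True
    # The pre-patch pattern treats the non-empty string "false" as truthy:
    assert bool({"blob-as-descriptor": "false"}.get("blob-as-descriptor", False)) is True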
