This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 82fe60a684bd592dc1d301338fe5ef3deb42cd13
Author: YeJunHao <[email protected]>
AuthorDate: Fri Oct 17 15:09:12 2025 +0800

    [python] Blob type more test for descriptor (#6422)
---
 paimon-python/pypaimon/common/core_options.py      |   4 +
 paimon-python/pypaimon/read/split_read.py          |   2 +-
 paimon-python/pypaimon/tests/blob_table_test.py    | 100 +++++++++++++++++++++
 paimon-python/pypaimon/write/writer/data_writer.py |   2 +-
 4 files changed, 106 insertions(+), 2 deletions(-)

diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py
index da1ad0674e..cbf35b33e4 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -51,3 +51,7 @@ class CoreOptions(str, Enum):
     COMMIT_USER_PREFIX = "commit.user-prefix"
     ROW_TRACKING_ENABLED = "row-tracking.enabled"
     DATA_EVOLUTION_ENABLED = "data-evolution.enabled"
+
+    @staticmethod
+    def get_blob_as_descriptor(options: dict) -> bool:
+        return options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, "false").lower() == 'true'
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 64a28ac63c..5a8bc4825b 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -82,7 +82,7 @@ class SplitRead(ABC):
             format_reader = FormatAvroReader(self.table.file_io, file_path, read_fields,
                                              self.read_fields, self.push_down_predicate)
         elif file_format == CoreOptions.FILE_FORMAT_BLOB:
-            blob_as_descriptor = self.table.options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False)
+            blob_as_descriptor = CoreOptions.get_blob_as_descriptor(self.table.options)
             format_reader = FormatBlobReader(self.table.file_io, file_path, read_fields,
                                              self.read_fields, self.push_down_predicate, blob_as_descriptor)
         elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index 3c8273bde4..d24b3f0d5a 100644
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -23,6 +23,7 @@ import unittest
 import pyarrow as pa
 
 from pypaimon import CatalogFactory
+from pypaimon.table.file_store_table import FileStoreTable
 from pypaimon.write.commit_message import CommitMessage
 
 
@@ -871,6 +872,105 @@ class DataBlobWriterTest(unittest.TestCase):
 
         print(f"✅ End-to-end blob write/read test passed: wrote and read back 
{len(blob_data)} blob records correctly")  # noqa: E501
 
+    def test_blob_write_read_end_to_end_with_descriptor(self):
+        """Test end-to-end blob functionality using blob descriptors."""
+        import random
+        from pypaimon import Schema
+        from pypaimon.table.row.blob import BlobDescriptor, Blob
+        from pypaimon.common.uri_reader import UriReaderFactory
+        from pypaimon.common.config import CatalogOptions
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('picture', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'blob-as-descriptor': 'true'
+            }
+        )
+
+        # Create table
+        self.catalog.create_table('test_db.blob_descriptor_test', schema, False)
+        table: FileStoreTable = self.catalog.get_table('test_db.blob_descriptor_test')
+
+        # Create test blob data (1MB)
+        blob_data = bytearray(1024 * 1024)
+        random.seed(42)  # For reproducible tests
+        for i in range(len(blob_data)):
+            blob_data[i] = random.randint(0, 255)
+        blob_data = bytes(blob_data)
+
+        # Create external blob file
+        external_blob_path = os.path.join(self.temp_dir, 'external_blob')
+        with open(external_blob_path, 'wb') as f:
+            f.write(blob_data)
+
+        # Create blob descriptor pointing to external file
+        blob_descriptor = BlobDescriptor(external_blob_path, 0, len(blob_data))
+
+        # Create test data with blob descriptor
+        test_data = pa.Table.from_pydict({
+            'id': [1],
+            'name': ['paimon'],
+            'picture': [blob_descriptor.serialize()]
+        }, schema=pa_schema)
+
+        # Write data using table API
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+
+        # Commit the data
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+
+        # Read data back
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result = table_read.to_arrow(table_scan.plan().splits())
+
+        # Verify the data was written and read correctly
+        self.assertEqual(result.num_rows, 1, "Should have 1 row")
+        self.assertEqual(result.column('id').to_pylist(), [1], "ID should match")
+        self.assertEqual(result.column('name').to_pylist(), ['paimon'], "Name should match")
+
+        # Get the blob descriptor bytes from the result
+        picture_bytes = result.column('picture').to_pylist()[0]
+        self.assertIsInstance(picture_bytes, bytes, "Picture should be bytes")
+
+        # Deserialize the blob descriptor
+        new_blob_descriptor = BlobDescriptor.deserialize(picture_bytes)
+
+        # The URI might be different if the blob was stored in the table's data directory
+        # Let's verify the descriptor properties and try to read the data
+        # Note: offset might be non-zero due to blob file format overhead
+        self.assertGreaterEqual(new_blob_descriptor.offset, 0, "Offset should be non-negative")
+        self.assertEqual(new_blob_descriptor.length, len(blob_data), "Length should match")
+
+        # Create URI reader factory and read the blob data
+        catalog_options = {CatalogOptions.WAREHOUSE: self.warehouse}
+        uri_reader_factory = UriReaderFactory(catalog_options)
+        uri_reader = uri_reader_factory.create(new_blob_descriptor.uri)
+        blob = Blob.from_descriptor(uri_reader, new_blob_descriptor)
+
+        # Verify the blob data matches the original
+        self.assertEqual(blob.to_data(), blob_data, "Blob data should match original")
+
+        print("✅ Blob descriptor end-to-end test passed:")
+        print("   - Created external blob file and descriptor")
+        print("   - Wrote and read blob descriptor successfully")
+        print("   - Verified blob data can be read from descriptor")
+        print("   - Tested blob-as-descriptor=true mode")
+
     def test_blob_write_read_large_data_end_to_end(self):
         """Test end-to-end blob functionality with large blob data (1MB per 
blob)."""
         from pypaimon import Schema
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 502d196ae6..6ee92082db 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -58,7 +58,7 @@ class DataWriter(ABC):
         self.pending_data: Optional[pa.Table] = None
         self.committed_files: List[DataFileMeta] = []
         self.write_cols = write_cols
-        self.blob_as_descriptor = options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False)
+        self.blob_as_descriptor = CoreOptions.get_blob_as_descriptor(options)
 
     def write(self, data: pa.RecordBatch):
         try:
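
For readers skimming the diff, here is a minimal standalone sketch of why both call sites switch from a raw options.get(..., False) lookup to the new CoreOptions.get_blob_as_descriptor helper. Table options are stored as strings, so when the option is set to "false" the old lookup returns that string, which is truthy in Python. The plain "blob-as-descriptor" key below (the same key used in the test's schema options) stands in for the CoreOptions.FILE_BLOB_AS_DESCRIPTOR enum value; this is an illustration under those assumptions, not code from the commit.

def get_blob_as_descriptor(options: dict) -> bool:
    # Normalize the string option value ("true"/"false") to a real bool.
    return options.get("blob-as-descriptor", "false").lower() == 'true'

options = {"blob-as-descriptor": "false"}

# Old pattern: the string "false" is truthy, so the flag looks enabled.
old_flag = bool(options.get("blob-as-descriptor", False))   # True

# New helper: parses the string and returns the intended value.
new_flag = get_blob_as_descriptor(options)                   # False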
