This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 67e7bd8e96 [Python] Blob Table supports partition (#6488)
67e7bd8e96 is described below

commit 67e7bd8e9679b7272714a1ee2766daab2276c6af
Author: umi <[email protected]>
AuthorDate: Wed Oct 29 13:47:36 2025 +0800

    [Python] Blob Table supports partition (#6488)
---
 paimon-python/pypaimon/read/split_read.py       |   5 +-
 paimon-python/pypaimon/tests/blob_table_test.py | 158 ++++++++++++++++++++----
 2 files changed, 134 insertions(+), 29 deletions(-)

diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index a744fbc4f0..5c90c2ddb6 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -461,18 +461,19 @@ class DataEvolutionSplitRead(SplitRead):
             if not read_fields:
                 file_record_readers[i] = None
             else:
+                read_field_names = self._remove_partition_fields(read_fields)
                 table_fields = self.read_fields
                 self.read_fields = read_fields  # create reader based on read_fields
                 # Create reader for this bunch
                 if len(bunch.files()) == 1:
                     file_record_readers[i] = self._create_file_reader(
-                        bunch.files()[0], [field.name for field in read_fields]
+                        bunch.files()[0], read_field_names
                     )
                 else:
                     # Create concatenated reader for multiple files
                     suppliers = [
                         lambda f=file: self._create_file_reader(
-                            f, [field.name for field in read_fields]
+                            f, read_field_names
                         ) for file in bunch.files()
                     ]
                     file_record_readers[i] = MergeAllBatchReader(suppliers)
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index fe987ea0a6..67813ca5c3 100644
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -310,7 +310,7 @@ class DataBlobWriterTest(unittest.TestCase):
         # Verify the blob size is approximately 50MB
         blob_size_mb = len(large_blob_data) / (1024 * 1024)
         self.assertGreater(blob_size_mb, 49)  # Should be at least 49MB
-        self.assertLess(blob_size_mb, 51)     # Should be less than 51MB
+        self.assertLess(blob_size_mb, 51)  # Should be less than 51MB
 
         total_rows = 0
 
@@ -646,9 +646,9 @@ class DataBlobWriterTest(unittest.TestCase):
             result_type = result_df.iloc[i]['type']
             result_data = result_df.iloc[i]['data']
 
-            self.assertEqual(result_id, original_id, f"Row {i+1}: ID should 
match")
-            self.assertEqual(result_type, original_type, f"Row {i+1}: Type 
should match")
-            self.assertEqual(result_data, original_data, f"Row {i+1}: Blob 
data should match")
+            self.assertEqual(result_id, original_id, f"Row {i + 1}: ID should 
match")
+            self.assertEqual(result_type, original_type, f"Row {i + 1}: Type 
should match")
+            self.assertEqual(result_data, original_data, f"Row {i + 1}: Blob 
data should match")
 
     def test_data_blob_writer_empty_batches(self):
         """Test DataBlobWriter with empty batches."""
@@ -742,7 +742,7 @@ class DataBlobWriterTest(unittest.TestCase):
 
         # Create data that should trigger rolling
         large_content = 'X' * 1000  # Large string content
-        large_blob = b'B' * 5000    # Large blob data
+        large_blob = b'B' * 5000  # Large blob data
 
         # Write multiple batches to test rolling
         for i in range(10):  # 10 batches
@@ -806,7 +806,8 @@ class DataBlobWriterTest(unittest.TestCase):
                 b'medium_blob_data_2_with_more_content',
                 b'large_blob_data_3_with_even_more_content_and_details',
                 
b'very_large_blob_data_4_with_extensive_content_and_multiple_details_here',  # 
noqa: E501
-                
b'extremely_large_blob_data_5_with_comprehensive_content_and_extensive_details_covering_multiple_aspects'
  # noqa: E501
+                b'extremely_large_blob_data_5_with_comprehensive_content_and_'
+                b'extensive_details_covering_multiple_aspects'  # noqa: E501
             ]
         }, schema=pa_schema)
 
@@ -849,8 +850,10 @@ class DataBlobWriterTest(unittest.TestCase):
 
         # Verify normal columns
         self.assertEqual(result.column('id').to_pylist(), [1, 2, 3, 4, 5], "ID 
column should match")
-        self.assertEqual(result.column('name').to_pylist(), ['Alice', 'Bob', 
'Charlie', 'David', 'Eve'], "Name column should match")  # noqa: E501
-        self.assertEqual(result.column('description').to_pylist(), ['User 1', 
'User 2', 'User 3', 'User 4', 'User 5'], "Description column should match")  # 
noqa: E501
+        self.assertEqual(result.column('name').to_pylist(), ['Alice', 'Bob', 
'Charlie', 'David', 'Eve'],
+                         "Name column should match")  # noqa: E501
+        self.assertEqual(result.column('description').to_pylist(), ['User 1', 
'User 2', 'User 3', 'User 4', 'User 5'],
+                         "Description column should match")  # noqa: E501
 
         # Verify blob data correctness
         blob_data = result.column('blob_data').to_pylist()
@@ -859,7 +862,8 @@ class DataBlobWriterTest(unittest.TestCase):
             b'medium_blob_data_2_with_more_content',
             b'large_blob_data_3_with_even_more_content_and_details',
             
b'very_large_blob_data_4_with_extensive_content_and_multiple_details_here',  # 
noqa: E501
-            
b'extremely_large_blob_data_5_with_comprehensive_content_and_extensive_details_covering_multiple_aspects'
  # noqa: E501
+            
b'extremely_large_blob_data_5_with_comprehensive_content_and_extensive_details_covering_multiple_aspects'
+            # noqa: E501
         ]
 
         self.assertEqual(len(blob_data), 5, "Should have 5 blob records")
@@ -867,10 +871,103 @@ class DataBlobWriterTest(unittest.TestCase):
 
         # Verify individual blob sizes
         for i, (actual_blob, expected_blob) in enumerate(zip(blob_data, 
expected_blobs)):
-            self.assertEqual(len(actual_blob), len(expected_blob), f"Blob 
{i+1} size should match")
-            self.assertEqual(actual_blob, expected_blob, f"Blob {i+1} content 
should match exactly")
+            self.assertEqual(len(actual_blob), len(expected_blob), f"Blob {i + 
1} size should match")
+            self.assertEqual(actual_blob, expected_blob, f"Blob {i + 1} 
content should match exactly")
 
-        print(f"✅ End-to-end blob write/read test passed: wrote and read back 
{len(blob_data)} blob records correctly")  # noqa: E501
+        print(
+            f"✅ End-to-end blob write/read test passed: wrote and read back 
{len(blob_data)} blob records correctly")  # noqa: E501
+
+    def test_blob_write_read_partition(self):
+        """Test complete end-to-end blob functionality: write blob data and 
read it back to verify correctness."""
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('description', pa.string()),
+            ('blob_data', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema, partition_keys=['name'],
+            # pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.blob_write_read_partition', schema, 
False)
+        table = self.catalog.get_table('test_db.blob_write_read_partition')
+
+        # Test data with various blob sizes and types
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3, 4, 5],
+            'name': ['Alice', 'Alice', 'David', 'David', 'David'],
+            'description': ['User 1', 'User 2', 'User 3', 'User 4', 'User 5'],
+            'blob_data': [
+                b'small_blob_1',
+                b'medium_blob_data_2_with_more_content',
+                b'large_blob_data_3_with_even_more_content_and_details',
+                
b'very_large_blob_data_4_with_extensive_content_and_multiple_details_here',  # 
noqa: E501
+                b'extremely_large_blob_data_5_with_comprehensive_content_and_'
+                b'extensive_details_covering_multiple_aspects'
+                # noqa: E501
+            ]
+        }, schema=pa_schema)
+
+        # Write data using table API
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+
+        # Commit the data
+        commit_messages = writer.prepare_commit()
+
+        # Create commit and commit the data
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        # Read data back using table API
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        splits = table_scan.plan().splits()
+        result = table_read.to_arrow(splits)
+
+        # Verify the data was read back correctly
+        self.assertEqual(result.num_rows, 5, "Should have 5 rows")
+        self.assertEqual(result.num_columns, 4, "Should have 4 columns")
+
+        # Verify normal columns
+        self.assertEqual(result.column('id').to_pylist(), [1, 2, 3, 4, 5], "ID 
column should match")
+        self.assertEqual(result.column('name').to_pylist(), ['Alice', 'Alice', 
'David', 'David', 'David'],
+                         "Name column should match")  # noqa: E501
+        self.assertEqual(result.column('description').to_pylist(), ['User 1', 
'User 2', 'User 3', 'User 4', 'User 5'],
+                         "Description column should match")  # noqa: E501
+
+        # Verify blob data correctness
+        blob_data = result.column('blob_data').to_pylist()
+        expected_blobs = [
+            b'small_blob_1',
+            b'medium_blob_data_2_with_more_content',
+            b'large_blob_data_3_with_even_more_content_and_details',
+            
b'very_large_blob_data_4_with_extensive_content_and_multiple_details_here',  # 
noqa: E501
+            
b'extremely_large_blob_data_5_with_comprehensive_content_and_extensive_details_covering_multiple_aspects'
+            # noqa: E501
+        ]
+
+        self.assertEqual(len(blob_data), 5, "Should have 5 blob records")
+        self.assertEqual(blob_data, expected_blobs, "Blob data should match 
exactly")
+
+        # Verify individual blob sizes
+        for i, (actual_blob, expected_blob) in enumerate(zip(blob_data, 
expected_blobs)):
+            self.assertEqual(len(actual_blob), len(expected_blob), f"Blob {i + 
1} size should match")
+            self.assertEqual(actual_blob, expected_blob, f"Blob {i + 1} 
content should match exactly")
+
+        print(
+            f"✅ End-to-end blob write/read test passed: wrote and read back 
{len(blob_data)} blob records correctly")  # noqa: E501
 
     def test_blob_write_read_end_to_end_with_descriptor(self):
         """Test end-to-end blob functionality using blob descriptors."""
@@ -1044,18 +1141,20 @@ class DataBlobWriterTest(unittest.TestCase):
 
         # Verify normal columns
         self.assertEqual(result.column('id').to_pylist(), [1, 2, 3], "ID 
column should match")
-        self.assertEqual(result.column('metadata').to_pylist(), ['Large blob 
1', 'Large blob 2', 'Large blob 3'], "Metadata column should match")  # noqa: 
E501
+        self.assertEqual(result.column('metadata').to_pylist(), ['Large blob 
1', 'Large blob 2', 'Large blob 3'],
+                         "Metadata column should match")  # noqa: E501
 
         # Verify blob data integrity
         blob_data = result.column('large_blob').to_pylist()
         self.assertEqual(len(blob_data), 3, "Should have 3 blob records")
 
         for i, blob in enumerate(blob_data):
-            self.assertEqual(len(blob), len(large_blob_data), f"Blob {i+1} 
should be {large_blob_size} bytes")
-            self.assertEqual(blob, large_blob_data, f"Blob {i+1} content 
should match exactly")
-            print(f"✅ Verified large blob {i+1}: {len(blob)} bytes")
+            self.assertEqual(len(blob), len(large_blob_data), f"Blob {i + 1} 
should be {large_blob_size} bytes")
+            self.assertEqual(blob, large_blob_data, f"Blob {i + 1} content 
should match exactly")
+            print(f"✅ Verified large blob {i + 1}: {len(blob)} bytes")
 
-        print(f"✅ Large blob end-to-end test passed: wrote and read back 
{len(blob_data)} large blob records correctly")  # noqa: E501
+        print(
+            f"✅ Large blob end-to-end test passed: wrote and read back 
{len(blob_data)} large blob records correctly")  # noqa: E501
 
     def test_blob_write_read_mixed_sizes_end_to_end(self):
         """Test end-to-end blob functionality with mixed blob sizes."""
@@ -1129,7 +1228,8 @@ class DataBlobWriterTest(unittest.TestCase):
 
         # Verify normal columns
         self.assertEqual(result.column('id').to_pylist(), [1, 2, 3, 4, 5], "ID 
column should match")
-        self.assertEqual(result.column('size_category').to_pylist(), ['tiny', 
'small', 'medium', 'large', 'huge'], "Size category column should match")  # 
noqa: E501
+        self.assertEqual(result.column('size_category').to_pylist(), ['tiny', 
'small', 'medium', 'large', 'huge'],
+                         "Size category column should match")  # noqa: E501
 
         # Verify blob data
         blob_data = result.column('blob_data').to_pylist()
@@ -1145,9 +1245,10 @@ class DataBlobWriterTest(unittest.TestCase):
 
         # Verify individual blob content
         for i, (actual_blob, expected_blob) in enumerate(zip(blob_data, 
expected_blobs)):
-            self.assertEqual(actual_blob, expected_blob, f"Blob {i+1} content 
should match exactly")
+            self.assertEqual(actual_blob, expected_blob, f"Blob {i + 1} 
content should match exactly")
 
-        print(f"✅ Mixed sizes end-to-end test passed: wrote and read back 
blobs ranging from {min(sizes)} to {max(sizes)} bytes")  # noqa: E501
+        print(
+            f"✅ Mixed sizes end-to-end test passed: wrote and read back blobs 
ranging from {min(sizes)} to {max(sizes)} bytes")  # noqa: E501
 
     def test_blob_write_read_large_data_end_to_end_with_rolling(self):
         """Test end-to-end blob functionality with large blob data (50MB per 
blob) and rolling behavior (40 blobs)."""
@@ -1180,7 +1281,7 @@ class DataBlobWriterTest(unittest.TestCase):
 
         # Verify the blob size is exactly 50MB
         actual_size = len(large_blob_data)
-        print(f"Created blob data: {actual_size:,} bytes ({actual_size / 
(1024*1024):.2f} MB)")
+        print(f"Created blob data: {actual_size:,} bytes ({actual_size / (1024 
* 1024):.2f} MB)")
 
         # Write 40 batches of data (each with 1 blob of 50MB)
         write_builder = table.new_batch_write_builder()
@@ -1245,8 +1346,10 @@ class DataBlobWriterTest(unittest.TestCase):
         expected_metadata = [f'Large blob batch {i}' for i in range(1, 41)]
 
         self.assertEqual(result.column('id').to_pylist(), expected_ids, "ID 
column should match")
-        self.assertEqual(result.column('batch_id').to_pylist(), 
expected_batch_ids, "Batch ID column should match")  # noqa: E501
-        self.assertEqual(result.column('metadata').to_pylist(), 
expected_metadata, "Metadata column should match")  # noqa: E501
+        self.assertEqual(result.column('batch_id').to_pylist(), 
expected_batch_ids,
+                         "Batch ID column should match")  # noqa: E501
+        self.assertEqual(result.column('metadata').to_pylist(), 
expected_metadata,
+                         "Metadata column should match")  # noqa: E501
 
         # Verify blob data integrity
         blob_data = result.column('large_blob').to_pylist()
@@ -1254,12 +1357,12 @@ class DataBlobWriterTest(unittest.TestCase):
 
         # Verify each blob
         for i, blob in enumerate(blob_data):
-            self.assertEqual(len(blob), len(large_blob_data), f"Blob {i+1} 
should be {large_blob_size:,} bytes")
-            self.assertEqual(blob, large_blob_data, f"Blob {i+1} content 
should match exactly")
+            self.assertEqual(len(blob), len(large_blob_data), f"Blob {i + 1} 
should be {large_blob_size:,} bytes")
+            self.assertEqual(blob, large_blob_data, f"Blob {i + 1} content 
should match exactly")
 
             # Print progress every 10 blobs
             if (i + 1) % 10 == 0:
-                print(f"✅ Verified blob {i+1}/40: {len(blob):,} bytes")
+                print(f"✅ Verified blob {i + 1}/40: {len(blob):,} bytes")
 
         # Verify total data size
         total_blob_size = sum(len(blob) for blob in blob_data)
@@ -1269,7 +1372,8 @@ class DataBlobWriterTest(unittest.TestCase):
 
         print("✅ Large blob rolling end-to-end test passed:")
         print("   - Wrote and read back 40 blobs of 50MB each")
-        print(f"   - Total data size: {total_blob_size:,} bytes 
({total_blob_size / (1024*1024*1024):.2f} GB)")  # noqa: E501
+        print(
+            f"   - Total data size: {total_blob_size:,} bytes 
({total_blob_size / (1024 * 1024 * 1024):.2f} GB)")  # noqa: E501
         print("   - All blob content verified as correct")
 
     def test_data_blob_writer_with_shard(self):

Reply via email to