This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 86ad99a6ac [python] optimize schema validation and support 
binary/large_binary type conversion (#7088)
86ad99a6ac is described below

commit 86ad99a6ac6341e0a09a061136b1fc12076cd5c3
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed May 13 22:18:59 2026 +0800

    [python] optimize schema validation and support binary/large_binary type 
conversion (#7088)
---
 paimon-python/pypaimon/tests/ray_data_test.py | 139 ++++++++++++++++++++++++++
 paimon-python/pypaimon/write/ray_datasink.py  |  29 ++++++
 2 files changed, 168 insertions(+)

diff --git a/paimon-python/pypaimon/tests/ray_data_test.py 
b/paimon-python/pypaimon/tests/ray_data_test.py
index eb1198b5f7..990f23a5ca 100644
--- a/paimon-python/pypaimon/tests/ray_data_test.py
+++ b/paimon-python/pypaimon/tests/ray_data_test.py
@@ -21,10 +21,12 @@ import unittest
 import shutil
 
 import pyarrow as pa
+import pyarrow.types as pa_types
 import ray
 
 from pypaimon import CatalogFactory, Schema
 from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.schema.data_types import PyarrowFieldParser
 
 
 class RayDataTest(unittest.TestCase):
@@ -696,5 +698,142 @@ class RayDataTest(unittest.TestCase):
             table_read.to_ray(splits, override_num_blocks=-10)
         self.assertIn("override_num_blocks must be at least 1", 
str(context.exception))
 
+    def test_dict_return_loses_large_binary_type(self):
+        # Original data with large_binary
+        original = pa.table({
+            'data': pa.array([b'hello', b'world'], type=pa.large_binary())
+        })
+        self.assertTrue(
+            pa_types.is_large_binary(original.schema.field('data').type),
+            "Original should be large_binary"
+        )
+
+        # Simulate map_batches returning dict: convert to Python list then 
rebuild
+        d = {'data': original['data'].to_pylist()}
+        rebuilt = pa.Table.from_pydict(d)
+        self.assertTrue(
+            pa_types.is_binary(rebuilt.schema.field('data').type),
+            f"Rebuilt from dict should be binary (PyArrow default), but got 
{rebuilt.schema.field('data').type}"
+        )
+        self.assertFalse(
+            pa_types.is_large_binary(rebuilt.schema.field('data').type),
+            "large_binary type should be lost after dict roundtrip"
+        )
+
+    def test_ray_data_read_and_write_with_blob(self):
+        import time
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('name', pa.string()),
+            ('data', pa.large_binary()),  # Table uses large_binary for blob
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'blob-field': 'data',
+            }
+        )
+
+        table_name = f'default.test_ray_read_write_blob_{int(time.time() * 
1000000)}'
+        self.catalog.create_table(table_name, schema, False)
+        table = self.catalog.get_table(table_name)
+
+        # Step 1: Write data to Paimon table using write_arrow (large_binary 
type)
+        initial_data = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'name': ['Alice', 'Bob', 'Charlie'],
+            'data': [b'blob_data_1', b'blob_data_2', b'blob_data_3'],
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(initial_data)
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        # Step 2: Read from Paimon table using to_ray()
+        read_builder = table.new_read_builder()
+        table_read = read_builder.new_read()
+        table_scan = read_builder.new_scan()
+        splits = table_scan.plan().splits()
+
+        ray_dataset = table_read.to_ray(splits)
+
+        # Verify Ray blocks preserve large_binary type from Paimon
+        for batch in ray_dataset.iter_batches(batch_size=10, 
batch_format="pyarrow"):
+            ray_data_field = batch.schema.field('data')
+            self.assertTrue(
+                pa_types.is_large_binary(ray_data_field.type),
+                f"Ray block should preserve large_binary() from Paimon, but 
got {ray_data_field.type}"
+            )
+            break
+
+        # Verify Paimon table schema is large_binary (BLOB)
+        table_pa_schema = 
PyarrowFieldParser.from_paimon_schema(table.table_schema.fields)
+        self.assertTrue(
+            pa_types.is_large_binary(table_pa_schema.field('data').type),
+            "Paimon table should have large_binary() for BLOB field"
+        )
+
+        # Step 3: Simulate user pipeline: map_batches returns Python dict,
+        def process_blob(batch):
+            return {
+                'id': batch['id'].to_pylist(),
+                'name': batch['name'].to_pylist(),
+                'data': batch['data'].to_pylist(),  # Python bytes -> binary
+            }
+
+        mapped_dataset = ray_dataset.map_batches(process_blob, 
batch_format="pyarrow")
+
+        # Verify map_batches caused type downgrade: large_binary -> binary
+        for batch in mapped_dataset.iter_batches(batch_size=10, 
batch_format="pyarrow"):
+            mapped_data_field = batch.schema.field('data')
+            self.assertTrue(
+                pa_types.is_binary(mapped_data_field.type),
+                f"After map_batches returning dict, data should be binary(), 
but got {mapped_data_field.type}"
+            )
+            break
+
+        # Step 4: Write mapped dataset back via write_ray().
+        write_builder2 = table.new_batch_write_builder()
+        writer2 = write_builder2.new_write()
+
+        writer2.write_ray(
+            mapped_dataset,
+            overwrite=False,
+            concurrency=1
+        )
+        writer2.close()
+
+        # Step 5: Verify the data was written correctly
+        read_builder2 = table.new_read_builder()
+        table_read2 = read_builder2.new_read()
+        result = table_read2.to_arrow(read_builder2.new_scan().plan().splits())
+
+        self.assertEqual(result.num_rows, 6, "Table should have 6 rows after 
roundtrip")
+
+        result_df = result.to_pandas()
+        result_df_sorted = 
result_df.sort_values(by='id').reset_index(drop=True)
+
+        self.assertEqual(list(result_df_sorted['id']), [1, 1, 2, 2, 3, 3], "ID 
column should match")
+        self.assertEqual(
+            list(result_df_sorted['name']),
+            ['Alice', 'Alice', 'Bob', 'Bob', 'Charlie', 'Charlie'],
+            "Name column should match"
+        )
+
+        written_data_values = [bytes(d) if d is not None else None for d in 
result_df_sorted['data']]
+        self.assertEqual(
+            written_data_values,
+            [b'blob_data_1', b'blob_data_1', b'blob_data_2', b'blob_data_2', 
b'blob_data_3', b'blob_data_3'],
+            "Blob data column should match"
+        )
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/paimon-python/pypaimon/write/ray_datasink.py 
b/paimon-python/pypaimon/write/ray_datasink.py
index 196fb62cb1..a01387b327 100644
--- a/paimon-python/pypaimon/write/ray_datasink.py
+++ b/paimon-python/pypaimon/write/ray_datasink.py
@@ -36,6 +36,29 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger(__name__)
 
+
+def _cast_binary_to_table_schema(table: pa.Table, target_schema: pa.Schema) -> 
pa.Table:
+    """Cast binary to large_binary for BLOB fields.
+
+    When map_batches returns Python dicts, PyArrow infers bytes as binary,
+    losing the original large_binary (BLOB) type. Cast back before writing.
+    """
+    cast_indices = []
+    for i, field in enumerate(table.schema):
+        target_field = target_schema.field(field.name) if field.name in 
target_schema.names else None
+        if target_field and pa.types.is_binary(field.type) and 
pa.types.is_large_binary(target_field.type):
+            cast_indices.append(i)
+
+    if not cast_indices:
+        return table
+
+    columns = table.columns
+    for i in cast_indices:
+        columns[i] = columns[i].cast(pa.large_binary())
+    fields = [target_schema.field(f.name) if i in cast_indices else f
+              for i, f in enumerate(table.schema)]
+    return pa.table(columns, schema=pa.schema(fields))
+
 # Python 3.8 / Ray 2.10: Datasink is not subscriptable at runtime
 try:
     _DatasinkBase = Datasink[List["CommitMessage"]]
@@ -90,12 +113,18 @@ class PaimonDatasink(_DatasinkBase):
             
             table_write = writer_builder.new_write()
 
+            table_schema = self.table.table_schema
+            from pypaimon.schema.data_types import PyarrowFieldParser
+            target_pa_schema = 
PyarrowFieldParser.from_paimon_schema(table_schema.fields)
+
             for block in blocks:
                 block_arrow: pa.Table = 
BlockAccessor.for_block(block).to_arrow()
 
                 if block_arrow.num_rows == 0:
                     continue
 
+                block_arrow = _cast_binary_to_table_schema(block_arrow, 
target_pa_schema)
+
                 table_write.write_arrow(block_arrow)
 
             commit_messages = table_write.prepare_commit()

Reply via email to