This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 86ad99a6ac [python] optimize schema validation and support
binary/large_binary type conversion (#7088)
86ad99a6ac is described below
commit 86ad99a6ac6341e0a09a061136b1fc12076cd5c3
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed May 13 22:18:59 2026 +0800
[python] optimize schema validation and support binary/large_binary type
conversion (#7088)
---
paimon-python/pypaimon/tests/ray_data_test.py | 139 ++++++++++++++++++++++++++
paimon-python/pypaimon/write/ray_datasink.py | 29 ++++++
2 files changed, 168 insertions(+)
diff --git a/paimon-python/pypaimon/tests/ray_data_test.py
b/paimon-python/pypaimon/tests/ray_data_test.py
index eb1198b5f7..990f23a5ca 100644
--- a/paimon-python/pypaimon/tests/ray_data_test.py
+++ b/paimon-python/pypaimon/tests/ray_data_test.py
@@ -21,10 +21,12 @@ import unittest
import shutil
import pyarrow as pa
+import pyarrow.types as pa_types
import ray
from pypaimon import CatalogFactory, Schema
from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.schema.data_types import PyarrowFieldParser
class RayDataTest(unittest.TestCase):
@@ -696,5 +698,142 @@ class RayDataTest(unittest.TestCase):
table_read.to_ray(splits, override_num_blocks=-10)
self.assertIn("override_num_blocks must be at least 1",
str(context.exception))
def test_dict_return_loses_large_binary_type(self):
    """Show that a Python-dict round trip downgrades large_binary to binary."""
    # Start from a column that is explicitly typed as large_binary.
    blob_column = pa.array([b'hello', b'world'], type=pa.large_binary())
    original = pa.table({'data': blob_column})
    original_type = original.schema.field('data').type
    self.assertTrue(pa_types.is_large_binary(original_type), "Original should be large_binary")

    # Mimic map_batches returning a dict: values become plain Python bytes,
    # and the table is rebuilt from them with no explicit schema.
    rebuilt = pa.Table.from_pydict({'data': original['data'].to_pylist()})

    became_binary = pa_types.is_binary(rebuilt.schema.field('data').type)
    self.assertTrue(
        became_binary,
        f"Rebuilt from dict should be binary (PyArrow default), but got {rebuilt.schema.field('data').type}"
    )

    still_large_binary = pa_types.is_large_binary(rebuilt.schema.field('data').type)
    self.assertFalse(
        still_large_binary,
        "large_binary type should be lost after dict roundtrip"
    )
+
def test_ray_data_read_and_write_with_blob(self):
    """Round-trip a BLOB table through Ray with a dict-returning map_batches.

    The flow is: write large_binary data with write_arrow, read it back with
    to_ray, push it through a map_batches stage that returns Python dicts
    (which downgrades the blob column to binary), write the result back with
    write_ray, and finally check that the table holds every row twice.
    """
    import time

    # The table stores its blob column as large_binary.
    column_types = [
        ('id', pa.int64()),
        ('name', pa.string()),
        ('data', pa.large_binary()),
    ]
    pa_schema = pa.schema(column_types)

    table_options = {
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
        'blob-field': 'data',
    }
    schema = Schema.from_pyarrow_schema(pa_schema, options=table_options)

    # A microsecond timestamp keeps the table name unique across runs.
    table_name = f'default.test_ray_read_write_blob_{int(time.time() * 1000000)}'
    self.catalog.create_table(table_name, schema, False)
    table = self.catalog.get_table(table_name)

    # Step 1: seed the table through write_arrow, keeping the large_binary type.
    seed_rows = {
        'id': [1, 2, 3],
        'name': ['Alice', 'Bob', 'Charlie'],
        'data': [b'blob_data_1', b'blob_data_2', b'blob_data_3'],
    }
    initial_data = pa.Table.from_pydict(seed_rows, schema=pa_schema)

    write_builder = table.new_batch_write_builder()
    writer = write_builder.new_write()
    writer.write_arrow(initial_data)
    commit_messages = writer.prepare_commit()
    commit = write_builder.new_commit()
    commit.commit(commit_messages)
    writer.close()

    # Step 2: read the table back as a Ray dataset.
    read_builder = table.new_read_builder()
    table_read = read_builder.new_read()
    splits = read_builder.new_scan().plan().splits()
    ray_dataset = table_read.to_ray(splits)

    # Only the first batch is inspected: Ray blocks should keep large_binary.
    first_read_batch = next(
        iter(ray_dataset.iter_batches(batch_size=10, batch_format="pyarrow")),
        None,
    )
    if first_read_batch is not None:
        ray_data_field = first_read_batch.schema.field('data')
        self.assertTrue(
            pa_types.is_large_binary(ray_data_field.type),
            f"Ray block should preserve large_binary() from Paimon, but got {ray_data_field.type}"
        )

    # The Paimon table schema itself must map the BLOB field to large_binary.
    table_pa_schema = PyarrowFieldParser.from_paimon_schema(table.table_schema.fields)
    table_blob_type = table_pa_schema.field('data').type
    self.assertTrue(
        pa_types.is_large_binary(table_blob_type),
        "Paimon table should have large_binary() for BLOB field"
    )

    # Step 3: simulate a user pipeline whose map_batches stage returns a
    # Python dict, so the blob values travel as plain Python bytes.
    def process_blob(batch):
        return {
            column: batch[column].to_pylist()
            for column in ('id', 'name', 'data')
        }

    mapped_dataset = ray_dataset.map_batches(process_blob, batch_format="pyarrow")

    # The dict round trip should have downgraded large_binary to binary.
    first_mapped_batch = next(
        iter(mapped_dataset.iter_batches(batch_size=10, batch_format="pyarrow")),
        None,
    )
    if first_mapped_batch is not None:
        mapped_data_field = first_mapped_batch.schema.field('data')
        self.assertTrue(
            pa_types.is_binary(mapped_data_field.type),
            f"After map_batches returning dict, data should be binary(), but got {mapped_data_field.type}"
        )

    # Step 4: write the mapped dataset back through write_ray().
    write_builder2 = table.new_batch_write_builder()
    writer2 = write_builder2.new_write()
    writer2.write_ray(mapped_dataset, overwrite=False, concurrency=1)
    writer2.close()

    # Step 5: read everything back and verify both copies of each row.
    read_builder2 = table.new_read_builder()
    table_read2 = read_builder2.new_read()
    final_splits = read_builder2.new_scan().plan().splits()
    result = table_read2.to_arrow(final_splits)

    self.assertEqual(result.num_rows, 6, "Table should have 6 rows after roundtrip")

    result_df_sorted = result.to_pandas().sort_values(by='id').reset_index(drop=True)

    expected_ids = [1, 1, 2, 2, 3, 3]
    self.assertEqual(list(result_df_sorted['id']), expected_ids, "ID column should match")

    expected_names = ['Alice', 'Alice', 'Bob', 'Bob', 'Charlie', 'Charlie']
    self.assertEqual(
        list(result_df_sorted['name']),
        expected_names,
        "Name column should match"
    )

    # Normalise whatever pandas hands back to plain bytes before comparing.
    written_data_values = []
    for value in result_df_sorted['data']:
        written_data_values.append(bytes(value) if value is not None else None)

    expected_blobs = [
        b'blob_data_1', b'blob_data_1',
        b'blob_data_2', b'blob_data_2',
        b'blob_data_3', b'blob_data_3',
    ]
    self.assertEqual(
        written_data_values,
        expected_blobs,
        "Blob data column should match"
    )
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/paimon-python/pypaimon/write/ray_datasink.py
b/paimon-python/pypaimon/write/ray_datasink.py
index 196fb62cb1..a01387b327 100644
--- a/paimon-python/pypaimon/write/ray_datasink.py
+++ b/paimon-python/pypaimon/write/ray_datasink.py
@@ -36,6 +36,29 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
+
def _cast_binary_to_table_schema(table: pa.Table, target_schema: pa.Schema) -> pa.Table:
    """Cast ``binary`` columns to ``large_binary`` where the target schema asks for it.

    When ``map_batches`` returns Python dicts, PyArrow infers ``bytes`` values
    as ``binary``, losing the original ``large_binary`` (BLOB) type. This
    helper casts such columns back before the block is written.

    Only columns that are ``binary`` in ``table`` and ``large_binary`` in
    ``target_schema`` (matched by field name) are touched; every other column
    and field is passed through unchanged.

    Args:
        table: Arrow table about to be written.
        target_schema: Arrow schema of the destination table.

    Returns:
        ``table`` itself when nothing needs casting; otherwise a new table
        with the affected columns cast, their fields replaced by the
        corresponding ``target_schema`` fields, and the schema-level metadata
        of ``table`` carried over.
    """
    # Build the name set once instead of materialising ``names`` per field.
    target_names = set(target_schema.names)

    # Column position -> target field, for every column that must be widened.
    replacements = {}
    for index, field in enumerate(table.schema):
        if field.name not in target_names or not pa.types.is_binary(field.type):
            continue
        target_field = target_schema.field(field.name)
        if pa.types.is_large_binary(target_field.type):
            replacements[index] = target_field

    if not replacements:
        return table

    columns = [
        column.cast(pa.large_binary()) if index in replacements else column
        for index, column in enumerate(table.columns)
    ]
    fields = [
        replacements.get(index, field)
        for index, field in enumerate(table.schema)
    ]
    # Keep the schema-level metadata so the cast path returns the same
    # metadata as the pass-through path above.
    return pa.table(columns, schema=pa.schema(fields, metadata=table.schema.metadata))
+
# Python 3.8 / Ray 2.10: Datasink is not subscriptable at runtime
try:
_DatasinkBase = Datasink[List["CommitMessage"]]
@@ -90,12 +113,18 @@ class PaimonDatasink(_DatasinkBase):
table_write = writer_builder.new_write()
+ table_schema = self.table.table_schema
+ from pypaimon.schema.data_types import PyarrowFieldParser
+ target_pa_schema =
PyarrowFieldParser.from_paimon_schema(table_schema.fields)
+
for block in blocks:
block_arrow: pa.Table =
BlockAccessor.for_block(block).to_arrow()
if block_arrow.num_rows == 0:
continue
+ block_arrow = _cast_binary_to_table_schema(block_arrow,
target_pa_schema)
+
table_write.write_arrow(block_arrow)
commit_messages = table_write.prepare_commit()