This is an automated email from the ASF dual-hosted git repository.
XiaoHongbo-Hope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 26de5f7060 [python] Support null blob (#7847)
26de5f7060 is described below
commit 26de5f706040b519fc6430eb48a683e8129869d1
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu May 14 17:42:00 2026 +0800
[python] Support null blob (#7847)
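### Purpose
Allow null values in BLOB columns. `FileIO.write_blob` no longer rejects null
values. Instead, `BlobFormatWriter` records a null as a length of -1 and writes
no payload bytes for it. `FormatBlobReader` returns `None` for such entries,
both when reading blob data and when reading blobs as descriptors. The per-row
value conversion that was duplicated in `LocalFileIO` and `PyArrowFileIO` now
lives in `BlobFormatWriter.write_value`.
### Tests
The null-value cases in `blob_test.py` now round-trip through `write_blob` and
`FormatBlobReader` instead of expecting a `RuntimeError`. New tests:
`test_null_blob_write`, `test_null_blob_read` and
`test_null_blob_read_as_descriptor`.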
---
paimon-python/pypaimon/filesystem/local_file_io.py | 33 +-----
.../pypaimon/filesystem/pyarrow_file_io.py | 32 +-----
.../pypaimon/read/reader/format_blob_reader.py | 22 ++--
paimon-python/pypaimon/tests/blob_test.py | 126 +++++++++++++++++++--
paimon-python/pypaimon/write/blob_format_writer.py | 43 ++++++-
5 files changed, 173 insertions(+), 83 deletions(-)
diff --git a/paimon-python/pypaimon/filesystem/local_file_io.py
b/paimon-python/pypaimon/filesystem/local_file_io.py
index b26d540f57..120f2c6b3a 100644
--- a/paimon-python/pypaimon/filesystem/local_file_io.py
+++ b/paimon-python/pypaimon/filesystem/local_file_io.py
@@ -33,9 +33,6 @@ from pypaimon.common.options import Options
from pypaimon.common.uri_reader import UriReaderFactory
from pypaimon.filesystem.local import PaimonLocalFileSystem
from pypaimon.schema.data_types import DataField, AtomicType,
PyarrowFieldParser
-from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob
-from pypaimon.table.row.generic_row import GenericRow
-from pypaimon.table.row.row_kind import RowKind
from pypaimon.write.blob_format_writer import BlobFormatWriter
@@ -420,10 +417,6 @@ class LocalFileIO(FileIO):
if data.num_columns != 1:
raise RuntimeError(f"Blob format only supports a single
column, got {data.num_columns} columns")
- column = data.column(0)
- if column.null_count > 0:
- raise RuntimeError("Blob format does not support null values")
-
field = data.schema[0]
if pyarrow.types.is_large_binary(field.type):
fields = [DataField(0, field.name, AtomicType("BLOB"))]
@@ -443,31 +436,7 @@ class LocalFileIO(FileIO):
with open(file_path, 'wb') as output_stream:
writer = BlobFormatWriter(output_stream)
for i in range(num_rows):
- col_data = records_dict[field_name][i]
- if hasattr(fields[0].type, 'type') and fields[0].type.type
== "BLOB":
- if hasattr(col_data, 'as_py'):
- col_data = col_data.as_py()
- if isinstance(col_data, str):
- col_data = col_data.encode('utf-8')
- if isinstance(col_data, bytearray):
- col_data = bytes(col_data)
-
- if isinstance(col_data, bytes):
- if BlobDescriptor.is_blob_descriptor(col_data):
- descriptor =
BlobDescriptor.deserialize(col_data)
- uri_reader =
self.uri_reader_factory.create(descriptor.uri)
- blob_data = Blob.from_descriptor(uri_reader,
descriptor)
- else:
- blob_data = BlobData(col_data)
- else:
- raise RuntimeError(
- "Blob field value must be bytes/blob or
serialized BlobDescriptor bytes."
- )
- row_values = [blob_data]
- else:
- row_values = [col_data]
- row = GenericRow(row_values, fields, RowKind.INSERT)
- writer.add_element(row)
+ writer.write_value(records_dict[field_name][i], fields,
self.uri_reader_factory)
writer.close()
except Exception as e:
self.delete_quietly(path)
diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index 828b97cab1..d81238c89f 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -38,9 +38,6 @@ from pypaimon.common.uri_reader import UriReaderFactory
from pypaimon.filesystem.jindo_file_system_handler import
JindoFileSystemHandler, JINDO_AVAILABLE
from pypaimon.schema.data_types import (AtomicType, DataField,
PyarrowFieldParser)
-from pypaimon.table.row.blob import Blob, BlobData, BlobDescriptor
-from pypaimon.table.row.generic_row import GenericRow
-from pypaimon.table.row.row_kind import RowKind
from pypaimon.write.blob_format_writer import BlobFormatWriter
@@ -654,9 +651,6 @@ class PyArrowFileIO(FileIO):
try:
if data.num_columns != 1:
raise RuntimeError(f"Blob format only supports a single
column, got {data.num_columns} columns")
- column = data.column(0)
- if column.null_count > 0:
- raise RuntimeError("Blob format does not support null values")
field = data.schema[0]
if pyarrow.types.is_large_binary(field.type):
fields = [DataField(0, field.name, AtomicType("BLOB"))]
@@ -669,31 +663,7 @@ class PyArrowFileIO(FileIO):
with self.new_output_stream(path) as output_stream:
writer = BlobFormatWriter(output_stream)
for i in range(num_rows):
- col_data = records_dict[field_name][i]
- if hasattr(fields[0].type, 'type') and fields[0].type.type
== "BLOB":
- if hasattr(col_data, 'as_py'):
- col_data = col_data.as_py()
- if isinstance(col_data, str):
- col_data = col_data.encode('utf-8')
- if isinstance(col_data, bytearray):
- col_data = bytes(col_data)
-
- if isinstance(col_data, bytes):
- if BlobDescriptor.is_blob_descriptor(col_data):
- descriptor =
BlobDescriptor.deserialize(col_data)
- uri_reader =
self.uri_reader_factory.create(descriptor.uri)
- blob_data = Blob.from_descriptor(uri_reader,
descriptor)
- else:
- blob_data = BlobData(col_data)
- else:
- raise RuntimeError(
- "Blob field value must be bytes/blob or
serialized BlobDescriptor bytes."
- )
- row_values = [blob_data]
- else:
- row_values = [col_data]
- row = GenericRow(row_values, fields, RowKind.INSERT)
- writer.add_element(row)
+ writer.write_value(records_dict[field_name][i], fields,
self.uri_reader_factory)
writer.close()
except Exception as e:
diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py
b/paimon-python/pypaimon/read/reader/format_blob_reader.py
index b74232a642..e7e65394aa 100644
--- a/paimon-python/pypaimon/read/reader/format_blob_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -95,12 +95,12 @@ class FormatBlobReader(RecordBatchReader):
break
blob = blob_row.values[0]
for field_name in self._fields:
- blob_descriptor = blob.to_descriptor()
- if self._blob_as_descriptor:
- blob_data = blob_descriptor.serialize()
+ if blob is None:
+ pydict_data[field_name].append(None)
+ elif self._blob_as_descriptor:
+
pydict_data[field_name].append(blob.to_descriptor().serialize())
else:
- blob_data = blob.to_data()
- pydict_data[field_name].append(blob_data)
+ pydict_data[field_name].append(blob.to_data())
records_in_batch += 1
if records_in_batch >= read_size:
@@ -163,8 +163,11 @@ class FormatBlobReader(RecordBatchReader):
blob_offsets = []
offset = 0
for length in blob_lengths:
- blob_offsets.append(offset)
- offset += length
+ if length == -1:
+ blob_offsets.append(-1)
+ else:
+ blob_offsets.append(offset)
+ offset += length
self.blob_lengths = blob_lengths
self.blob_offsets = blob_offsets
@@ -188,13 +191,16 @@ class BlobRecordIterator:
def __next__(self) -> GenericRow:
if self.current_position >= len(self.blob_lengths):
raise StopIteration
+ fields = [DataField(0, self.field_name, AtomicType("BLOB"))]
+ if self.blob_lengths[self.current_position] == -1:
+ self.current_position += 1
+ return GenericRow([None], fields, RowKind.INSERT)
# Create blob reference for the current blob
# Skip magic number (4 bytes) and exclude length (8 bytes) + CRC (4
bytes) = 12 bytes
blob_offset = self.blob_offsets[self.current_position] +
self.MAGIC_NUMBER_SIZE # Skip magic number
blob_length = self.blob_lengths[self.current_position] -
self.METADATA_OVERHEAD
blob = Blob.from_file(self.file_io, self.file_path, blob_offset,
blob_length)
self.current_position += 1
- fields = [DataField(0, self.field_name, AtomicType("BLOB"))]
return GenericRow([blob], fields, RowKind.INSERT)
def returned_position(self) -> int:
diff --git a/paimon-python/pypaimon/tests/blob_test.py
b/paimon-python/pypaimon/tests/blob_test.py
index c27bfe5290..d9cf64210d 100644
--- a/paimon-python/pypaimon/tests/blob_test.py
+++ b/paimon-python/pypaimon/tests/blob_test.py
@@ -745,17 +745,28 @@ class BlobEndToEndTest(unittest.TestCase):
file_io.write_blob(multi_column_url, multi_column_table)
self.assertIn("single column", str(context.exception))
- # Test that FileIO.write_blob rejects null values
+ # Test that FileIO.write_blob supports null values and round-trips
correctly
null_schema = pa.schema([pa.field("blob_with_nulls",
pa.large_binary())])
null_table = pa.table([[b"data", None]], schema=null_schema)
null_file = Path(self.temp_dir) / "null_data.blob"
null_file_url = _to_url(null_file)
+ file_io.write_blob(null_file_url, null_table)
- # Should throw RuntimeError for null values
- with self.assertRaises(RuntimeError) as context:
- file_io.write_blob(null_file_url, null_table)
- self.assertIn("null values", str(context.exception))
+ null_read_fields = [DataField(0, "blob_with_nulls",
AtomicType("BLOB"))]
+ null_reader = FormatBlobReader(
+ file_io=file_io,
+ file_path=str(null_file),
+ read_fields=["blob_with_nulls"],
+ full_fields=null_read_fields,
+ push_down_predicate=None,
+ blob_as_descriptor=False
+ )
+ null_batch = null_reader.read_arrow_batch()
+ self.assertEqual(null_batch.num_rows, 2)
+ self.assertEqual(null_batch.column(0)[0].as_py(), b"data")
+ self.assertIsNone(null_batch.column(0)[1].as_py())
+ null_reader.close()
# ========== Test FormatBlobReader with complex type schema ==========
# Create a valid blob file first
@@ -1027,17 +1038,29 @@ class BlobEndToEndTest(unittest.TestCase):
"Field must be Blob/BlobData instance" in str(context.exception)
)
- # Test that blob format rejects tables with null values
+ # Test that blob format supports tables with null values (round-trip)
null_schema = pa.schema([pa.field("blob_with_null",
pa.large_binary())])
null_table = pa.table([[b"data", None, b"more_data"]],
schema=null_schema)
null_file = Path(self.temp_dir) / "with_nulls.blob"
null_file_url = _to_url(null_file)
+ file_io.write_blob(null_file_url, null_table)
- # Should reject null values
- with self.assertRaises(RuntimeError) as context:
- file_io.write_blob(null_file_url, null_table)
- self.assertIn("null values", str(context.exception))
+ null_read_fields = [DataField(0, "blob_with_null", AtomicType("BLOB"))]
+ null_reader = FormatBlobReader(
+ file_io=file_io,
+ file_path=str(null_file),
+ read_fields=["blob_with_null"],
+ full_fields=null_read_fields,
+ push_down_predicate=None,
+ blob_as_descriptor=False
+ )
+ null_batch = null_reader.read_arrow_batch()
+ self.assertEqual(null_batch.num_rows, 3)
+ self.assertEqual(null_batch.column(0)[0].as_py(), b"data")
+ self.assertIsNone(null_batch.column(0)[1].as_py())
+ self.assertEqual(null_batch.column(0)[2].as_py(), b"more_data")
+ null_reader.close()
def test_blob_write_with_raw_bytes_starting_with_v1_prefix(self):
file_io = LocalFileIO(self.temp_dir, Options({}))
@@ -1158,6 +1181,89 @@ class BlobEndToEndTest(unittest.TestCase):
self.assertEqual(read_content_bytes, test_content)
reader_content.close()
+ def test_null_blob_write(self):
+ from pypaimon.write.blob_format_writer import BlobFormatWriter
+
+ output = io.BytesIO()
+ writer = BlobFormatWriter(output)
+
+ row_with_null = GenericRow(
+ [None],
+ [DataField(0, "blob_field", AtomicType("BLOB"))],
+ RowKind.INSERT
+ )
+ writer.add_element(row_with_null)
+ self.assertEqual(writer.lengths, [-1])
+ self.assertEqual(writer.position, 0)
+
+ def test_null_blob_read(self):
+ from pypaimon.write.blob_format_writer import BlobFormatWriter
+
+ file_io = LocalFileIO(self.temp_dir, Options({}))
+ blob_file_path = os.path.join(self.temp_dir, "null_blob.blob")
+
+ output = open(blob_file_path, 'wb')
+ writer = BlobFormatWriter(output)
+ fields = [DataField(0, "blob_field", AtomicType("BLOB"))]
+ writer.add_element(GenericRow([BlobData(b"hello")], fields,
RowKind.INSERT))
+ writer.add_element(GenericRow([None], fields, RowKind.INSERT))
+ writer.add_element(GenericRow([BlobData(b"world")], fields,
RowKind.INSERT))
+ writer.close()
+
+ blob_field_name = "blob_field"
+ read_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))]
+ reader = FormatBlobReader(
+ file_io=file_io,
+ file_path=blob_file_path,
+ read_fields=[blob_field_name],
+ full_fields=read_fields,
+ push_down_predicate=None,
+ blob_as_descriptor=False
+ )
+
+ batch = reader.read_arrow_batch()
+ self.assertIsNotNone(batch)
+ self.assertEqual(batch.num_rows, 3)
+ self.assertEqual(batch.column(0)[0].as_py(), b"hello")
+ self.assertIsNone(batch.column(0)[1].as_py())
+ self.assertEqual(batch.column(0)[2].as_py(), b"world")
+ reader.close()
+
+ def test_null_blob_read_as_descriptor(self):
+ from pypaimon.write.blob_format_writer import BlobFormatWriter
+
+ file_io = LocalFileIO(self.temp_dir, Options({}))
+ blob_file_path = os.path.join(self.temp_dir, "null_desc.blob")
+
+ output = open(blob_file_path, 'wb')
+ writer = BlobFormatWriter(output)
+ fields = [DataField(0, "blob_field", AtomicType("BLOB"))]
+ writer.add_element(GenericRow([BlobData(b"hello")], fields,
RowKind.INSERT))
+ writer.add_element(GenericRow([None], fields, RowKind.INSERT))
+ writer.add_element(GenericRow([BlobData(b"world")], fields,
RowKind.INSERT))
+ writer.close()
+
+ blob_field_name = "blob_field"
+ read_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))]
+ reader = FormatBlobReader(
+ file_io=file_io,
+ file_path=blob_file_path,
+ read_fields=[blob_field_name],
+ full_fields=read_fields,
+ push_down_predicate=None,
+ blob_as_descriptor=True
+ )
+
+ batch = reader.read_arrow_batch()
+ self.assertIsNotNone(batch)
+ self.assertEqual(batch.num_rows, 3)
+ desc0 = BlobDescriptor.deserialize(batch.column(0)[0].as_py())
+ self.assertEqual(desc0.uri, blob_file_path)
+ self.assertIsNone(batch.column(0)[1].as_py())
+ desc2 = BlobDescriptor.deserialize(batch.column(0)[2].as_py())
+ self.assertEqual(desc2.uri, blob_file_path)
+ reader.close()
+
class OffsetInputStreamTest(unittest.TestCase):
diff --git a/paimon-python/pypaimon/write/blob_format_writer.py
b/paimon-python/pypaimon/write/blob_format_writer.py
index 21da2dc3f6..f019655c84 100644
--- a/paimon-python/pypaimon/write/blob_format_writer.py
+++ b/paimon-python/pypaimon/write/blob_format_writer.py
@@ -19,7 +19,7 @@ import struct
import zlib
from typing import BinaryIO, List
-from pypaimon.table.row.blob import Blob, BlobData
+from pypaimon.table.row.blob import Blob, BlobData, BlobDescriptor
from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
@@ -40,7 +40,8 @@ class BlobFormatWriter:
blob_value = row.values[0]
if blob_value is None:
- raise ValueError("BlobFormatWriter only supports non-null blob")
+ self.lengths.append(-1)
+ return
if not isinstance(blob_value, Blob):
raise ValueError("Field must be Blob/BlobData instance")
@@ -87,6 +88,44 @@ class BlobFormatWriter:
self.position += len(data)
return crc32
+ def write_value(self, col_data, fields, uri_reader_factory=None) -> None:
+ from pypaimon.table.row.generic_row import GenericRow
+ from pypaimon.table.row.row_kind import RowKind
+
+ is_blob = hasattr(fields[0].type, 'type') and fields[0].type.type ==
"BLOB"
+
+ if col_data is None:
+ if not is_blob:
+ raise RuntimeError("Null values are only supported for BLOB
type fields")
+ self.lengths.append(-1)
+ return
+
+ if is_blob:
+ if hasattr(col_data, 'as_py'):
+ col_data = col_data.as_py()
+ if isinstance(col_data, str):
+ col_data = col_data.encode('utf-8')
+ if isinstance(col_data, bytearray):
+ col_data = bytes(col_data)
+
+ if isinstance(col_data, bytes):
+ if BlobDescriptor.is_blob_descriptor(col_data):
+ descriptor = BlobDescriptor.deserialize(col_data)
+ uri_reader = uri_reader_factory.create(descriptor.uri)
+ blob_value = Blob.from_descriptor(uri_reader, descriptor)
+ else:
+ blob_value = BlobData(col_data)
+ else:
+ raise RuntimeError(
+ "Blob field value must be bytes/blob or serialized
BlobDescriptor bytes."
+ )
+ row_values = [blob_value]
+ else:
+ row_values = [col_data]
+
+ row = GenericRow(row_values, fields, RowKind.INSERT)
+ self.add_element(row)
+
def reach_target_size(self, target_size: int) -> bool:
return self.position >= target_size