This is an automated email from the ASF dual-hosted git repository.

XiaoHongbo-Hope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 26de5f7060 [python] Support null blob (#7847)
26de5f7060 is described below

commit 26de5f706040b519fc6430eb48a683e8129869d1
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu May 14 17:42:00 2026 +0800

    [python] Support null blob (#7847)
    
    ### Purpose
    
    ### Tests
---
 paimon-python/pypaimon/filesystem/local_file_io.py |  33 +-----
 .../pypaimon/filesystem/pyarrow_file_io.py         |  32 +-----
 .../pypaimon/read/reader/format_blob_reader.py     |  22 ++--
 paimon-python/pypaimon/tests/blob_test.py          | 126 +++++++++++++++++++--
 paimon-python/pypaimon/write/blob_format_writer.py |  43 ++++++-
 5 files changed, 173 insertions(+), 83 deletions(-)

diff --git a/paimon-python/pypaimon/filesystem/local_file_io.py 
b/paimon-python/pypaimon/filesystem/local_file_io.py
index b26d540f57..120f2c6b3a 100644
--- a/paimon-python/pypaimon/filesystem/local_file_io.py
+++ b/paimon-python/pypaimon/filesystem/local_file_io.py
@@ -33,9 +33,6 @@ from pypaimon.common.options import Options
 from pypaimon.common.uri_reader import UriReaderFactory
 from pypaimon.filesystem.local import PaimonLocalFileSystem
 from pypaimon.schema.data_types import DataField, AtomicType, 
PyarrowFieldParser
-from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob
-from pypaimon.table.row.generic_row import GenericRow
-from pypaimon.table.row.row_kind import RowKind
 from pypaimon.write.blob_format_writer import BlobFormatWriter
 
 
@@ -420,10 +417,6 @@ class LocalFileIO(FileIO):
             if data.num_columns != 1:
                 raise RuntimeError(f"Blob format only supports a single 
column, got {data.num_columns} columns")
             
-            column = data.column(0)
-            if column.null_count > 0:
-                raise RuntimeError("Blob format does not support null values")
-            
             field = data.schema[0]
             if pyarrow.types.is_large_binary(field.type):
                 fields = [DataField(0, field.name, AtomicType("BLOB"))]
@@ -443,31 +436,7 @@ class LocalFileIO(FileIO):
             with open(file_path, 'wb') as output_stream:
                 writer = BlobFormatWriter(output_stream)
                 for i in range(num_rows):
-                    col_data = records_dict[field_name][i]
-                    if hasattr(fields[0].type, 'type') and fields[0].type.type 
== "BLOB":
-                        if hasattr(col_data, 'as_py'):
-                            col_data = col_data.as_py()
-                        if isinstance(col_data, str):
-                            col_data = col_data.encode('utf-8')
-                        if isinstance(col_data, bytearray):
-                            col_data = bytes(col_data)
-
-                        if isinstance(col_data, bytes):
-                            if BlobDescriptor.is_blob_descriptor(col_data):
-                                descriptor = 
BlobDescriptor.deserialize(col_data)
-                                uri_reader = 
self.uri_reader_factory.create(descriptor.uri)
-                                blob_data = Blob.from_descriptor(uri_reader, 
descriptor)
-                            else:
-                                blob_data = BlobData(col_data)
-                        else:
-                            raise RuntimeError(
-                                "Blob field value must be bytes/blob or 
serialized BlobDescriptor bytes."
-                            )
-                        row_values = [blob_data]
-                    else:
-                        row_values = [col_data]
-                    row = GenericRow(row_values, fields, RowKind.INSERT)
-                    writer.add_element(row)
+                    writer.write_value(records_dict[field_name][i], fields, 
self.uri_reader_factory)
                 writer.close()
         except Exception as e:
             self.delete_quietly(path)
diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py 
b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index 828b97cab1..d81238c89f 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -38,9 +38,6 @@ from pypaimon.common.uri_reader import UriReaderFactory
 from pypaimon.filesystem.jindo_file_system_handler import 
JindoFileSystemHandler, JINDO_AVAILABLE
 from pypaimon.schema.data_types import (AtomicType, DataField,
                                         PyarrowFieldParser)
-from pypaimon.table.row.blob import Blob, BlobData, BlobDescriptor
-from pypaimon.table.row.generic_row import GenericRow
-from pypaimon.table.row.row_kind import RowKind
 from pypaimon.write.blob_format_writer import BlobFormatWriter
 
 
@@ -654,9 +651,6 @@ class PyArrowFileIO(FileIO):
         try:
             if data.num_columns != 1:
                 raise RuntimeError(f"Blob format only supports a single 
column, got {data.num_columns} columns")
-            column = data.column(0)
-            if column.null_count > 0:
-                raise RuntimeError("Blob format does not support null values")
             field = data.schema[0]
             if pyarrow.types.is_large_binary(field.type):
                 fields = [DataField(0, field.name, AtomicType("BLOB"))]
@@ -669,31 +663,7 @@ class PyArrowFileIO(FileIO):
             with self.new_output_stream(path) as output_stream:
                 writer = BlobFormatWriter(output_stream)
                 for i in range(num_rows):
-                    col_data = records_dict[field_name][i]
-                    if hasattr(fields[0].type, 'type') and fields[0].type.type 
== "BLOB":
-                        if hasattr(col_data, 'as_py'):
-                            col_data = col_data.as_py()
-                        if isinstance(col_data, str):
-                            col_data = col_data.encode('utf-8')
-                        if isinstance(col_data, bytearray):
-                            col_data = bytes(col_data)
-
-                        if isinstance(col_data, bytes):
-                            if BlobDescriptor.is_blob_descriptor(col_data):
-                                descriptor = 
BlobDescriptor.deserialize(col_data)
-                                uri_reader = 
self.uri_reader_factory.create(descriptor.uri)
-                                blob_data = Blob.from_descriptor(uri_reader, 
descriptor)
-                            else:
-                                blob_data = BlobData(col_data)
-                        else:
-                            raise RuntimeError(
-                                "Blob field value must be bytes/blob or 
serialized BlobDescriptor bytes."
-                            )
-                        row_values = [blob_data]
-                    else:
-                        row_values = [col_data]
-                    row = GenericRow(row_values, fields, RowKind.INSERT)
-                    writer.add_element(row)
+                    writer.write_value(records_dict[field_name][i], fields, 
self.uri_reader_factory)
                 writer.close()
 
         except Exception as e:
diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py 
b/paimon-python/pypaimon/read/reader/format_blob_reader.py
index b74232a642..e7e65394aa 100644
--- a/paimon-python/pypaimon/read/reader/format_blob_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -95,12 +95,12 @@ class FormatBlobReader(RecordBatchReader):
                     break
                 blob = blob_row.values[0]
                 for field_name in self._fields:
-                    blob_descriptor = blob.to_descriptor()
-                    if self._blob_as_descriptor:
-                        blob_data = blob_descriptor.serialize()
+                    if blob is None:
+                        pydict_data[field_name].append(None)
+                    elif self._blob_as_descriptor:
+                        
pydict_data[field_name].append(blob.to_descriptor().serialize())
                     else:
-                        blob_data = blob.to_data()
-                    pydict_data[field_name].append(blob_data)
+                        pydict_data[field_name].append(blob.to_data())
 
                 records_in_batch += 1
                 if records_in_batch >= read_size:
@@ -163,8 +163,11 @@ class FormatBlobReader(RecordBatchReader):
             blob_offsets = []
             offset = 0
             for length in blob_lengths:
-                blob_offsets.append(offset)
-                offset += length
+                if length == -1:
+                    blob_offsets.append(-1)
+                else:
+                    blob_offsets.append(offset)
+                    offset += length
             self.blob_lengths = blob_lengths
             self.blob_offsets = blob_offsets
 
@@ -188,13 +191,16 @@ class BlobRecordIterator:
     def __next__(self) -> GenericRow:
         if self.current_position >= len(self.blob_lengths):
             raise StopIteration
+        fields = [DataField(0, self.field_name, AtomicType("BLOB"))]
+        if self.blob_lengths[self.current_position] == -1:
+            self.current_position += 1
+            return GenericRow([None], fields, RowKind.INSERT)
         # Create blob reference for the current blob
         # Skip magic number (4 bytes) and exclude length (8 bytes) + CRC (4 
bytes) = 12 bytes
         blob_offset = self.blob_offsets[self.current_position] + 
self.MAGIC_NUMBER_SIZE  # Skip magic number
         blob_length = self.blob_lengths[self.current_position] - 
self.METADATA_OVERHEAD
         blob = Blob.from_file(self.file_io, self.file_path, blob_offset, 
blob_length)
         self.current_position += 1
-        fields = [DataField(0, self.field_name, AtomicType("BLOB"))]
         return GenericRow([blob], fields, RowKind.INSERT)
 
     def returned_position(self) -> int:
diff --git a/paimon-python/pypaimon/tests/blob_test.py 
b/paimon-python/pypaimon/tests/blob_test.py
index c27bfe5290..d9cf64210d 100644
--- a/paimon-python/pypaimon/tests/blob_test.py
+++ b/paimon-python/pypaimon/tests/blob_test.py
@@ -745,17 +745,28 @@ class BlobEndToEndTest(unittest.TestCase):
             file_io.write_blob(multi_column_url, multi_column_table)
         self.assertIn("single column", str(context.exception))
 
-        # Test that FileIO.write_blob rejects null values
+        # Test that FileIO.write_blob supports null values and round-trips 
correctly
         null_schema = pa.schema([pa.field("blob_with_nulls", 
pa.large_binary())])
         null_table = pa.table([[b"data", None]], schema=null_schema)
 
         null_file = Path(self.temp_dir) / "null_data.blob"
         null_file_url = _to_url(null_file)
+        file_io.write_blob(null_file_url, null_table)
 
-        # Should throw RuntimeError for null values
-        with self.assertRaises(RuntimeError) as context:
-            file_io.write_blob(null_file_url, null_table)
-        self.assertIn("null values", str(context.exception))
+        null_read_fields = [DataField(0, "blob_with_nulls", 
AtomicType("BLOB"))]
+        null_reader = FormatBlobReader(
+            file_io=file_io,
+            file_path=str(null_file),
+            read_fields=["blob_with_nulls"],
+            full_fields=null_read_fields,
+            push_down_predicate=None,
+            blob_as_descriptor=False
+        )
+        null_batch = null_reader.read_arrow_batch()
+        self.assertEqual(null_batch.num_rows, 2)
+        self.assertEqual(null_batch.column(0)[0].as_py(), b"data")
+        self.assertIsNone(null_batch.column(0)[1].as_py())
+        null_reader.close()
 
         # ========== Test FormatBlobReader with complex type schema ==========
         # Create a valid blob file first
@@ -1027,17 +1038,29 @@ class BlobEndToEndTest(unittest.TestCase):
             "Field must be Blob/BlobData instance" in str(context.exception)
         )
 
-        # Test that blob format rejects tables with null values
+        # Test that blob format supports tables with null values (round-trip)
         null_schema = pa.schema([pa.field("blob_with_null", 
pa.large_binary())])
         null_table = pa.table([[b"data", None, b"more_data"]], 
schema=null_schema)
 
         null_file = Path(self.temp_dir) / "with_nulls.blob"
         null_file_url = _to_url(null_file)
+        file_io.write_blob(null_file_url, null_table)
 
-        # Should reject null values
-        with self.assertRaises(RuntimeError) as context:
-            file_io.write_blob(null_file_url, null_table)
-        self.assertIn("null values", str(context.exception))
+        null_read_fields = [DataField(0, "blob_with_null", AtomicType("BLOB"))]
+        null_reader = FormatBlobReader(
+            file_io=file_io,
+            file_path=str(null_file),
+            read_fields=["blob_with_null"],
+            full_fields=null_read_fields,
+            push_down_predicate=None,
+            blob_as_descriptor=False
+        )
+        null_batch = null_reader.read_arrow_batch()
+        self.assertEqual(null_batch.num_rows, 3)
+        self.assertEqual(null_batch.column(0)[0].as_py(), b"data")
+        self.assertIsNone(null_batch.column(0)[1].as_py())
+        self.assertEqual(null_batch.column(0)[2].as_py(), b"more_data")
+        null_reader.close()
 
     def test_blob_write_with_raw_bytes_starting_with_v1_prefix(self):
         file_io = LocalFileIO(self.temp_dir, Options({}))
@@ -1158,6 +1181,89 @@ class BlobEndToEndTest(unittest.TestCase):
         self.assertEqual(read_content_bytes, test_content)
         reader_content.close()
 
+    def test_null_blob_write(self):
+        from pypaimon.write.blob_format_writer import BlobFormatWriter
+
+        output = io.BytesIO()
+        writer = BlobFormatWriter(output)
+
+        row_with_null = GenericRow(
+            [None],
+            [DataField(0, "blob_field", AtomicType("BLOB"))],
+            RowKind.INSERT
+        )
+        writer.add_element(row_with_null)
+        self.assertEqual(writer.lengths, [-1])
+        self.assertEqual(writer.position, 0)
+
+    def test_null_blob_read(self):
+        from pypaimon.write.blob_format_writer import BlobFormatWriter
+
+        file_io = LocalFileIO(self.temp_dir, Options({}))
+        blob_file_path = os.path.join(self.temp_dir, "null_blob.blob")
+
+        output = open(blob_file_path, 'wb')
+        writer = BlobFormatWriter(output)
+        fields = [DataField(0, "blob_field", AtomicType("BLOB"))]
+        writer.add_element(GenericRow([BlobData(b"hello")], fields, 
RowKind.INSERT))
+        writer.add_element(GenericRow([None], fields, RowKind.INSERT))
+        writer.add_element(GenericRow([BlobData(b"world")], fields, 
RowKind.INSERT))
+        writer.close()
+
+        blob_field_name = "blob_field"
+        read_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))]
+        reader = FormatBlobReader(
+            file_io=file_io,
+            file_path=blob_file_path,
+            read_fields=[blob_field_name],
+            full_fields=read_fields,
+            push_down_predicate=None,
+            blob_as_descriptor=False
+        )
+
+        batch = reader.read_arrow_batch()
+        self.assertIsNotNone(batch)
+        self.assertEqual(batch.num_rows, 3)
+        self.assertEqual(batch.column(0)[0].as_py(), b"hello")
+        self.assertIsNone(batch.column(0)[1].as_py())
+        self.assertEqual(batch.column(0)[2].as_py(), b"world")
+        reader.close()
+
+    def test_null_blob_read_as_descriptor(self):
+        from pypaimon.write.blob_format_writer import BlobFormatWriter
+
+        file_io = LocalFileIO(self.temp_dir, Options({}))
+        blob_file_path = os.path.join(self.temp_dir, "null_desc.blob")
+
+        output = open(blob_file_path, 'wb')
+        writer = BlobFormatWriter(output)
+        fields = [DataField(0, "blob_field", AtomicType("BLOB"))]
+        writer.add_element(GenericRow([BlobData(b"hello")], fields, 
RowKind.INSERT))
+        writer.add_element(GenericRow([None], fields, RowKind.INSERT))
+        writer.add_element(GenericRow([BlobData(b"world")], fields, 
RowKind.INSERT))
+        writer.close()
+
+        blob_field_name = "blob_field"
+        read_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))]
+        reader = FormatBlobReader(
+            file_io=file_io,
+            file_path=blob_file_path,
+            read_fields=[blob_field_name],
+            full_fields=read_fields,
+            push_down_predicate=None,
+            blob_as_descriptor=True
+        )
+
+        batch = reader.read_arrow_batch()
+        self.assertIsNotNone(batch)
+        self.assertEqual(batch.num_rows, 3)
+        desc0 = BlobDescriptor.deserialize(batch.column(0)[0].as_py())
+        self.assertEqual(desc0.uri, blob_file_path)
+        self.assertIsNone(batch.column(0)[1].as_py())
+        desc2 = BlobDescriptor.deserialize(batch.column(0)[2].as_py())
+        self.assertEqual(desc2.uri, blob_file_path)
+        reader.close()
+
 
 class OffsetInputStreamTest(unittest.TestCase):
 
diff --git a/paimon-python/pypaimon/write/blob_format_writer.py 
b/paimon-python/pypaimon/write/blob_format_writer.py
index 21da2dc3f6..f019655c84 100644
--- a/paimon-python/pypaimon/write/blob_format_writer.py
+++ b/paimon-python/pypaimon/write/blob_format_writer.py
@@ -19,7 +19,7 @@ import struct
 import zlib
 from typing import BinaryIO, List
 
-from pypaimon.table.row.blob import Blob, BlobData
+from pypaimon.table.row.blob import Blob, BlobData, BlobDescriptor
 from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
 
 
@@ -40,7 +40,8 @@ class BlobFormatWriter:
 
         blob_value = row.values[0]
         if blob_value is None:
-            raise ValueError("BlobFormatWriter only supports non-null blob")
+            self.lengths.append(-1)
+            return
 
         if not isinstance(blob_value, Blob):
             raise ValueError("Field must be Blob/BlobData instance")
@@ -87,6 +88,44 @@ class BlobFormatWriter:
         self.position += len(data)
         return crc32
 
+    def write_value(self, col_data, fields, uri_reader_factory=None) -> None:
+        from pypaimon.table.row.generic_row import GenericRow
+        from pypaimon.table.row.row_kind import RowKind
+
+        is_blob = hasattr(fields[0].type, 'type') and fields[0].type.type == 
"BLOB"
+
+        if col_data is None:
+            if not is_blob:
+                raise RuntimeError("Null values are only supported for BLOB 
type fields")
+            self.lengths.append(-1)
+            return
+
+        if is_blob:
+            if hasattr(col_data, 'as_py'):
+                col_data = col_data.as_py()
+            if isinstance(col_data, str):
+                col_data = col_data.encode('utf-8')
+            if isinstance(col_data, bytearray):
+                col_data = bytes(col_data)
+
+            if isinstance(col_data, bytes):
+                if BlobDescriptor.is_blob_descriptor(col_data):
+                    descriptor = BlobDescriptor.deserialize(col_data)
+                    uri_reader = uri_reader_factory.create(descriptor.uri)
+                    blob_value = Blob.from_descriptor(uri_reader, descriptor)
+                else:
+                    blob_value = BlobData(col_data)
+            else:
+                raise RuntimeError(
+                    "Blob field value must be bytes/blob or serialized 
BlobDescriptor bytes."
+                )
+            row_values = [blob_value]
+        else:
+            row_values = [col_data]
+
+        row = GenericRow(row_values, fields, RowKind.INSERT)
+        self.add_element(row)
+
     def reach_target_size(self, target_size: int) -> bool:
         return self.position >= target_size
 

Reply via email to