This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 52a362ce1a [python] Support blob-external-storage-path for writing
BLOB data to external storage (#7967)
52a362ce1a is described below
commit 52a362ce1ab07c66fe0b30a0d6bee4912f476c47
Author: zhoulii <[email protected]>
AuthorDate: Tue May 26 18:12:05 2026 +0800
[python] Support blob-external-storage-path for writing BLOB data to
external storage (#7967)
---
.../pypaimon/common/options/core_options.py | 40 ++-
paimon-python/pypaimon/schema/schema_manager.py | 64 +++-
.../pypaimon/tests/external_storage_blob_test.py | 392 +++++++++++++++++++++
.../pypaimon/write/writer/data_blob_writer.py | 30 +-
.../write/writer/external_storage_blob_writer.py | 228 ++++++++++++
5 files changed, 738 insertions(+), 16 deletions(-)
diff --git a/paimon-python/pypaimon/common/options/core_options.py
b/paimon-python/pypaimon/common/options/core_options.py
index 5bc7848e1e..b2f0e01916 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -16,15 +16,14 @@
# under the License.
import sys
+from datetime import timedelta
from enum import Enum
from typing import Dict, Optional
-from datetime import timedelta
-
from pypaimon.common.memory_size import MemorySize
from pypaimon.common.options import Options
-from pypaimon.common.options.config_options import ConfigOptions
from pypaimon.common.options.config_option import ConfigOption
+from pypaimon.common.options.config_options import ConfigOptions
class ExternalPathStrategy(str, Enum):
@@ -220,6 +219,28 @@ class CoreOptions:
)
)
+ BLOB_EXTERNAL_STORAGE_PATH: ConfigOption[str] = (
+ ConfigOptions.key("blob-external-storage-path")
+ .string_type()
+ .no_default_value()
+ .with_description(
+ "The external storage path where raw BLOB data from fields
configured "
+ "by 'blob-external-storage-field' is written at write time. "
+ "Orphan file cleanup is not applied to this path."
+ )
+ )
+
+ BLOB_EXTERNAL_STORAGE_FIELD: ConfigOption[str] = (
+ ConfigOptions.key("blob-external-storage-field")
+ .string_type()
+ .no_default_value()
+ .with_description(
+ "Comma-separated BLOB field names (must be a subset of
'blob-descriptor-field') "
+ "whose raw data will be written to external storage at write time.
"
+ "The external storage path is configured via
'blob-external-storage-path'."
+ )
+ )
+
TARGET_FILE_SIZE: ConfigOption[MemorySize] = (
ConfigOptions.key("target-file-size")
.memory_type()
@@ -648,6 +669,19 @@ class CoreOptions:
return {str(field).strip() for field in value if
str(field).strip()}
return set()
+ def blob_external_storage_fields(self, default=None):
+ value = self.options.get(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD,
default)
+ if value is None:
+ return set()
+ if isinstance(value, str):
+ return {field.strip() for field in value.split(",") if
field.strip()}
+ if isinstance(value, (list, set, tuple)):
+ return {str(field).strip() for field in value if
str(field).strip()}
+ return set()
+
+ def blob_external_storage_path(self, default=None):
+ return self.options.get(CoreOptions.BLOB_EXTERNAL_STORAGE_PATH,
default)
+
def target_file_size(self, has_primary_key, default=None):
return self.options.get(CoreOptions.TARGET_FILE_SIZE,
MemorySize.of_mebi_bytes(
diff --git a/paimon-python/pypaimon/schema/schema_manager.py
b/paimon-python/pypaimon/schema/schema_manager.py
index 3880bcd15c..1a01b31295 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -15,21 +15,22 @@
# specific language governing permissions and limitations
# under the License.
-from typing import Optional, List
+from typing import List, Optional
-from pypaimon.common.identifier import DEFAULT_MAIN_BRANCH
-from pypaimon.catalog.catalog_exception import ColumnAlreadyExistException,
ColumnNotExistException
+from pypaimon.catalog.catalog_exception import (ColumnAlreadyExistException,
+ ColumnNotExistException)
from pypaimon.common.file_io import FileIO
+from pypaimon.common.identifier import DEFAULT_MAIN_BRANCH
from pypaimon.common.json_util import JSON
-from pypaimon.common.options import Options, CoreOptions
+from pypaimon.common.options import CoreOptions, Options
from pypaimon.schema.data_types import AtomicInteger, DataField
from pypaimon.schema.schema import Schema
-from pypaimon.schema.schema_change import (
- AddColumn, DropColumn, RemoveOption, RenameColumn,
- SchemaChange, SetOption, UpdateColumnComment,
- UpdateColumnNullability, UpdateColumnPosition,
- UpdateColumnType, UpdateComment
-)
+from pypaimon.schema.schema_change import (AddColumn, DropColumn, RemoveOption,
+ RenameColumn, SchemaChange,
+ SetOption, UpdateColumnComment,
+ UpdateColumnNullability,
+ UpdateColumnPosition,
+ UpdateColumnType, UpdateComment)
from pypaimon.schema.table_schema import TableSchema
@@ -148,6 +149,48 @@ def _assert_not_renaming_blob_column(
)
+def _validate_blob_external_storage_fields(fields: List[DataField], options:
dict):
+ """Validate blob-external-storage-field configuration.
+
+ Validation order aligned with Java's
SchemaValidation.validateBlobExternalStorageFields():
+ 1. Field must be a BLOB type in the schema
+ 2. Field must be in blob-descriptor-field
+ 3. blob-external-storage-path must be configured
+ """
+ core_options = CoreOptions(Options(options))
+ external_fields = core_options.blob_external_storage_fields()
+ if not external_fields:
+ return
+
+ # 1. Configured fields must be BLOB type
+ field_type_map = {f.name: f.type for f in fields}
+ for field_name in external_fields:
+ field_type = field_type_map.get(field_name)
+ if field_type is None or getattr(field_type, 'type', None) != 'BLOB':
+ raise ValueError(
+ f"Field '{field_name}' in "
+ f"'{CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key()}' must be a
BLOB type field."
+ )
+
+ # 2. Must be a subset of blob-descriptor-field
+ descriptor_fields = core_options.blob_descriptor_fields()
+ not_in_descriptor = external_fields - descriptor_fields
+ if not_in_descriptor:
+ raise ValueError(
+ f"Fields {sorted(not_in_descriptor)} in "
+ f"'{CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key()}' must also be
configured in "
+ f"'{CoreOptions.BLOB_DESCRIPTOR_FIELD.key()}'."
+ )
+
+ # 3. Must configure external-storage-path
+ external_path = core_options.blob_external_storage_path()
+ if not external_path:
+ raise ValueError(
+ f"'{CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key()}' is configured
but "
+ f"'{CoreOptions.BLOB_EXTERNAL_STORAGE_PATH.key()}' is not set."
+ )
+
+
def _handle_rename_column(change: RenameColumn, new_fields: List[DataField]):
field_name = change.field_names[-1]
new_name = change.new_name
@@ -279,6 +322,7 @@ class SchemaManager:
if latest is not None:
raise RuntimeError("Schema in filesystem exists, creation is
not allowed.")
+ _validate_blob_external_storage_fields(schema.fields,
schema.options)
table_schema = TableSchema.from_schema(schema_id=0, schema=schema)
success = self.commit(table_schema)
if success:
diff --git a/paimon-python/pypaimon/tests/external_storage_blob_test.py
b/paimon-python/pypaimon/tests/external_storage_blob_test.py
new file mode 100644
index 0000000000..505e4407a9
--- /dev/null
+++ b/paimon-python/pypaimon/tests/external_storage_blob_test.py
@@ -0,0 +1,392 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+
+
+class ExternalStorageBlobValidationTest(unittest.TestCase):
+ """Tests for blob-external-storage-field schema validation."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.temp_dir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.temp_dir, 'warehouse')
+ cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+ cls.catalog.create_database('test_db', False)
+
+ @classmethod
+ def tearDownClass(cls):
+ try:
+ shutil.rmtree(cls.temp_dir)
+ except OSError:
+ pass
+
+ def test_validation_missing_path(self):
+ """blob-external-storage-field configured without path should raise
error."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('video', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob-descriptor-field': 'video',
+ 'blob-external-storage-field': 'video',
+ # Missing blob-external-storage-path
+ })
+ with self.assertRaises(ValueError) as ctx:
+ self.catalog.create_table('test_db.missing_path_test', schema,
False)
+ self.assertIn('blob-external-storage-path', str(ctx.exception))
+
+ def test_validation_field_not_in_descriptor_field(self):
+ """blob-external-storage-field must be a subset of
blob-descriptor-field."""
+ external_path = os.path.join(self.temp_dir, 'external_storage')
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('video', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ # NOT setting blob-descriptor-field
+ 'blob-external-storage-field': 'video',
+ 'blob-external-storage-path': external_path,
+ })
+ with self.assertRaises(ValueError) as ctx:
+ self.catalog.create_table('test_db.not_in_descriptor_test',
schema, False)
+ self.assertIn('blob-descriptor-field', str(ctx.exception))
+
+ def test_validation_field_not_blob_type(self):
+ """blob-external-storage-field must reference BLOB type fields."""
+ external_path = os.path.join(self.temp_dir, 'external_storage')
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('video', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob-descriptor-field': 'name,video',
+ 'blob-external-storage-field': 'name',
+ 'blob-external-storage-path': external_path,
+ })
+ with self.assertRaises(ValueError) as ctx:
+ self.catalog.create_table('test_db.not_blob_type_test', schema,
False)
+ self.assertIn('must be a BLOB type field', str(ctx.exception))
+
+ def test_validation_blob_not_null_field_passes(self):
+ """BLOB NOT NULL fields should pass validation (not be rejected by str
comparison)."""
+ from pypaimon.schema.data_types import AtomicType, DataField
+
+ external_path = os.path.join(self.temp_dir, 'external_storage')
+ schema = Schema(
+ fields=[
+ DataField(0, 'id', AtomicType('INT', nullable=False)),
+ DataField(1, 'video', AtomicType('BLOB', nullable=False)),
+ ],
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob-descriptor-field': 'video',
+ 'blob-external-storage-field': 'video',
+ 'blob-external-storage-path': external_path,
+ },
+ )
+ # Should NOT raise - BLOB NOT NULL is still a BLOB type
+ self.catalog.create_table('test_db.blob_not_null_test', schema, False)
+ table = self.catalog.get_table('test_db.blob_not_null_test')
+ self.assertIsNotNone(table)
+
+
+class ExternalStorageBlobWriteTest(unittest.TestCase):
+ """Tests for blob external storage write functionality."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.temp_dir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.temp_dir, 'warehouse')
+ cls.external_path = os.path.join(cls.temp_dir, 'external_storage')
+ os.makedirs(cls.external_path, exist_ok=True)
+ cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+ cls.catalog.create_database('test_db', False)
+
+ @classmethod
+ def tearDownClass(cls):
+ try:
+ shutil.rmtree(cls.temp_dir)
+ except OSError:
+ pass
+
+ def _create_external_storage_table(self, table_name, extra_options=None):
+ """Helper to create a table with external storage configured."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('video', pa.large_binary()),
+ ])
+ options = {
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob-descriptor-field': 'video',
+ 'blob-external-storage-field': 'video',
+ 'blob-external-storage-path': self.external_path,
+ }
+ if extra_options:
+ options.update(extra_options)
+ schema = Schema.from_pyarrow_schema(pa_schema, options=options)
+ self.catalog.create_table(f'test_db.{table_name}', schema, False)
+ return self.catalog.get_table(f'test_db.{table_name}')
+
+ def test_external_storage_basic_write(self):
+ """Basic write: raw blob data should be written to external storage as
.blob files."""
+ table = self._create_external_storage_table('basic_write_test')
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('video', pa.large_binary()),
+ ])
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'name': ['a', 'b', 'c'],
+ 'video': [b'video_data_1', b'video_data_2', b'video_data_3'],
+ }, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ writer.close()
+
+ # Commit should succeed
+ self.assertGreater(len(commit_messages), 0)
+ write_builder.new_commit().commit(commit_messages)
+
+ # Verify external storage files were created
+ external_files = []
+ for root, dirs, files in os.walk(self.external_path):
+ for f in files:
+ if f.endswith('.blob'):
+ external_files.append(os.path.join(root, f))
+ self.assertGreater(len(external_files), 0, "External blob files should
be created")
+
+ def test_external_storage_roundtrip(self):
+ """Write raw blob data via external storage, read back should return
original data."""
+ table = self._create_external_storage_table('roundtrip_test')
+
+ video_bytes = b'hello_world_video_content'
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('video', pa.large_binary()),
+ ])
+ test_data = pa.Table.from_pydict({
+ 'id': [1],
+ 'name': ['test'],
+ 'video': [video_bytes],
+ }, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ writer.close()
+ write_builder.new_commit().commit(commit_messages)
+
+ # Read back - reader resolves BlobDescriptor and returns original data
+ read_builder = table.new_read_builder()
+ splits = read_builder.new_scan().plan().splits()
+ result = read_builder.new_read().to_arrow(splits)
+
+ self.assertEqual(result.num_rows, 1)
+ read_back = result.column('video')[0].as_py()
+ self.assertEqual(read_back, video_bytes)
+
+ def test_external_storage_multiple_fields(self):
+ """Multiple external storage fields should each write to separate blob
files."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('video', pa.large_binary()),
+ ('audio', pa.large_binary()),
+ ])
+ options = {
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob-descriptor-field': 'video,audio',
+ 'blob-external-storage-field': 'video,audio',
+ 'blob-external-storage-path': self.external_path,
+ }
+ schema = Schema.from_pyarrow_schema(pa_schema, options=options)
+ self.catalog.create_table('test_db.multi_field_test', schema, False)
+ table = self.catalog.get_table('test_db.multi_field_test')
+
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2],
+ 'video': [b'video1', b'video2'],
+ 'audio': [b'audio1', b'audio2'],
+ }, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ writer.close()
+ write_builder.new_commit().commit(commit_messages)
+
+ # Read back and verify data round-trips correctly
+ read_builder = table.new_read_builder()
+ splits = read_builder.new_scan().plan().splits()
+ result = read_builder.new_read().to_arrow(splits)
+
+ self.assertEqual(result.num_rows, 2)
+ videos = result.column('video').to_pylist()
+ audios = result.column('audio').to_pylist()
+ self.assertEqual(set(videos), {b'video1', b'video2'})
+ self.assertEqual(set(audios), {b'audio1', b'audio2'})
+
+ def test_external_storage_mixed_with_normal_blob(self):
+ """External storage field + normal blob field should coexist."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('video', pa.large_binary()), # external storage
+ ('thumbnail', pa.large_binary()), # normal blob (written to
.blob files)
+ ])
+ options = {
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob-descriptor-field': 'video',
+ 'blob-external-storage-field': 'video',
+ 'blob-external-storage-path': self.external_path,
+ }
+ schema = Schema.from_pyarrow_schema(pa_schema, options=options)
+ self.catalog.create_table('test_db.mixed_blob_test', schema, False)
+ table = self.catalog.get_table('test_db.mixed_blob_test')
+
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2],
+ 'video': [b'big_video_data', b'another_video'],
+ 'thumbnail': [b'thumb1', b'thumb2'],
+ }, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ writer.close()
+ write_builder.new_commit().commit(commit_messages)
+
+ # Read back and verify both external storage and normal blob data
+ read_builder = table.new_read_builder()
+ splits = read_builder.new_scan().plan().splits()
+ result = read_builder.new_read().to_arrow(splits)
+
+ self.assertEqual(result.num_rows, 2)
+ videos = set(result.column('video').to_pylist())
+ thumbnails = set(result.column('thumbnail').to_pylist())
+ self.assertEqual(videos, {b'big_video_data', b'another_video'})
+ self.assertEqual(thumbnails, {b'thumb1', b'thumb2'})
+
+ def test_external_storage_null_values(self):
+ """Null blob values should remain null (not written to external
storage)."""
+ table = self._create_external_storage_table('null_test')
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('video', pa.large_binary()),
+ ])
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'name': ['a', 'b', 'c'],
+ 'video': [b'data', None, b'more_data'],
+ }, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ writer.close()
+ write_builder.new_commit().commit(commit_messages)
+
+ # Read back and verify nulls are preserved
+ read_builder = table.new_read_builder()
+ splits = read_builder.new_scan().plan().splits()
+ result = read_builder.new_read().to_arrow(splits)
+
+ self.assertEqual(result.num_rows, 3)
+ # Build id → video mapping to avoid relying on row order
+ id_to_video = {
+ result.column('id')[i].as_py(): result.column('video')[i].as_py()
+ for i in range(result.num_rows)
+ }
+ self.assertEqual(id_to_video[1], b'data')
+ self.assertIsNone(id_to_video[2])
+ self.assertEqual(id_to_video[3], b'more_data')
+
+ def test_external_storage_with_descriptor_input(self):
+ """When input is serialized BlobDescriptor bytes, the writer should
read
+ the source data via BlobRef and re-write it to external storage."""
+ from pypaimon.table.row.blob import BlobDescriptor
+
+ table = self._create_external_storage_table('descriptor_input_test')
+
+ # Create a source file with known raw content
+ source_data = b'original_video_from_descriptor'
+ source_file = os.path.join(self.external_path, 'source.bin')
+ with open(source_file, 'wb') as f:
+ f.write(source_data)
+
+ # Construct a BlobDescriptor pointing to the source file
+ descriptor = BlobDescriptor(source_file, 0, len(source_data))
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('video', pa.large_binary()),
+ ])
+ test_data = pa.Table.from_pydict({
+ 'id': [1],
+ 'name': ['desc_test'],
+ 'video': [descriptor.serialize()],
+ }, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ writer.close()
+ write_builder.new_commit().commit(commit_messages)
+
+ # Read back and verify the original data round-trips correctly
+ read_builder = table.new_read_builder()
+ splits = read_builder.new_scan().plan().splits()
+ result = read_builder.new_read().to_arrow(splits)
+
+ self.assertEqual(result.num_rows, 1)
+ self.assertEqual(result.column('video')[0].as_py(), source_data)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py
b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index d04cfaf550..4c3289f5aa 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -19,11 +19,10 @@ import logging
import uuid
from typing import Dict, List, Optional, Tuple
-
import pyarrow as pa
-from pypaimon.data.timestamp import Timestamp
from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.data.timestamp import Timestamp
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.table.row.generic_row import GenericRow
@@ -144,12 +143,28 @@ class DataBlobWriter(DataWriter):
options=options
)
+ # Initialize ExternalStorageBlobWriter if configured
+ self._external_storage_writer = None
+ external_storage_fields = self.options.blob_external_storage_fields()
+ external_storage_path = self.options.blob_external_storage_path()
+ if external_storage_fields and external_storage_path:
+ from pypaimon.write.writer.external_storage_blob_writer import \
+ ExternalStorageBlobWriter
+ self._external_storage_writer = ExternalStorageBlobWriter(
+ file_io=self.file_io,
+ external_storage_path=external_storage_path,
+ external_storage_fields=external_storage_fields,
+ blob_target_file_size=self.options.blob_target_file_size(),
+ data_file_prefix=CoreOptions.data_file_prefix(self.options),
+ )
+
logger.info(
"Initialized DataBlobWriter with blob columns: %s, blob file
columns: %s, descriptor "
- "stored columns: %s",
+ "stored columns: %s, external storage fields: %s",
self.blob_column_names,
self.blob_file_column_names,
sorted(self.blob_descriptor_fields),
+ sorted(external_storage_fields) if external_storage_fields else [],
)
def _get_blob_columns_from_schema(self) -> List[str]:
@@ -172,6 +187,11 @@ class DataBlobWriter(DataWriter):
def write(self, data: pa.RecordBatch):
try:
+ # Transform external-storage fields: write raw blob to external
storage,
+ # replace with serialized BlobDescriptor
+ if self._external_storage_writer:
+ data = self._external_storage_writer.transform_batch(data)
+
# Split data into normal and blob parts
normal_data, blob_data_map = self._split_data(data)
self._validate_descriptor_stored_fields_input(data)
@@ -213,6 +233,8 @@ class DataBlobWriter(DataWriter):
try:
if self.pending_normal_data is not None and
self.pending_normal_data.num_rows > 0:
self._close_current_writers()
+ if self._external_storage_writer:
+ self._external_storage_writer.close()
except Exception as e:
logger.error("Exception occurs when closing writer. Cleaning up.",
exc_info=e)
self.abort()
@@ -224,6 +246,8 @@ class DataBlobWriter(DataWriter):
"""Abort all writers and clean up resources."""
for blob_writer in self.blob_writers.values():
blob_writer.abort()
+ if self._external_storage_writer:
+ self._external_storage_writer.abort()
self.pending_normal_data = None
self.committed_files.clear()
diff --git
a/paimon-python/pypaimon/write/writer/external_storage_blob_writer.py
b/paimon-python/pypaimon/write/writer/external_storage_blob_writer.py
new file mode 100644
index 0000000000..380bca90cb
--- /dev/null
+++ b/paimon-python/pypaimon/write/writer/external_storage_blob_writer.py
@@ -0,0 +1,228 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import uuid
+from typing import Dict, Set
+
+import pyarrow as pa
+
+from pypaimon.table.row.blob import BlobDescriptor
+from pypaimon.write.blob_format_writer import BlobFormatWriter
+from pypaimon.write.writer.blob_file_writer import BlobFileWriter
+
+logger = logging.getLogger(__name__)
+
+# Size of the magic number written before each blob record in BlobFormatWriter
+_BLOB_MAGIC_SIZE = 4
+
+
+class ExternalStorageBlobWriter:
+ """
+ Writes raw BLOB data from external-storage fields to an external storage
path
+ and replaces the column values with serialized BlobDescriptor bytes.
+
+ This aligns with Java's ExternalStorageBlobWriter behavior:
+ - For each external-storage field, maintains an independent BlobFileWriter
+ - Writes blob data to external storage .blob files
+ - Replaces column values with BlobDescriptor(uri, offset, length)
+ - External files are NOT tracked in Paimon snapshot metadata
+ - Orphan file cleanup does NOT apply to the external storage path
+ - Path is flat: {externalStoragePath}/{prefix}{uuid}-{counter}.blob
+ - Rolling check happens after write (next write creates new file)
+ """
+
+ def __init__(
+ self,
+ file_io,
+ external_storage_path: str,
+ external_storage_fields: Set[str],
+ blob_target_file_size: int,
+ data_file_prefix: str = "data-",
+ ):
+ self._file_io = file_io
+ self._external_storage_path = external_storage_path.rstrip('/')
+ self._external_storage_fields = external_storage_fields
+ self._blob_target_file_size = blob_target_file_size
+ self._data_file_prefix = data_file_prefix
+ # Per-field BlobFileWriter instances
+ self._field_writers: Dict[str, BlobFileWriter] = {}
+ # Per-field flag: whether current writer has reached target size (roll
on next write)
+ self._field_needs_roll: Dict[str, bool] = {}
+ # Fixed UUID for this writer instance + incrementing path counter
(aligned with Java)
+ self._writer_uuid = str(uuid.uuid4())
+ self._path_count = 0
+ self._closed = False
+
+ def transform_batch(self, batch: pa.RecordBatch) -> pa.RecordBatch:
+ """
+ Transform a RecordBatch by writing external-storage field data to
+ external blob files and replacing column values with serialized
BlobDescriptor bytes.
+
+ Args:
+ batch: The input RecordBatch containing raw blob data in
external-storage columns.
+
+ Returns:
+ A new RecordBatch where external-storage columns are replaced with
+ serialized BlobDescriptor bytes.
+ """
+ if batch.num_rows == 0:
+ return batch
+
+ # Find which columns in this batch are external-storage fields
+ columns_to_transform = []
+ for field_name in self._external_storage_fields:
+ if field_name in batch.schema.names:
+ columns_to_transform.append(field_name)
+
+ if not columns_to_transform:
+ return batch
+
+ # Build new columns array
+ new_columns = []
+ for i, field in enumerate(batch.schema):
+ if field.name in columns_to_transform:
+ transformed_column = self._transform_column(field.name,
batch.column(i))
+ new_columns.append(transformed_column)
+ else:
+ new_columns.append(batch.column(i))
+
+ return pa.RecordBatch.from_arrays(new_columns, schema=batch.schema)
+
+ def _transform_column(self, field_name: str, column: pa.Array) -> pa.Array:
+ """Transform a single column: write blob data to external storage and
return descriptor bytes.
+
+ Handles both raw bytes and serialized BlobDescriptor bytes as input.
+ For BlobDescriptor input, BlobFileWriter will construct a BlobRef and
+ stream the original data from the source file (aligned with Java
behavior).
+ """
+ descriptor_values = []
+ values = column.to_pylist()
+
+ for value in values:
+ if value is None:
+ descriptor_values.append(None)
+ continue
+
+ # Write to external storage and get descriptor.
+ # BlobFileWriter._to_blob() handles both raw bytes and
BlobDescriptor bytes:
+ # - raw bytes → BlobData → direct write
+ # - BlobDescriptor bytes → BlobRef → stream read from source →
write
+ descriptor = self._write_to_external_storage(field_name, value)
+ descriptor_values.append(descriptor.serialize())
+
+ return pa.array(descriptor_values, type=column.type)
+
+ def _write_to_external_storage(self, field_name: str, value: bytes) ->
BlobDescriptor:
+ """Write blob data to external storage and return a BlobDescriptor.
+
+ Accepts both raw bytes and serialized BlobDescriptor bytes.
BlobFileWriter
+ handles the conversion internally (BlobDescriptor → BlobRef → stream
read).
+
+ In blob format, each record is:
+ [Magic (4B)] [Raw Data (variable)] [Length (8B)] [CRC32 (4B)]
+
+ The BlobDescriptor offset points to the raw data start (after magic),
+ and length is the actual raw data size. This aligns with Java's
+ ExternalStorageBlobWriter behavior.
+ """
+ writer = self._get_or_create_writer(field_name)
+
+ # The offset of raw data in the file = current position + magic size
+ raw_data_offset = writer.writer.position + _BLOB_MAGIC_SIZE
+
+ # Write using BlobFileWriter (standard blob format)
+ # BlobFileWriter._to_blob() handles both raw bytes and BlobDescriptor
bytes
+ single_row = pa.table({field_name: [value]})
+ writer.write_row(single_row)
+
+ # Calculate actual blob data length from position difference
+ # (works for both raw bytes and BlobDescriptor/BlobRef input)
+ blob_length = writer.writer.position - raw_data_offset -
BlobFormatWriter.METADATA_SIZE
+
+ # After write, check if target size reached (aligned with Java's
post-write check)
+ self._mark_roll_if_needed(field_name)
+
+ # Build the descriptor pointing to external storage
+ file_uri = str(writer.file_path)
+ return BlobDescriptor(file_uri, raw_data_offset, blob_length)
+
+ def _get_or_create_writer(self, field_name: str) -> BlobFileWriter:
+ """Get or create a BlobFileWriter for the given field.
+
+ Aligned with Java's rolling behavior: check happens after write.
+ If the previous write caused the writer to reach target size,
+ close and create a new one on the next call.
+ """
+ # Check if previous write triggered a roll
+ if self._field_needs_roll.get(field_name, False):
+ writer = self._field_writers.get(field_name)
+ if writer is not None and not writer.closed:
+ writer.close()
+ self._field_writers.pop(field_name, None)
+ self._field_needs_roll[field_name] = False
+
+ writer = self._field_writers.get(field_name)
+ if writer is None:
+ new_path = self._generate_external_blob_path()
+ writer = BlobFileWriter(self._file_io, new_path)
+ self._field_writers[field_name] = writer
+ logger.debug("Created new external storage blob file: %s for
field: %s", new_path, field_name)
+
+ return writer
+
+ def _mark_roll_if_needed(self, field_name: str):
+ """After writing, check if target size reached and mark for rolling on
next write."""
+ writer = self._field_writers.get(field_name)
+ if writer is not None and
writer.reach_target_size(self._blob_target_file_size):
+ self._field_needs_roll[field_name] = True
+
+ def _generate_external_blob_path(self) -> str:
+ """Generate a new external storage blob file path.
+
+ Aligned with Java's DataFilePathFactory.newExternalStorageBlobPath():
+ flat structure under externalStoragePath, filename =
{prefix}{uuid}-{counter}.blob
+ """
+ file_name =
f"{self._data_file_prefix}{self._writer_uuid}-{self._path_count}.blob"
+ self._path_count += 1
+ return f"{self._external_storage_path}/{file_name}"
+
+ def close(self):
+ """Close all field writers."""
+ if self._closed:
+ return
+ for writer in self._field_writers.values():
+ if not writer.closed:
+ writer.close()
+ self._field_writers.clear()
+ self._closed = True
+
+ def abort(self):
+ """Abort all writers.
+
+ Aligned with Java's ExternalStorageBlobFieldWriter.abort():
+ - The current in-progress (not yet finalized) file is deleted via
writer.abort()
+ - Already finalized/closed external files remain on disk (orphan
cleanup
+ does not cover the external storage path)
+ """
+ if self._closed:
+ return
+ for writer in self._field_writers.values():
+ if not writer.closed:
+ writer.abort()
+ self._field_writers.clear()
+ self._closed = True