This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 52a362ce1a [python] Support blob-external-storage-path for writing 
BLOB data to external storage (#7967)
52a362ce1a is described below

commit 52a362ce1ab07c66fe0b30a0d6bee4912f476c47
Author: zhoulii <[email protected]>
AuthorDate: Tue May 26 18:12:05 2026 +0800

    [python] Support blob-external-storage-path for writing BLOB data to 
external storage (#7967)
---
 .../pypaimon/common/options/core_options.py        |  40 ++-
 paimon-python/pypaimon/schema/schema_manager.py    |  64 +++-
 .../pypaimon/tests/external_storage_blob_test.py   | 392 +++++++++++++++++++++
 .../pypaimon/write/writer/data_blob_writer.py      |  30 +-
 .../write/writer/external_storage_blob_writer.py   | 228 ++++++++++++
 5 files changed, 738 insertions(+), 16 deletions(-)

diff --git a/paimon-python/pypaimon/common/options/core_options.py 
b/paimon-python/pypaimon/common/options/core_options.py
index 5bc7848e1e..b2f0e01916 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -16,15 +16,14 @@
 # under the License.
 
 import sys
+from datetime import timedelta
 from enum import Enum
 from typing import Dict, Optional
 
-from datetime import timedelta
-
 from pypaimon.common.memory_size import MemorySize
 from pypaimon.common.options import Options
-from pypaimon.common.options.config_options import ConfigOptions
 from pypaimon.common.options.config_option import ConfigOption
+from pypaimon.common.options.config_options import ConfigOptions
 
 
 class ExternalPathStrategy(str, Enum):
@@ -220,6 +219,28 @@ class CoreOptions:
         )
     )
 
+    BLOB_EXTERNAL_STORAGE_PATH: ConfigOption[str] = (
+        ConfigOptions.key("blob-external-storage-path")
+        .string_type()
+        .no_default_value()
+        .with_description(
+            "The external storage path where raw BLOB data from fields 
configured "
+            "by 'blob-external-storage-field' is written at write time. "
+            "Orphan file cleanup is not applied to this path."
+        )
+    )
+
+    BLOB_EXTERNAL_STORAGE_FIELD: ConfigOption[str] = (
+        ConfigOptions.key("blob-external-storage-field")
+        .string_type()
+        .no_default_value()
+        .with_description(
+            "Comma-separated BLOB field names (must be a subset of 
'blob-descriptor-field') "
+            "whose raw data will be written to external storage at write time. 
"
+            "The external storage path is configured via 
'blob-external-storage-path'."
+        )
+    )
+
     TARGET_FILE_SIZE: ConfigOption[MemorySize] = (
         ConfigOptions.key("target-file-size")
         .memory_type()
@@ -648,6 +669,19 @@ class CoreOptions:
             return {str(field).strip() for field in value if 
str(field).strip()}
         return set()
 
+    def blob_external_storage_fields(self, default=None):
+        value = self.options.get(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD, 
default)
+        if value is None:
+            return set()
+        if isinstance(value, str):
+            return {field.strip() for field in value.split(",") if 
field.strip()}
+        if isinstance(value, (list, set, tuple)):
+            return {str(field).strip() for field in value if 
str(field).strip()}
+        return set()
+
+    def blob_external_storage_path(self, default=None):
+        return self.options.get(CoreOptions.BLOB_EXTERNAL_STORAGE_PATH, 
default)
+
     def target_file_size(self, has_primary_key, default=None):
         return self.options.get(CoreOptions.TARGET_FILE_SIZE,
                                 MemorySize.of_mebi_bytes(
diff --git a/paimon-python/pypaimon/schema/schema_manager.py 
b/paimon-python/pypaimon/schema/schema_manager.py
index 3880bcd15c..1a01b31295 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -15,21 +15,22 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Optional, List
+from typing import List, Optional
 
-from pypaimon.common.identifier import DEFAULT_MAIN_BRANCH
-from pypaimon.catalog.catalog_exception import ColumnAlreadyExistException, 
ColumnNotExistException
+from pypaimon.catalog.catalog_exception import (ColumnAlreadyExistException,
+                                                ColumnNotExistException)
 from pypaimon.common.file_io import FileIO
+from pypaimon.common.identifier import DEFAULT_MAIN_BRANCH
 from pypaimon.common.json_util import JSON
-from pypaimon.common.options import Options, CoreOptions
+from pypaimon.common.options import CoreOptions, Options
 from pypaimon.schema.data_types import AtomicInteger, DataField
 from pypaimon.schema.schema import Schema
-from pypaimon.schema.schema_change import (
-    AddColumn, DropColumn, RemoveOption, RenameColumn,
-    SchemaChange, SetOption, UpdateColumnComment,
-    UpdateColumnNullability, UpdateColumnPosition,
-    UpdateColumnType, UpdateComment
-)
+from pypaimon.schema.schema_change import (AddColumn, DropColumn, RemoveOption,
+                                           RenameColumn, SchemaChange,
+                                           SetOption, UpdateColumnComment,
+                                           UpdateColumnNullability,
+                                           UpdateColumnPosition,
+                                           UpdateColumnType, UpdateComment)
 from pypaimon.schema.table_schema import TableSchema
 
 
@@ -148,6 +149,48 @@ def _assert_not_renaming_blob_column(
             )
 
 
+def _validate_blob_external_storage_fields(fields: List[DataField], options: 
dict):
+    """Validate blob-external-storage-field configuration.
+
+    Validation order aligned with Java's 
SchemaValidation.validateBlobExternalStorageFields():
+    1. Field must be a BLOB type in the schema
+    2. Field must be in blob-descriptor-field
+    3. blob-external-storage-path must be configured
+    """
+    core_options = CoreOptions(Options(options))
+    external_fields = core_options.blob_external_storage_fields()
+    if not external_fields:
+        return
+
+    # 1. Configured fields must be BLOB type
+    field_type_map = {f.name: f.type for f in fields}
+    for field_name in external_fields:
+        field_type = field_type_map.get(field_name)
+        if field_type is None or getattr(field_type, 'type', None) != 'BLOB':
+            raise ValueError(
+                f"Field '{field_name}' in "
+                f"'{CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key()}' must be a 
BLOB type field."
+            )
+
+    # 2. Must be a subset of blob-descriptor-field
+    descriptor_fields = core_options.blob_descriptor_fields()
+    not_in_descriptor = external_fields - descriptor_fields
+    if not_in_descriptor:
+        raise ValueError(
+            f"Fields {sorted(not_in_descriptor)} in "
+            f"'{CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key()}' must also be 
configured in "
+            f"'{CoreOptions.BLOB_DESCRIPTOR_FIELD.key()}'."
+        )
+
+    # 3. Must configure external-storage-path
+    external_path = core_options.blob_external_storage_path()
+    if not external_path:
+        raise ValueError(
+            f"'{CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key()}' is configured 
but "
+            f"'{CoreOptions.BLOB_EXTERNAL_STORAGE_PATH.key()}' is not set."
+        )
+
+
 def _handle_rename_column(change: RenameColumn, new_fields: List[DataField]):
     field_name = change.field_names[-1]
     new_name = change.new_name
@@ -279,6 +322,7 @@ class SchemaManager:
             if latest is not None:
                 raise RuntimeError("Schema in filesystem exists, creation is 
not allowed.")
 
+            _validate_blob_external_storage_fields(schema.fields, 
schema.options)
             table_schema = TableSchema.from_schema(schema_id=0, schema=schema)
             success = self.commit(table_schema)
             if success:
diff --git a/paimon-python/pypaimon/tests/external_storage_blob_test.py 
b/paimon-python/pypaimon/tests/external_storage_blob_test.py
new file mode 100644
index 0000000000..505e4407a9
--- /dev/null
+++ b/paimon-python/pypaimon/tests/external_storage_blob_test.py
@@ -0,0 +1,392 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+
+
+class ExternalStorageBlobValidationTest(unittest.TestCase):
+    """Tests for blob-external-storage-field schema validation."""
+
+    @classmethod
+    def setUpClass(cls):
+        cls.temp_dir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.temp_dir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('test_db', False)
+
+    @classmethod
+    def tearDownClass(cls):
+        try:
+            shutil.rmtree(cls.temp_dir)
+        except OSError:
+            pass
+
+    def test_validation_missing_path(self):
+        """blob-external-storage-field configured without path should raise 
error."""
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('video', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'blob-descriptor-field': 'video',
+            'blob-external-storage-field': 'video',
+            # Missing blob-external-storage-path
+        })
+        with self.assertRaises(ValueError) as ctx:
+            self.catalog.create_table('test_db.missing_path_test', schema, 
False)
+        self.assertIn('blob-external-storage-path', str(ctx.exception))
+
+    def test_validation_field_not_in_descriptor_field(self):
+        """blob-external-storage-field must be a subset of 
blob-descriptor-field."""
+        external_path = os.path.join(self.temp_dir, 'external_storage')
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('video', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            # NOT setting blob-descriptor-field
+            'blob-external-storage-field': 'video',
+            'blob-external-storage-path': external_path,
+        })
+        with self.assertRaises(ValueError) as ctx:
+            self.catalog.create_table('test_db.not_in_descriptor_test', 
schema, False)
+        self.assertIn('blob-descriptor-field', str(ctx.exception))
+
+    def test_validation_field_not_blob_type(self):
+        """blob-external-storage-field must reference BLOB type fields."""
+        external_path = os.path.join(self.temp_dir, 'external_storage')
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('video', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'blob-descriptor-field': 'name,video',
+            'blob-external-storage-field': 'name',
+            'blob-external-storage-path': external_path,
+        })
+        with self.assertRaises(ValueError) as ctx:
+            self.catalog.create_table('test_db.not_blob_type_test', schema, 
False)
+        self.assertIn('must be a BLOB type field', str(ctx.exception))
+
+    def test_validation_blob_not_null_field_passes(self):
+        """BLOB NOT NULL fields should pass validation (not be rejected by str 
comparison)."""
+        from pypaimon.schema.data_types import AtomicType, DataField
+
+        external_path = os.path.join(self.temp_dir, 'external_storage')
+        schema = Schema(
+            fields=[
+                DataField(0, 'id', AtomicType('INT', nullable=False)),
+                DataField(1, 'video', AtomicType('BLOB', nullable=False)),
+            ],
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'blob-descriptor-field': 'video',
+                'blob-external-storage-field': 'video',
+                'blob-external-storage-path': external_path,
+            },
+        )
+        # Should NOT raise - BLOB NOT NULL is still a BLOB type
+        self.catalog.create_table('test_db.blob_not_null_test', schema, False)
+        table = self.catalog.get_table('test_db.blob_not_null_test')
+        self.assertIsNotNone(table)
+
+
+class ExternalStorageBlobWriteTest(unittest.TestCase):
+    """Tests for blob external storage write functionality."""
+
+    @classmethod
+    def setUpClass(cls):
+        cls.temp_dir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.temp_dir, 'warehouse')
+        cls.external_path = os.path.join(cls.temp_dir, 'external_storage')
+        os.makedirs(cls.external_path, exist_ok=True)
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('test_db', False)
+
+    @classmethod
+    def tearDownClass(cls):
+        try:
+            shutil.rmtree(cls.temp_dir)
+        except OSError:
+            pass
+
+    def _create_external_storage_table(self, table_name, extra_options=None):
+        """Helper to create a table with external storage configured."""
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('video', pa.large_binary()),
+        ])
+        options = {
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'blob-descriptor-field': 'video',
+            'blob-external-storage-field': 'video',
+            'blob-external-storage-path': self.external_path,
+        }
+        if extra_options:
+            options.update(extra_options)
+        schema = Schema.from_pyarrow_schema(pa_schema, options=options)
+        self.catalog.create_table(f'test_db.{table_name}', schema, False)
+        return self.catalog.get_table(f'test_db.{table_name}')
+
+    def test_external_storage_basic_write(self):
+        """Basic write: raw blob data should be written to external storage as 
.blob files."""
+        table = self._create_external_storage_table('basic_write_test')
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('video', pa.large_binary()),
+        ])
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'name': ['a', 'b', 'c'],
+            'video': [b'video_data_1', b'video_data_2', b'video_data_3'],
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        writer.close()
+
+        # Commit should succeed
+        self.assertGreater(len(commit_messages), 0)
+        write_builder.new_commit().commit(commit_messages)
+
+        # Verify external storage files were created
+        external_files = []
+        for root, dirs, files in os.walk(self.external_path):
+            for f in files:
+                if f.endswith('.blob'):
+                    external_files.append(os.path.join(root, f))
+        self.assertGreater(len(external_files), 0, "External blob files should 
be created")
+
+    def test_external_storage_roundtrip(self):
+        """Write raw blob data via external storage, read back should return 
original data."""
+        table = self._create_external_storage_table('roundtrip_test')
+
+        video_bytes = b'hello_world_video_content'
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('video', pa.large_binary()),
+        ])
+        test_data = pa.Table.from_pydict({
+            'id': [1],
+            'name': ['test'],
+            'video': [video_bytes],
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        writer.close()
+        write_builder.new_commit().commit(commit_messages)
+
+        # Read back - reader resolves BlobDescriptor and returns original data
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        result = read_builder.new_read().to_arrow(splits)
+
+        self.assertEqual(result.num_rows, 1)
+        read_back = result.column('video')[0].as_py()
+        self.assertEqual(read_back, video_bytes)
+
+    def test_external_storage_multiple_fields(self):
+        """Multiple external storage fields should each write to separate blob 
files."""
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('video', pa.large_binary()),
+            ('audio', pa.large_binary()),
+        ])
+        options = {
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'blob-descriptor-field': 'video,audio',
+            'blob-external-storage-field': 'video,audio',
+            'blob-external-storage-path': self.external_path,
+        }
+        schema = Schema.from_pyarrow_schema(pa_schema, options=options)
+        self.catalog.create_table('test_db.multi_field_test', schema, False)
+        table = self.catalog.get_table('test_db.multi_field_test')
+
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2],
+            'video': [b'video1', b'video2'],
+            'audio': [b'audio1', b'audio2'],
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        writer.close()
+        write_builder.new_commit().commit(commit_messages)
+
+        # Read back and verify data round-trips correctly
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        result = read_builder.new_read().to_arrow(splits)
+
+        self.assertEqual(result.num_rows, 2)
+        videos = result.column('video').to_pylist()
+        audios = result.column('audio').to_pylist()
+        self.assertEqual(set(videos), {b'video1', b'video2'})
+        self.assertEqual(set(audios), {b'audio1', b'audio2'})
+
+    def test_external_storage_mixed_with_normal_blob(self):
+        """External storage field + normal blob field should coexist."""
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('video', pa.large_binary()),      # external storage
+            ('thumbnail', pa.large_binary()),   # normal blob (written to 
.blob files)
+        ])
+        options = {
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'blob-descriptor-field': 'video',
+            'blob-external-storage-field': 'video',
+            'blob-external-storage-path': self.external_path,
+        }
+        schema = Schema.from_pyarrow_schema(pa_schema, options=options)
+        self.catalog.create_table('test_db.mixed_blob_test', schema, False)
+        table = self.catalog.get_table('test_db.mixed_blob_test')
+
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2],
+            'video': [b'big_video_data', b'another_video'],
+            'thumbnail': [b'thumb1', b'thumb2'],
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        writer.close()
+        write_builder.new_commit().commit(commit_messages)
+
+        # Read back and verify both external storage and normal blob data
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        result = read_builder.new_read().to_arrow(splits)
+
+        self.assertEqual(result.num_rows, 2)
+        videos = set(result.column('video').to_pylist())
+        thumbnails = set(result.column('thumbnail').to_pylist())
+        self.assertEqual(videos, {b'big_video_data', b'another_video'})
+        self.assertEqual(thumbnails, {b'thumb1', b'thumb2'})
+
+    def test_external_storage_null_values(self):
+        """Null blob values should remain null (not written to external 
storage)."""
+        table = self._create_external_storage_table('null_test')
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('video', pa.large_binary()),
+        ])
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'name': ['a', 'b', 'c'],
+            'video': [b'data', None, b'more_data'],
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        writer.close()
+        write_builder.new_commit().commit(commit_messages)
+
+        # Read back and verify nulls are preserved
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        result = read_builder.new_read().to_arrow(splits)
+
+        self.assertEqual(result.num_rows, 3)
+        # Build id → video mapping to avoid relying on row order
+        id_to_video = {
+            result.column('id')[i].as_py(): result.column('video')[i].as_py()
+            for i in range(result.num_rows)
+        }
+        self.assertEqual(id_to_video[1], b'data')
+        self.assertIsNone(id_to_video[2])
+        self.assertEqual(id_to_video[3], b'more_data')
+
+    def test_external_storage_with_descriptor_input(self):
+        """When input is serialized BlobDescriptor bytes, the writer should 
read
+        the source data via BlobRef and re-write it to external storage."""
+        from pypaimon.table.row.blob import BlobDescriptor
+
+        table = self._create_external_storage_table('descriptor_input_test')
+
+        # Create a source file with known raw content
+        source_data = b'original_video_from_descriptor'
+        source_file = os.path.join(self.external_path, 'source.bin')
+        with open(source_file, 'wb') as f:
+            f.write(source_data)
+
+        # Construct a BlobDescriptor pointing to the source file
+        descriptor = BlobDescriptor(source_file, 0, len(source_data))
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('video', pa.large_binary()),
+        ])
+        test_data = pa.Table.from_pydict({
+            'id': [1],
+            'name': ['desc_test'],
+            'video': [descriptor.serialize()],
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        writer.close()
+        write_builder.new_commit().commit(commit_messages)
+
+        # Read back and verify the original data round-trips correctly
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        result = read_builder.new_read().to_arrow(splits)
+
+        self.assertEqual(result.num_rows, 1)
+        self.assertEqual(result.column('video')[0].as_py(), source_data)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py 
b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index d04cfaf550..4c3289f5aa 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -19,11 +19,10 @@ import logging
 import uuid
 from typing import Dict, List, Optional, Tuple
 
-
 import pyarrow as pa
 
-from pypaimon.data.timestamp import Timestamp
 from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.data.timestamp import Timestamp
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.table.row.generic_row import GenericRow
@@ -144,12 +143,28 @@ class DataBlobWriter(DataWriter):
                 options=options
             )
 
+        # Initialize ExternalStorageBlobWriter if configured
+        self._external_storage_writer = None
+        external_storage_fields = self.options.blob_external_storage_fields()
+        external_storage_path = self.options.blob_external_storage_path()
+        if external_storage_fields and external_storage_path:
+            from pypaimon.write.writer.external_storage_blob_writer import \
+                ExternalStorageBlobWriter
+            self._external_storage_writer = ExternalStorageBlobWriter(
+                file_io=self.file_io,
+                external_storage_path=external_storage_path,
+                external_storage_fields=external_storage_fields,
+                blob_target_file_size=self.options.blob_target_file_size(),
+                data_file_prefix=CoreOptions.data_file_prefix(self.options),
+            )
+
         logger.info(
             "Initialized DataBlobWriter with blob columns: %s, blob file 
columns: %s, descriptor "
-            "stored columns: %s",
+            "stored columns: %s, external storage fields: %s",
             self.blob_column_names,
             self.blob_file_column_names,
             sorted(self.blob_descriptor_fields),
+            sorted(external_storage_fields) if external_storage_fields else [],
         )
 
     def _get_blob_columns_from_schema(self) -> List[str]:
@@ -172,6 +187,11 @@ class DataBlobWriter(DataWriter):
 
     def write(self, data: pa.RecordBatch):
         try:
+            # Transform external-storage fields: write raw blob to external 
storage,
+            # replace with serialized BlobDescriptor
+            if self._external_storage_writer:
+                data = self._external_storage_writer.transform_batch(data)
+
             # Split data into normal and blob parts
             normal_data, blob_data_map = self._split_data(data)
             self._validate_descriptor_stored_fields_input(data)
@@ -213,6 +233,8 @@ class DataBlobWriter(DataWriter):
         try:
             if self.pending_normal_data is not None and 
self.pending_normal_data.num_rows > 0:
                 self._close_current_writers()
+            if self._external_storage_writer:
+                self._external_storage_writer.close()
         except Exception as e:
             logger.error("Exception occurs when closing writer. Cleaning up.", 
exc_info=e)
             self.abort()
@@ -224,6 +246,8 @@ class DataBlobWriter(DataWriter):
         """Abort all writers and clean up resources."""
         for blob_writer in self.blob_writers.values():
             blob_writer.abort()
+        if self._external_storage_writer:
+            self._external_storage_writer.abort()
         self.pending_normal_data = None
         self.committed_files.clear()
 
diff --git 
a/paimon-python/pypaimon/write/writer/external_storage_blob_writer.py 
b/paimon-python/pypaimon/write/writer/external_storage_blob_writer.py
new file mode 100644
index 0000000000..380bca90cb
--- /dev/null
+++ b/paimon-python/pypaimon/write/writer/external_storage_blob_writer.py
@@ -0,0 +1,228 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import uuid
+from typing import Dict, Set
+
+import pyarrow as pa
+
+from pypaimon.table.row.blob import BlobDescriptor
+from pypaimon.write.blob_format_writer import BlobFormatWriter
+from pypaimon.write.writer.blob_file_writer import BlobFileWriter
+
+logger = logging.getLogger(__name__)
+
+# Size of the magic number written before each blob record in BlobFormatWriter
+_BLOB_MAGIC_SIZE = 4
+
+
+class ExternalStorageBlobWriter:
+    """
+    Writes raw BLOB data from external-storage fields to an external storage 
path
+    and replaces the column values with serialized BlobDescriptor bytes.
+
+    This aligns with Java's ExternalStorageBlobWriter behavior:
+    - For each external-storage field, maintains an independent BlobFileWriter
+    - Writes blob data to external storage .blob files
+    - Replaces column values with BlobDescriptor(uri, offset, length)
+    - External files are NOT tracked in Paimon snapshot metadata
+    - Orphan file cleanup does NOT apply to the external storage path
+    - Path is flat: {externalStoragePath}/{prefix}{uuid}-{counter}.blob
+    - Rolling check happens after write (next write creates new file)
+    """
+
+    def __init__(
+        self,
+        file_io,
+        external_storage_path: str,
+        external_storage_fields: Set[str],
+        blob_target_file_size: int,
+        data_file_prefix: str = "data-",
+    ):
+        self._file_io = file_io
+        self._external_storage_path = external_storage_path.rstrip('/')
+        self._external_storage_fields = external_storage_fields
+        self._blob_target_file_size = blob_target_file_size
+        self._data_file_prefix = data_file_prefix
+        # Per-field BlobFileWriter instances
+        self._field_writers: Dict[str, BlobFileWriter] = {}
+        # Per-field flag: whether current writer has reached target size (roll 
on next write)
+        self._field_needs_roll: Dict[str, bool] = {}
+        # Fixed UUID for this writer instance + incrementing path counter 
(aligned with Java)
+        self._writer_uuid = str(uuid.uuid4())
+        self._path_count = 0
+        self._closed = False
+
+    def transform_batch(self, batch: pa.RecordBatch) -> pa.RecordBatch:
+        """
+        Transform a RecordBatch by writing external-storage field data to
+        external blob files and replacing column values with serialized 
BlobDescriptor bytes.
+
+        Args:
+            batch: The input RecordBatch containing raw blob data in 
external-storage columns.
+
+        Returns:
+            A new RecordBatch where external-storage columns are replaced with
+            serialized BlobDescriptor bytes.
+        """
+        if batch.num_rows == 0:
+            return batch
+
+        # Find which columns in this batch are external-storage fields
+        columns_to_transform = []
+        for field_name in self._external_storage_fields:
+            if field_name in batch.schema.names:
+                columns_to_transform.append(field_name)
+
+        if not columns_to_transform:
+            return batch
+
+        # Build new columns array
+        new_columns = []
+        for i, field in enumerate(batch.schema):
+            if field.name in columns_to_transform:
+                transformed_column = self._transform_column(field.name, 
batch.column(i))
+                new_columns.append(transformed_column)
+            else:
+                new_columns.append(batch.column(i))
+
+        return pa.RecordBatch.from_arrays(new_columns, schema=batch.schema)
+
+    def _transform_column(self, field_name: str, column: pa.Array) -> pa.Array:
+        """Transform a single column: write blob data to external storage and 
return descriptor bytes.
+
+        Handles both raw bytes and serialized BlobDescriptor bytes as input.
+        For BlobDescriptor input, BlobFileWriter will construct a BlobRef and
+        stream the original data from the source file (aligned with Java 
behavior).
+        """
+        descriptor_values = []
+        values = column.to_pylist()
+
+        for value in values:
+            if value is None:
+                descriptor_values.append(None)
+                continue
+
+            # Write to external storage and get descriptor.
+            # BlobFileWriter._to_blob() handles both raw bytes and 
BlobDescriptor bytes:
+            # - raw bytes → BlobData → direct write
+            # - BlobDescriptor bytes → BlobRef → stream read from source → 
write
+            descriptor = self._write_to_external_storage(field_name, value)
+            descriptor_values.append(descriptor.serialize())
+
+        return pa.array(descriptor_values, type=column.type)
+
+    def _write_to_external_storage(self, field_name: str, value: bytes) -> 
BlobDescriptor:
+        """Write blob data to external storage and return a BlobDescriptor.
+
+        Accepts both raw bytes and serialized BlobDescriptor bytes. 
BlobFileWriter
+        handles the conversion internally (BlobDescriptor → BlobRef → stream 
read).
+
+        In blob format, each record is:
+            [Magic (4B)] [Raw Data (variable)] [Length (8B)] [CRC32 (4B)]
+
+        The BlobDescriptor offset points to the raw data start (after magic),
+        and length is the actual raw data size. This aligns with Java's
+        ExternalStorageBlobWriter behavior.
+        """
+        writer = self._get_or_create_writer(field_name)
+
+        # The offset of raw data in the file = current position + magic size
+        raw_data_offset = writer.writer.position + _BLOB_MAGIC_SIZE
+
+        # Write using BlobFileWriter (standard blob format)
+        # BlobFileWriter._to_blob() handles both raw bytes and BlobDescriptor 
bytes
+        single_row = pa.table({field_name: [value]})
+        writer.write_row(single_row)
+
+        # Calculate actual blob data length from position difference
+        # (works for both raw bytes and BlobDescriptor/BlobRef input)
+        blob_length = writer.writer.position - raw_data_offset - 
BlobFormatWriter.METADATA_SIZE
+
+        # After write, check if target size reached (aligned with Java's 
post-write check)
+        self._mark_roll_if_needed(field_name)
+
+        # Build the descriptor pointing to external storage
+        file_uri = str(writer.file_path)
+        return BlobDescriptor(file_uri, raw_data_offset, blob_length)
+
+    def _get_or_create_writer(self, field_name: str) -> BlobFileWriter:
+        """Get or create a BlobFileWriter for the given field.
+
+        Aligned with Java's rolling behavior: check happens after write.
+        If the previous write caused the writer to reach target size,
+        close and create a new one on the next call.
+        """
+        # Check if previous write triggered a roll
+        if self._field_needs_roll.get(field_name, False):
+            writer = self._field_writers.get(field_name)
+            if writer is not None and not writer.closed:
+                writer.close()
+            self._field_writers.pop(field_name, None)
+            self._field_needs_roll[field_name] = False
+
+        writer = self._field_writers.get(field_name)
+        if writer is None:
+            new_path = self._generate_external_blob_path()
+            writer = BlobFileWriter(self._file_io, new_path)
+            self._field_writers[field_name] = writer
+            logger.debug("Created new external storage blob file: %s for 
field: %s", new_path, field_name)
+
+        return writer
+
+    def _mark_roll_if_needed(self, field_name: str):
+        """After writing, check if target size reached and mark for rolling on 
next write."""
+        writer = self._field_writers.get(field_name)
+        if writer is not None and 
writer.reach_target_size(self._blob_target_file_size):
+            self._field_needs_roll[field_name] = True
+
+    def _generate_external_blob_path(self) -> str:
+        """Generate a new external storage blob file path.
+
+        Aligned with Java's DataFilePathFactory.newExternalStorageBlobPath():
+        flat structure under externalStoragePath, filename = 
{prefix}-{uuid}-{counter}.blob
+        """
+        file_name = 
f"{self._data_file_prefix}{self._writer_uuid}-{self._path_count}.blob"
+        self._path_count += 1
+        return f"{self._external_storage_path}/{file_name}"
+
+    def close(self):
+        """Close all field writers."""
+        if self._closed:
+            return
+        for writer in self._field_writers.values():
+            if not writer.closed:
+                writer.close()
+        self._field_writers.clear()
+        self._closed = True
+
+    def abort(self):
+        """Abort all writers.
+
+        Aligned with Java's ExternalStorageBlobFieldWriter.abort():
+        - The current in-progress (not yet finalized) file is deleted via 
writer.abort()
+        - Already finalized/closed external files remain on disk (orphan 
cleanup
+          does not cover the external storage path)
+        """
+        if self._closed:
+            return
+        for writer in self._field_writers.values():
+            if not writer.closed:
+                writer.abort()
+        self._field_writers.clear()
+        self._closed = True


Reply via email to