This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 462c4203ac309a336e67fb1ae58232dca303a868
Author: YeJunHao <[email protected]>
AuthorDate: Fri Oct 17 13:33:45 2025 +0800

    [python] Support blob read && write (#6420)
---
 paimon-python/pypaimon/common/core_options.py      |    2 +
 paimon-python/pypaimon/common/file_io.py           |    1 -
 .../pypaimon/read/reader/concat_batch_reader.py    |   68 ++
 .../pypaimon/read/scanner/full_starting_scanner.py |  117 +-
 paimon-python/pypaimon/read/split_read.py          |   42 +-
 paimon-python/pypaimon/read/table_read.py          |    3 +-
 paimon-python/pypaimon/schema/data_types.py        |    1 +
 paimon-python/pypaimon/schema/schema.py            |   36 +-
 paimon-python/pypaimon/tests/blob_table_test.py    | 1177 ++++++++++++++++++++
 paimon-python/pypaimon/write/file_store_commit.py  |    3 +-
 paimon-python/pypaimon/write/file_store_write.py   |   22 +-
 .../writer/blob_writer.py}                         |   61 +-
 .../pypaimon/write/writer/data_blob_writer.py      |  321 ++++++
 paimon-python/pypaimon/write/writer/data_writer.py |   49 +-
 14 files changed, 1844 insertions(+), 59 deletions(-)

diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py
index 8ab26fe062..da1ad0674e 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -49,3 +49,5 @@ class CoreOptions(str, Enum):
     INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp"
     # Commit options
     COMMIT_USER_PREFIX = "commit.user-prefix"
+    ROW_TRACKING_ENABLED = "row-tracking.enabled"
+    DATA_EVOLUTION_ENABLED = "data-evolution.enabled"
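
Both new options must be set to 'true' on any table that declares a blob field; the schema validation added in schema.py below enforces this. A minimal sketch of declaring such a schema, based on the tests added in this commit (field names are illustrative):

    import pyarrow as pa
    from pypaimon import Schema

    pa_schema = pa.schema([
        ('id', pa.int32()),
        ('blob_data', pa.large_binary()),  # treated as a blob field by this commit's tests
    ])
    schema = Schema.from_pyarrow_schema(
        pa_schema,
        options={
            'row-tracking.enabled': 'true',      # CoreOptions.ROW_TRACKING_ENABLED
            'data-evolution.enabled': 'true',    # CoreOptions.DATA_EVOLUTION_ENABLED
        },
    )
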
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index eb32ebb755..f881ba77bc 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -25,7 +25,6 @@ from urllib.parse import splitport, urlparse
 import pyarrow
 from packaging.version import parse
 from pyarrow._fs import FileSystem
-
 from pypaimon.common.config import OssOptions, S3Options
 from pypaimon.common.uri_reader import UriReaderFactory
 from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser
diff --git a/paimon-python/pypaimon/read/reader/concat_batch_reader.py b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
index a5a596e1ea..de4f10c15b 100644
--- a/paimon-python/pypaimon/read/reader/concat_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
@@ -19,6 +19,7 @@
 import collections
 from typing import Callable, List, Optional
 
+import pyarrow as pa
 from pyarrow import RecordBatch
 
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
@@ -76,3 +77,70 @@ class ShardBatchReader(ConcatBatchReader):
                 return batch.slice(0, self.split_end_row - cur_begin)
         else:
             return batch
+
+
+class MergeAllBatchReader(RecordBatchReader):
+    """
+    A reader that accepts multiple reader suppliers and concatenates all their arrow batches
+    into one big batch. This is useful when you want to merge all data from multiple sources
+    into a single batch for processing.
+    """
+
+    def __init__(self, reader_suppliers: List[Callable]):
+        self.reader_suppliers = reader_suppliers
+        self.merged_batch: Optional[RecordBatch] = None
+        self.batch_created = False
+
+    def read_arrow_batch(self) -> Optional[RecordBatch]:
+        if self.batch_created:
+            return None
+
+        all_batches = []
+
+        # Read all batches from all reader suppliers
+        for supplier in self.reader_suppliers:
+            reader = supplier()
+            try:
+                while True:
+                    batch = reader.read_arrow_batch()
+                    if batch is None:
+                        break
+                    all_batches.append(batch)
+            finally:
+                reader.close()
+
+        # Concatenate all batches into one big batch
+        if all_batches:
+            # For PyArrow < 17.0.0, use Table.concat_tables approach
+            # Convert batches to tables and concatenate
+            tables = [pa.Table.from_batches([batch]) for batch in all_batches]
+            if len(tables) == 1:
+                # Single table, just get the first batch
+                self.merged_batch = tables[0].to_batches()[0]
+            else:
+                # Multiple tables, concatenate them
+                concatenated_table = pa.concat_tables(tables)
+                # Convert back to a single batch by taking all batches and combining
+                all_concatenated_batches = concatenated_table.to_batches()
+                if len(all_concatenated_batches) == 1:
+                    self.merged_batch = all_concatenated_batches[0]
+                else:
+                    # If still multiple batches, we need to manually combine them
+                    # This shouldn't happen with concat_tables, but just in case
+                    combined_arrays = []
+                    for i in range(len(all_concatenated_batches[0].columns)):
+                        column_arrays = [batch.column(i) for batch in all_concatenated_batches]
+                        combined_arrays.append(pa.concat_arrays(column_arrays))
+                    self.merged_batch = pa.RecordBatch.from_arrays(
+                        combined_arrays,
+                        names=all_concatenated_batches[0].schema.names
+                    )
+        else:
+            self.merged_batch = None
+
+        self.batch_created = True
+        return self.merged_batch
+
+    def close(self) -> None:
+        self.merged_batch = None
+        self.batch_created = False
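
MergeAllBatchReader drains every supplied reader eagerly and hands the whole result back as a single RecordBatch. A standalone sketch of the same concatenation idea using pyarrow directly (not part of the commit; illustrative data only):

    import pyarrow as pa

    b1 = pa.RecordBatch.from_pydict({'id': [1, 2]})
    b2 = pa.RecordBatch.from_pydict({'id': [3]})

    # Same route the reader takes: batches -> tables -> concat -> single batch.
    merged_table = pa.concat_tables([pa.Table.from_batches([b]) for b in (b1, b2)])
    merged_batch = merged_table.combine_chunks().to_batches()[0]
    assert merged_batch.num_rows == 3
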
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index 4275b3f3e2..73915a0412 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -18,6 +18,7 @@ limitations under the License.
 from collections import defaultdict
 from typing import Callable, List, Optional
 
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
 from pypaimon.common.predicate_builder import PredicateBuilder
 from pypaimon.manifest.manifest_file_manager import ManifestFileManager
@@ -65,6 +66,7 @@ class FullStartingScanner(StartingScanner):
 
         self.only_read_real_buckets = True if int(
             self.table.options.get('bucket', -1)) == BucketMode.POSTPONE_BUCKET.value else False
+        self.data_evolution = self.table.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 'false').lower() == 'true'
 
     def scan(self) -> Plan:
         file_entries = self.plan_files()
@@ -72,6 +74,8 @@ class FullStartingScanner(StartingScanner):
             return Plan([])
         if self.table.is_primary_key_table:
             splits = self._create_primary_key_splits(file_entries)
+        elif self.data_evolution:
+            splits = self._create_data_evolution_splits(file_entries)
         else:
             splits = self._create_append_only_splits(file_entries)
 
@@ -104,7 +108,7 @@ class FullStartingScanner(StartingScanner):
             file_entries = self._filter_by_predicate(file_entries)
         return file_entries
 
-    def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan':
+    def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'FullStartingScanner':
         if idx_of_this_subtask >= number_of_para_subtasks:
             raise Exception("idx_of_this_subtask must be less than number_of_para_subtasks")
         self.idx_of_this_subtask = idx_of_this_subtask
@@ -357,3 +361,114 @@ class FullStartingScanner(StartingScanner):
             packed.append(bin_items)
 
         return packed
+
+    def _create_data_evolution_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:
+        partitioned_files = defaultdict(list)
+        for entry in file_entries:
+            partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
+
+        if self.idx_of_this_subtask is not None:
+            partitioned_files, plan_start_row, plan_end_row = self._append_only_filter_by_shard(partitioned_files)
+
+        def weight_func(file_list: List[DataFileMeta]) -> int:
+            return max(sum(f.file_size for f in file_list), self.open_file_cost)
+
+        splits = []
+        for key, file_entries in partitioned_files.items():
+            if not file_entries:
+                continue
+
+            data_files: List[DataFileMeta] = [e.file for e in file_entries]
+
+            # Split files by firstRowId for data evolution
+            split_by_row_id = self._split_by_row_id(data_files)
+
+            # Pack the split groups for optimal split sizes
+            packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(split_by_row_id, weight_func,
+                                                                                  self.target_split_size)
+
+            # Flatten the packed files and build splits
+            flatten_packed_files: List[List[DataFileMeta]] = [
+                [file for sub_pack in pack for file in sub_pack]
+                for pack in packed_files
+            ]
+
+            splits += self._build_split_from_pack(flatten_packed_files, file_entries, False)
+
+        if self.idx_of_this_subtask is not None:
+            self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
+        return splits
+
+    def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
+        split_by_row_id = []
+
+        def sort_key(file: DataFileMeta) -> tuple:
+            first_row_id = file.first_row_id if file.first_row_id is not None else float('-inf')
+            is_blob = 1 if self._is_blob_file(file.file_name) else 0
+            # For files with same firstRowId, sort by maxSequenceNumber in descending order
+            # (larger sequence number means more recent data)
+            max_seq = file.max_sequence_number
+            return (first_row_id, is_blob, -max_seq)
+
+        sorted_files = sorted(files, key=sort_key)
+
+        # Filter blob files to only include those within the row ID range of non-blob files
+        sorted_files = self._filter_blob(sorted_files)
+
+        # Split files by firstRowId
+        last_row_id = -1
+        check_row_id_start = 0
+        current_split = []
+
+        for file in sorted_files:
+            first_row_id = file.first_row_id
+            if first_row_id is None:
+                # Files without firstRowId are treated as individual splits
+                split_by_row_id.append([file])
+                continue
+
+            if not self._is_blob_file(file.file_name) and first_row_id != last_row_id:
+                if current_split:
+                    split_by_row_id.append(current_split)
+
+                # Validate that files don't overlap
+                if first_row_id < check_row_id_start:
+                    file_names = [f.file_name for f in sorted_files]
+                    raise ValueError(
+                        f"There are overlapping files in the split: {file_names}, "
+                        f"the wrong file is: {file.file_name}"
+                    )
+
+                current_split = []
+                last_row_id = first_row_id
+                check_row_id_start = first_row_id + file.row_count
+
+            current_split.append(file)
+
+        if current_split:
+            split_by_row_id.append(current_split)
+
+        return split_by_row_id
+
+    @staticmethod
+    def _is_blob_file(file_name: str) -> bool:
+        return file_name.endswith('.blob')
+
+    @staticmethod
+    def _filter_blob(files: List[DataFileMeta]) -> List[DataFileMeta]:
+        result = []
+        row_id_start = -1
+        row_id_end = -1
+
+        for file in files:
+            if not FullStartingScanner._is_blob_file(file.file_name):
+                if file.first_row_id is not None:
+                    row_id_start = file.first_row_id
+                    row_id_end = file.first_row_id + file.row_count
+                result.append(file)
+            else:
+                if file.first_row_id is not None and row_id_start != -1:
+                    if row_id_start <= file.first_row_id < row_id_end:
+                        result.append(file)
+
+        return result
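
_split_by_row_id groups data files that share the same first_row_id into one split unit, so a .blob file always travels with the non-blob file covering the same row range, and _filter_blob drops blob files whose rows are not covered by any non-blob file. A toy illustration of the grouping rule with made-up file names and row counts (plain tuples instead of DataFileMeta, ignoring the sequence-number ordering and overlap checks):

    # (file_name, first_row_id, row_count) -- hypothetical values
    files = [
        ('data-0.parquet', 0, 100),
        ('data-0.blob', 0, 100),      # blob rows 0..99 ride along with data-0.parquet
        ('data-1.parquet', 100, 50),
        ('data-1.blob', 100, 50),
    ]
    groups = {}
    for name, first_row_id, _row_count in files:
        groups.setdefault(first_row_id, []).append(name)
    # {0: ['data-0.parquet', 'data-0.blob'], 100: ['data-1.parquet', 'data-1.blob']}
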
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 372679ed63..64a28ac63c 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -26,7 +26,7 @@ from pypaimon.common.predicate import Predicate
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.read.interval_partition import IntervalPartition, SortedRun
 from pypaimon.read.partition_info import PartitionInfo
-from pypaimon.read.reader.concat_batch_reader import ConcatBatchReader, ShardBatchReader
+from pypaimon.read.reader.concat_batch_reader import ConcatBatchReader, ShardBatchReader, MergeAllBatchReader
 from pypaimon.read.reader.concat_record_reader import ConcatRecordReader
 from pypaimon.read.reader.data_file_batch_reader import DataFileBatchReader
 from pypaimon.read.reader.data_evolution_merge_reader import DataEvolutionMergeReader
@@ -73,21 +73,21 @@ class SplitRead(ABC):
     def create_reader(self) -> RecordReader:
         """Create a record reader for the given split."""
 
-    def file_reader_supplier(self, file_path: str, for_merge_read: bool):
+    def file_reader_supplier(self, file_path: str, for_merge_read: bool, read_fields: List[str]):
         _, extension = os.path.splitext(file_path)
         file_format = extension[1:]
 
         format_reader: RecordBatchReader
         if file_format == CoreOptions.FILE_FORMAT_AVRO:
-            format_reader = FormatAvroReader(self.table.file_io, file_path, self._get_final_read_data_fields(),
+            format_reader = FormatAvroReader(self.table.file_io, file_path, read_fields,
                                              self.read_fields, self.push_down_predicate)
         elif file_format == CoreOptions.FILE_FORMAT_BLOB:
             blob_as_descriptor = self.table.options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False)
-            format_reader = FormatBlobReader(self.table.file_io, file_path, self._get_final_read_data_fields(),
+            format_reader = FormatBlobReader(self.table.file_io, file_path, read_fields,
                                              self.read_fields, self.push_down_predicate, blob_as_descriptor)
         elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
             format_reader = FormatPyArrowReader(self.table.file_io, file_format, file_path,
-                                                self._get_final_read_data_fields(), self.push_down_predicate)
+                                                read_fields, self.push_down_predicate)
         else:
             raise ValueError(f"Unexpected file format: {file_format}")
 
@@ -253,7 +253,12 @@ class RawFileSplitRead(SplitRead):
     def create_reader(self) -> RecordReader:
         data_readers = []
         for file_path in self.split.file_paths:
-            supplier = partial(self.file_reader_supplier, file_path=file_path, for_merge_read=False)
+            supplier = partial(
+                self.file_reader_supplier,
+                file_path=file_path,
+                for_merge_read=False,
+                read_fields=self._get_final_read_data_fields(),
+            )
             data_readers.append(supplier)
 
         if not data_readers:
@@ -274,7 +279,12 @@ class RawFileSplitRead(SplitRead):
 
 class MergeFileSplitRead(SplitRead):
     def kv_reader_supplier(self, file_path):
-        reader_supplier = partial(self.file_reader_supplier, file_path=file_path, for_merge_read=True)
+        reader_supplier = partial(
+            self.file_reader_supplier,
+            file_path=file_path,
+            for_merge_read=True,
+            read_fields=self._get_final_read_data_fields()
+        )
         return KeyValueWrapReader(reader_supplier(), len(self.trimmed_primary_key), self.value_arity)
 
     def section_reader_supplier(self, section: List[SortedRun]):
@@ -317,7 +327,7 @@ class DataEvolutionSplitRead(SplitRead):
             if len(need_merge_files) == 1 or not self.read_fields:
                 # No need to merge fields, just create a single file reader
                 suppliers.append(
-                    lambda f=need_merge_files[0]: self._create_file_reader(f)
+                    lambda f=need_merge_files[0]: self._create_file_reader(f, self._get_final_read_data_fields())
                 )
             else:
                 suppliers.append(
@@ -424,26 +434,30 @@ class DataEvolutionSplitRead(SplitRead):
                 self.read_fields = read_fields  # create reader based on read_fields
                 # Create reader for this bunch
                 if len(bunch.files()) == 1:
-                    file_record_readers[i] = self._create_file_reader(bunch.files()[0])
+                    file_record_readers[i] = self._create_file_reader(
+                        bunch.files()[0], [field.name for field in read_fields]
+                    )
                 else:
                     # Create concatenated reader for multiple files
                     suppliers = [
-                        lambda f=file: self._create_file_reader(f) for file in bunch.files()
+                        lambda f=file: self._create_file_reader(
+                            f, [field.name for field in read_fields]
+                        ) for file in bunch.files()
                     ]
-                    file_record_readers[i] = ConcatRecordReader(suppliers)
+                    file_record_readers[i] = MergeAllBatchReader(suppliers)
                 self.read_fields = table_fields
 
         # Validate that all required fields are found
         for i, field in enumerate(all_read_fields):
             if row_offsets[i] == -1:
-                if not field.type.is_nullable():
+                if not field.type.nullable:
+                    raise ValueError(f"Field {field} is not null but can't find any file contains it.")
 
         return DataEvolutionMergeReader(row_offsets, field_offsets, file_record_readers)
 
-    def _create_file_reader(self, file: DataFileMeta) -> RecordReader:
+    def _create_file_reader(self, file: DataFileMeta, read_fields: [str]) -> RecordReader:
         """Create a file reader for a single file."""
-        return self.file_reader_supplier(file_path=file.file_path, for_merge_read=False)
+        return self.file_reader_supplier(file_path=file.file_path, for_merge_read=False, read_fields=read_fields)
 
     def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> List[FieldBunch]:
         """Split files into field bunches."""
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index cf32d04d72..4c2c615d89 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -20,6 +20,7 @@ from typing import Any, Iterator, List, Optional
 import pandas
 import pyarrow
 
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
 from pypaimon.common.predicate_builder import PredicateBuilder
 from pypaimon.read.push_down_utils import extract_predicate_to_list
@@ -132,7 +133,7 @@ class TableRead:
                 read_type=self.read_type,
                 split=split
             )
-        elif self.table.options.get('data-evolution.enabled', 'false').lower() == 'true':
+        elif self.table.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 'false').lower() == 'true':
             return DataEvolutionSplitRead(
                 table=self.table,
                 predicate=self.predicate,
diff --git a/paimon-python/pypaimon/schema/data_types.py b/paimon-python/pypaimon/schema/data_types.py
index d1ce2354a8..c65771d7f2 100644
--- a/paimon-python/pypaimon/schema/data_types.py
+++ b/paimon-python/pypaimon/schema/data_types.py
@@ -548,6 +548,7 @@ class PyarrowFieldParser:
 
     @staticmethod
     def to_paimon_schema(pa_schema: pyarrow.Schema) -> List[DataField]:
+        # Convert PyArrow schema to Paimon fields
         fields = []
         for i, pa_field in enumerate(pa_schema):
             pa_field: pyarrow.Field
diff --git a/paimon-python/pypaimon/schema/schema.py b/paimon-python/pypaimon/schema/schema.py
index 965fe2255b..0ad53f99d3 100644
--- a/paimon-python/pypaimon/schema/schema.py
+++ b/paimon-python/pypaimon/schema/schema.py
@@ -20,6 +20,7 @@ from typing import Dict, List, Optional
 
 import pyarrow as pa
 
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.json_util import json_field
 from pypaimon.schema.data_types import DataField, PyarrowFieldParser
 
@@ -51,4 +52,37 @@ class Schema:
     def from_pyarrow_schema(pa_schema: pa.Schema, partition_keys: Optional[List[str]] = None,
                             primary_keys: Optional[List[str]] = None, options: Optional[Dict] = None,
                             comment: Optional[str] = None):
-        return Schema(PyarrowFieldParser.to_paimon_schema(pa_schema), partition_keys, primary_keys, options, comment)
+        # Convert PyArrow schema to Paimon fields
+        fields = PyarrowFieldParser.to_paimon_schema(pa_schema)
+
+        # Check if Blob type exists in the schema
+        has_blob_type = any(
+            'blob' in str(field.type).lower()
+            for field in fields
+        )
+
+        # If Blob type exists, validate required options
+        if has_blob_type:
+            if options is None:
+                options = {}
+
+            required_options = {
+                CoreOptions.ROW_TRACKING_ENABLED: 'true',
+                CoreOptions.DATA_EVOLUTION_ENABLED: 'true'
+            }
+
+            missing_options = []
+            for key, expected_value in required_options.items():
+                if key not in options or options[key] != expected_value:
+                    missing_options.append(f"{key}='{expected_value}'")
+
+            if missing_options:
+                raise ValueError(
+                    f"Schema contains Blob type but is missing required options: {', '.join(missing_options)}. "
+                    f"Please add these options to the schema."
+                )
+
+            if primary_keys is not None:
+                raise ValueError("Blob type is not supported with primary key.")
+
+        return Schema(fields, partition_keys, primary_keys, options, comment)
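
With this change, Schema.from_pyarrow_schema refuses a blob schema unless both options are present and no primary key is declared. A sketch of the failure path, assuming pa.large_binary() maps to the Paimon blob type as the tests in this commit indicate:

    import pyarrow as pa
    from pypaimon import Schema

    pa_schema = pa.schema([('id', pa.int32()), ('doc', pa.large_binary())])
    try:
        Schema.from_pyarrow_schema(pa_schema)  # no options supplied
    except ValueError as e:
        # Expected to name the missing row-tracking.enabled / data-evolution.enabled options.
        print(e)
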
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
new file mode 100644
index 0000000000..3c8273bde4
--- /dev/null
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -0,0 +1,1177 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory
+from pypaimon.write.commit_message import CommitMessage
+
+
+class DataBlobWriterTest(unittest.TestCase):
+    """Tests for DataBlobWriter functionality with paimon table operations."""
+
+    @classmethod
+    def setUpClass(cls):
+        """Set up test environment."""
+        cls.temp_dir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.temp_dir, 'warehouse')
+        # Create catalog for table operations
+        cls.catalog = CatalogFactory.create({
+            'warehouse': cls.warehouse
+        })
+        cls.catalog.create_database('test_db', False)
+
+    @classmethod
+    def tearDownClass(cls):
+        """Clean up test environment."""
+        try:
+            shutil.rmtree(cls.temp_dir)
+        except OSError:
+            pass
+
+    def test_data_blob_writer_basic_functionality(self):
+        """Test basic DataBlobWriter functionality with paimon table."""
+        from pypaimon import Schema
+
+        # Create schema with normal and blob columns
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('blob_data', pa.large_binary()),  # This will be detected as blob
+        ])
+
+        # Create Paimon schema
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+
+        # Create table
+        self.catalog.create_table('test_db.blob_writer_test', schema, False)
+        table = self.catalog.get_table('test_db.blob_writer_test')
+
+        # Test data
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'name': ['Alice', 'Bob', 'Charlie'],
+            'blob_data': [b'blob_data_1', b'blob_data_2', b'blob_data_3']
+        }, schema=pa_schema)
+
+        # Test DataBlobWriter initialization using proper table API
+        # Use proper table API to create writer
+        write_builder = table.new_batch_write_builder()
+        blob_writer = write_builder.new_write()
+
+        # Write test data using BatchTableWrite API
+        blob_writer.write_arrow(test_data)
+
+        # Test prepare commit
+        commit_messages = blob_writer.prepare_commit()
+        self.assertIsInstance(commit_messages, list)
+        self.assertGreater(len(commit_messages), 0)
+
+        # Verify commit message structure
+        for commit_msg in commit_messages:
+            self.assertIsInstance(commit_msg.new_files, list)
+            self.assertGreater(len(commit_msg.new_files), 0)
+
+            # Verify file metadata structure
+            for file_meta in commit_msg.new_files:
+                self.assertIsNotNone(file_meta.file_name)
+                self.assertGreater(file_meta.file_size, 0)
+                self.assertGreater(file_meta.row_count, 0)
+
+        blob_writer.close()
+
+    def test_data_blob_writer_schema_detection(self):
+        """Test that DataBlobWriter correctly detects blob columns from schema."""
+        from pypaimon import Schema
+
+        # Test schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('blob_field', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.blob_detection_test', schema, False)
+        table = self.catalog.get_table('test_db.blob_detection_test')
+
+        # Use proper table API to create writer
+        write_builder = table.new_batch_write_builder()
+        blob_writer = write_builder.new_write()
+
+        # Test that DataBlobWriter was created internally
+        # We can verify this by checking the internal data writers
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'blob_field': [b'blob1', b'blob2', b'blob3']
+        }, schema=pa_schema)
+
+        # Write data to trigger writer creation
+        blob_writer.write_arrow(test_data)
+
+        # Verify that a DataBlobWriter was created internally
+        data_writers = blob_writer.file_store_write.data_writers
+        self.assertGreater(len(data_writers), 0)
+
+        # Check that the writer is a DataBlobWriter
+        for writer in data_writers.values():
+            from pypaimon.write.writer.data_blob_writer import DataBlobWriter
+            self.assertIsInstance(writer, DataBlobWriter)
+
+        blob_writer.close()
+
+    def test_data_blob_writer_no_blob_column(self):
+        """Test that DataBlobWriter raises error when no blob column is found."""
+        from pypaimon import Schema
+
+        # Test schema without blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.no_blob_test', schema, False)
+        table = self.catalog.get_table('test_db.no_blob_test')
+
+        # Use proper table API to create writer
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+
+        # Test that a regular writer (not DataBlobWriter) was created
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'name': ['Alice', 'Bob', 'Charlie']
+        }, schema=pa_schema)
+
+        # Write data to trigger writer creation
+        writer.write_arrow(test_data)
+
+        # Verify that a regular writer was created (not DataBlobWriter)
+        data_writers = writer.file_store_write.data_writers
+        self.assertGreater(len(data_writers), 0)
+
+        # Check that the writer is NOT a DataBlobWriter
+        for writer_instance in data_writers.values():
+            from pypaimon.write.writer.data_blob_writer import DataBlobWriter
+            self.assertNotIsInstance(writer_instance, DataBlobWriter)
+
+        writer.close()
+
+    def test_data_blob_writer_multiple_blob_columns(self):
+        """Test that DataBlobWriter raises error when multiple blob columns are found."""
+        from pypaimon import Schema
+
+        # Test schema with multiple blob columns
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('blob1', pa.large_binary()),
+            ('blob2', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.multiple_blob_test', schema, False)
+        table = self.catalog.get_table('test_db.multiple_blob_test')
+
+        # Use proper table API to create writer
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+
+        # Test data with multiple blob columns
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'blob1': [b'blob1_1', b'blob1_2', b'blob1_3'],
+            'blob2': [b'blob2_1', b'blob2_2', b'blob2_3']
+        }, schema=pa_schema)
+
+        # This should raise an error when DataBlobWriter is created internally
+        with self.assertRaises(ValueError) as context:
+            writer.write_arrow(test_data)
+        self.assertIn("Limit exactly one blob field in one paimon table yet", str(context.exception))
+
+    def test_data_blob_writer_write_operations(self):
+        """Test DataBlobWriter write operations with real data."""
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('document', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.write_test', schema, False)
+        table = self.catalog.get_table('test_db.write_test')
+
+        # Use proper table API to create writer
+        write_builder = table.new_batch_write_builder()
+        blob_writer = write_builder.new_write()
+
+        # Test data
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2],
+            'name': ['Alice', 'Bob'],
+            'document': [b'document_content_1', b'document_content_2']
+        }, schema=pa_schema)
+
+        # Test writing data
+        for batch in test_data.to_batches():
+            blob_writer.write_arrow_batch(batch)
+
+        # Test prepare commit
+        commit_messages = blob_writer.prepare_commit()
+        self.assertIsInstance(commit_messages, list)
+
+        blob_writer.close()
+
+    def test_data_blob_writer_write_large_blob(self):
+        """Test DataBlobWriter with very large blob data (50MB per item) in 10 batches."""
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('description', pa.string()),
+            ('large_blob', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.large_blob_test', schema, False)
+        table = self.catalog.get_table('test_db.large_blob_test')
+
+        # Use proper table API to create writer
+        write_builder = table.new_batch_write_builder()
+        blob_writer = write_builder.new_write()
+
+        # Create 50MB blob data per item
+        # Using a pattern to make the data more realistic and compressible
+        target_size = 50 * 1024 * 1024  # 50MB in bytes
+        blob_pattern = b'LARGE_BLOB_DATA_PATTERN_' + b'X' * 1024  # ~1KB pattern
+        pattern_size = len(blob_pattern)
+        repetitions = target_size // pattern_size
+        large_blob_data = blob_pattern * repetitions
+
+        # Verify the blob size is approximately 50MB
+        blob_size_mb = len(large_blob_data) / (1024 * 1024)
+        self.assertGreater(blob_size_mb, 49)  # Should be at least 49MB
+        self.assertLess(blob_size_mb, 51)     # Should be less than 51MB
+
+        total_rows = 0
+
+        # Write 10 batches, each with 5 rows (50 rows total)
+        # Total data volume: 50 rows * 50MB = 2.5GB of blob data
+        for batch_num in range(10):
+            batch_data = pa.Table.from_pydict({
+                'id': [batch_num * 5 + i for i in range(5)],
+                'description': [f'Large blob batch {batch_num}, row {i}' for i in range(5)],
+                'large_blob': [large_blob_data] * 5  # 5 rows per batch, each with 50MB blob
+            }, schema=pa_schema)
+
+            # Write each batch
+            for batch in batch_data.to_batches():
+                blob_writer.write_arrow_batch(batch)
+                total_rows += batch.num_rows
+
+            # Log progress for large data processing
+            print(f"Completed batch {batch_num + 1}/10 with {batch.num_rows} rows")
+
+        # Record count is tracked internally by DataBlobWriter
+
+        # Test prepare commit
+        commit_messages: CommitMessage = blob_writer.prepare_commit()
+        self.assertIsInstance(commit_messages, list)
+        # Verify we have commit messages
+        self.assertEqual(len(commit_messages), 1)
+        commit_message = commit_messages[0]
+        normal_file_meta = commit_message.new_files[0]
+        blob_file_metas = commit_message.new_files[1:]
+        # Validate row count consistency
+        parquet_row_count = normal_file_meta.row_count
+        blob_row_count_sum = sum(meta.row_count for meta in blob_file_metas)
+        self.assertEqual(parquet_row_count, blob_row_count_sum,
+                         f"Parquet row count ({parquet_row_count}) should equal "
+                         f"sum of blob row counts ({blob_row_count_sum})")
+
+        # Verify commit message structure and file metadata
+        total_file_size = 0
+        total_row_count = parquet_row_count
+        for commit_msg in commit_messages:
+            self.assertIsInstance(commit_msg.new_files, list)
+            self.assertGreater(len(commit_msg.new_files), 0)
+
+            # Verify file metadata structure
+            for file_meta in commit_msg.new_files:
+                self.assertIsNotNone(file_meta.file_name)
+                self.assertGreater(file_meta.file_size, 0)
+                self.assertGreater(file_meta.row_count, 0)
+                total_file_size += file_meta.file_size
+
+        # Verify total data written (50 rows of normal data + 50 rows of blob data = 100 total)
+        self.assertEqual(total_row_count, 50)
+
+        # Verify total file size is substantial (should be much larger than 2.5GB due to overhead)
+        total_size_mb = total_file_size / (1024 * 1024)
+        self.assertGreater(total_size_mb, 2000)  # Should be at least 2GB due to overhead
+
+        total_files = sum(len(commit_msg.new_files) for commit_msg in commit_messages)
+        print(f"Total data written: {total_size_mb:.2f}MB across {total_files} files")
+        print(f"Total rows processed: {total_row_count}")
+
+        blob_writer.close()
+
+    def test_data_blob_writer_abort_functionality(self):
+        """Test DataBlobWriter abort functionality."""
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('blob_data', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.abort_test', schema, False)
+        table = self.catalog.get_table('test_db.abort_test')
+
+        # Use proper table API to create writer
+        write_builder = table.new_batch_write_builder()
+        blob_writer = write_builder.new_write()
+
+        # Test data
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2],
+            'blob_data': [b'blob_1', b'blob_2']
+        }, schema=pa_schema)
+
+        # Write some data
+        for batch in test_data.to_batches():
+            blob_writer.write_arrow_batch(batch)
+
+        # Test abort - BatchTableWrite doesn't have abort method
+        # The abort functionality is handled internally by DataBlobWriter
+
+        blob_writer.close()
+
+    def test_data_blob_writer_multiple_batches(self):
+        """Test DataBlobWriter with multiple batches and verify results."""
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('document', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.multiple_batches_test', schema, False)
+        table = self.catalog.get_table('test_db.multiple_batches_test')
+
+        # Use proper table API to create writer
+        write_builder = table.new_batch_write_builder()
+        blob_writer = write_builder.new_write()
+
+        # Test data - multiple batches
+        batch1_data = pa.Table.from_pydict({
+            'id': [1, 2],
+            'name': ['Alice', 'Bob'],
+            'document': [b'document_1_content', b'document_2_content']
+        }, schema=pa_schema)
+
+        batch2_data = pa.Table.from_pydict({
+            'id': [3, 4, 5],
+            'name': ['Charlie', 'David', 'Eve'],
+            'document': [b'document_3_content', b'document_4_content', b'document_5_content']
+        }, schema=pa_schema)
+
+        batch3_data = pa.Table.from_pydict({
+            'id': [6],
+            'name': ['Frank'],
+            'document': [b'document_6_content']
+        }, schema=pa_schema)
+
+        # Write multiple batches
+        total_rows = 0
+        for batch in batch1_data.to_batches():
+            blob_writer.write_arrow_batch(batch)
+            total_rows += batch.num_rows
+
+        for batch in batch2_data.to_batches():
+            blob_writer.write_arrow_batch(batch)
+            total_rows += batch.num_rows
+
+        for batch in batch3_data.to_batches():
+            blob_writer.write_arrow_batch(batch)
+            total_rows += batch.num_rows
+
+        # Record count is tracked internally by DataBlobWriter
+
+        # Test prepare commit
+        commit_messages = blob_writer.prepare_commit()
+        self.assertIsInstance(commit_messages, list)
+
+        # Verify we have committed files
+        self.assertGreater(len(commit_messages), 0)
+
+        blob_writer.close()
+
+    def test_data_blob_writer_large_batches(self):
+        """Test DataBlobWriter with large batches to test rolling behavior."""
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('description', pa.string()),
+            ('large_blob', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.large_batches_test', schema, False)
+        table = self.catalog.get_table('test_db.large_batches_test')
+
+        # Use proper table API to create writer
+        write_builder = table.new_batch_write_builder()
+        blob_writer = write_builder.new_write()
+
+        # Create large batches with substantial blob data
+        large_blob_data = b'L' * 10000  # 10KB blob data
+
+        # Batch 1: 100 rows
+        batch1_data = pa.Table.from_pydict({
+            'id': list(range(1, 101)),
+            'description': [f'Description for row {i}' for i in range(1, 101)],
+            'large_blob': [large_blob_data] * 100
+        }, schema=pa_schema)
+
+        # Batch 2: 50 rows
+        batch2_data = pa.Table.from_pydict({
+            'id': list(range(101, 151)),
+            'description': [f'Description for row {i}' for i in range(101, 151)],
+            'large_blob': [large_blob_data] * 50
+        }, schema=pa_schema)
+
+        # Write large batches
+        total_rows = 0
+        for batch in batch1_data.to_batches():
+            blob_writer.write_arrow_batch(batch)
+            total_rows += batch.num_rows
+
+        for batch in batch2_data.to_batches():
+            blob_writer.write_arrow_batch(batch)
+            total_rows += batch.num_rows
+
+        # Record count is tracked internally by DataBlobWriter
+
+        # Test prepare commit
+        commit_messages = blob_writer.prepare_commit()
+        self.assertIsInstance(commit_messages, list)
+
+        # Verify we have committed files
+        self.assertGreater(len(commit_messages), 0)
+
+        blob_writer.close()
+
+    def test_data_blob_writer_mixed_data_types(self):
+        """Test DataBlobWriter with mixed data types in blob column."""
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('type', pa.string()),
+            ('data', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.mixed_data_test', schema, False)
+        table = self.catalog.get_table('test_db.mixed_data_test')
+
+        # Use proper table API to create writer
+        write_builder = table.new_batch_write_builder()
+        blob_writer = write_builder.new_write()
+
+        # Test data with different types of blob content
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3, 4, 5],
+            'type': ['text', 'json', 'binary', 'image', 'pdf'],
+            'data': [
+                b'This is text content',
+                b'{"key": "value", "number": 42}',
+                b'\x00\x01\x02\x03\xff\xfe\xfd',
+                b'PNG_IMAGE_DATA_PLACEHOLDER',
+                b'%PDF-1.4\nPDF_CONTENT_PLACEHOLDER'
+            ]
+        }, schema=pa_schema)
+
+        # Write mixed data
+        total_rows = 0
+        for batch in test_data.to_batches():
+            blob_writer.write_arrow_batch(batch)
+            total_rows += batch.num_rows
+
+        # Record count is tracked internally by DataBlobWriter
+
+        # Test prepare commit
+        commit_messages = blob_writer.prepare_commit()
+        self.assertIsInstance(commit_messages, list)
+
+        # Verify we have committed files
+        self.assertGreater(len(commit_messages), 0)
+
+        # Verify commit message structure
+        for commit_msg in commit_messages:
+            self.assertIsInstance(commit_msg.new_files, list)
+            self.assertGreater(len(commit_msg.new_files), 0)
+
+            # Verify file metadata structure
+            for file_meta in commit_msg.new_files:
+                self.assertIsNotNone(file_meta.file_name)
+                self.assertGreater(file_meta.file_size, 0)
+                self.assertGreater(file_meta.row_count, 0)
+
+        # Should have both normal and blob files
+        file_names = [f.file_name for f in commit_msg.new_files]
+        parquet_files = [f for f in file_names if f.endswith('.parquet')]
+        blob_files = [f for f in file_names if f.endswith('.blob')]
+
+        self.assertGreater(len(parquet_files), 0, "Should have at least one parquet file")
+        self.assertGreater(len(blob_files), 0, "Should have at least one blob file")
+
+        # Create commit and commit the data
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        blob_writer.close()
+
+        # Read data back using table API
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result = table_read.to_arrow(table_scan.plan().splits())
+
+        # Verify the data was read back correctly
+        self.assertEqual(result.num_rows, 5, "Should have 5 rows")
+        self.assertEqual(result.num_columns, 3, "Should have 3 columns")
+
+        # Convert result to pandas for easier comparison
+        result_df = result.to_pandas()
+
+        # Verify each row matches the original data
+        for i in range(5):
+            original_id = test_data.column('id')[i].as_py()
+            original_type = test_data.column('type')[i].as_py()
+            original_data = test_data.column('data')[i].as_py()
+
+            result_id = result_df.iloc[i]['id']
+            result_type = result_df.iloc[i]['type']
+            result_data = result_df.iloc[i]['data']
+
+            self.assertEqual(result_id, original_id, f"Row {i+1}: ID should match")
+            self.assertEqual(result_type, original_type, f"Row {i+1}: Type should match")
+            self.assertEqual(result_data, original_data, f"Row {i+1}: Blob data should match")
+
+    def test_data_blob_writer_empty_batches(self):
+        """Test DataBlobWriter with empty batches."""
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('data', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.empty_batches_test', schema, False)
+        table = self.catalog.get_table('test_db.empty_batches_test')
+
+        # Use proper table API to create writer
+        write_builder = table.new_batch_write_builder()
+        blob_writer = write_builder.new_write()
+
+        # Test data with some empty batches
+        batch1_data = pa.Table.from_pydict({
+            'id': [1, 2],
+            'data': [b'data1', b'data2']
+        }, schema=pa_schema)
+
+        # Empty batch
+        empty_batch = pa.Table.from_pydict({
+            'id': [],
+            'data': []
+        }, schema=pa_schema)
+
+        batch2_data = pa.Table.from_pydict({
+            'id': [3],
+            'data': [b'data3']
+        }, schema=pa_schema)
+
+        # Write batches including empty ones
+        total_rows = 0
+        for batch in batch1_data.to_batches():
+            blob_writer.write_arrow_batch(batch)
+            total_rows += batch.num_rows
+
+        for batch in empty_batch.to_batches():
+            blob_writer.write_arrow_batch(batch)
+            total_rows += batch.num_rows
+
+        for batch in batch2_data.to_batches():
+            blob_writer.write_arrow_batch(batch)
+            total_rows += batch.num_rows
+
+        # Verify record count (empty batch should not affect count)
+        # Record count is tracked internally by DataBlobWriter
+
+        # Test prepare commit
+        commit_messages = blob_writer.prepare_commit()
+        self.assertIsInstance(commit_messages, list)
+
+        blob_writer.close()
+
+    def test_data_blob_writer_rolling_behavior(self):
+        """Test DataBlobWriter rolling behavior with multiple commits."""
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('content', pa.string()),
+            ('blob_data', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.rolling_test', schema, False)
+        table = self.catalog.get_table('test_db.rolling_test')
+
+        # Use proper table API to create writer
+        write_builder = table.new_batch_write_builder()
+        blob_writer = write_builder.new_write()
+
+        # Create data that should trigger rolling
+        large_content = 'X' * 1000  # Large string content
+        large_blob = b'B' * 5000    # Large blob data
+
+        # Write multiple batches to test rolling
+        for i in range(10):  # 10 batches
+            batch_data = pa.Table.from_pydict({
+                'id': [i * 10 + j for j in range(10)],
+                'content': [f'{large_content}_{i}_{j}' for j in range(10)],
+                'blob_data': [large_blob] * 10
+            }, schema=pa_schema)
+
+            for batch in batch_data.to_batches():
+                blob_writer.write_arrow_batch(batch)
+
+        # Verify total record count
+        # Record count is tracked internally by DataBlobWriter
+
+        # Test prepare commit
+        commit_messages = blob_writer.prepare_commit()
+        self.assertIsInstance(commit_messages, list)
+
+        # Verify we have committed files
+        self.assertGreater(len(commit_messages), 0)
+
+        # Verify file metadata structure
+        for commit_msg in commit_messages:
+            for file_meta in commit_msg.new_files:
+                self.assertIsNotNone(file_meta.file_name)
+                self.assertGreater(file_meta.file_size, 0)
+                self.assertGreater(file_meta.row_count, 0)
+
+        blob_writer.close()
+
+    def test_blob_write_read_end_to_end(self):
+        """Test complete end-to-end blob functionality: write blob data and read it back to verify correctness."""
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('description', pa.string()),
+            ('blob_data', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.blob_write_read_e2e', schema, False)
+        table = self.catalog.get_table('test_db.blob_write_read_e2e')
+
+        # Test data with various blob sizes and types
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3, 4, 5],
+            'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
+            'description': ['User 1', 'User 2', 'User 3', 'User 4', 'User 5'],
+            'blob_data': [
+                b'small_blob_1',
+                b'medium_blob_data_2_with_more_content',
+                b'large_blob_data_3_with_even_more_content_and_details',
+                b'very_large_blob_data_4_with_extensive_content_and_multiple_details_here',  # noqa: E501
+                b'extremely_large_blob_data_5_with_comprehensive_content_and_extensive_details_covering_multiple_aspects'  # noqa: E501
+            ]
+        }, schema=pa_schema)
+
+        # Write data using table API
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+
+        # Commit the data
+        commit_messages = writer.prepare_commit()
+        self.assertGreater(len(commit_messages), 0)
+
+        # Verify commit message structure
+        for commit_msg in commit_messages:
+            self.assertIsInstance(commit_msg.new_files, list)
+        self.assertGreater(len(commit_msg.new_files), 0)
+
+        # Should have both normal and blob files
+        file_names = [f.file_name for f in commit_msg.new_files]
+        parquet_files = [f for f in file_names if f.endswith('.parquet')]
+        blob_files = [f for f in file_names if f.endswith('.blob')]
+
+        self.assertGreater(len(parquet_files), 0, "Should have at least one parquet file")
+        self.assertGreater(len(blob_files), 0, "Should have at least one blob file")
+
+        # Create commit and commit the data
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        # Read data back using table API
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result = table_read.to_arrow(table_scan.plan().splits())
+
+        # Verify the data was read back correctly
+        self.assertEqual(result.num_rows, 5, "Should have 5 rows")
+        self.assertEqual(result.num_columns, 4, "Should have 4 columns")
+
+        # Verify normal columns
+        self.assertEqual(result.column('id').to_pylist(), [1, 2, 3, 4, 5], "ID column should match")
+        self.assertEqual(result.column('name').to_pylist(), ['Alice', 'Bob', 'Charlie', 'David', 'Eve'], "Name column should match")  # noqa: E501
+        self.assertEqual(result.column('description').to_pylist(), ['User 1', 'User 2', 'User 3', 'User 4', 'User 5'], "Description column should match")  # noqa: E501
+
+        # Verify blob data correctness
+        blob_data = result.column('blob_data').to_pylist()
+        expected_blobs = [
+            b'small_blob_1',
+            b'medium_blob_data_2_with_more_content',
+            b'large_blob_data_3_with_even_more_content_and_details',
+            b'very_large_blob_data_4_with_extensive_content_and_multiple_details_here',  # noqa: E501
+            b'extremely_large_blob_data_5_with_comprehensive_content_and_extensive_details_covering_multiple_aspects'  # noqa: E501
+        ]
+
+        self.assertEqual(len(blob_data), 5, "Should have 5 blob records")
+        self.assertEqual(blob_data, expected_blobs, "Blob data should match exactly")
+
+        # Verify individual blob sizes
+        for i, (actual_blob, expected_blob) in enumerate(zip(blob_data, expected_blobs)):
+            self.assertEqual(len(actual_blob), len(expected_blob), f"Blob {i+1} size should match")
+            self.assertEqual(actual_blob, expected_blob, f"Blob {i+1} content should match exactly")
+
+        print(f"✅ End-to-end blob write/read test passed: wrote and read back 
{len(blob_data)} blob records correctly")  # noqa: E501
+
+    def test_blob_write_read_large_data_end_to_end(self):
+        """Test end-to-end blob functionality with large blob data (1MB per 
blob)."""
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('metadata', pa.string()),
+            ('large_blob', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.blob_large_write_read_e2e', schema, False)
+        table = self.catalog.get_table('test_db.blob_large_write_read_e2e')
+
+        # Create large blob data (1MB per blob)
+        large_blob_size = 1024 * 1024  # 1MB
+        blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024  # ~1KB pattern
+        pattern_size = len(blob_pattern)
+        repetitions = large_blob_size // pattern_size
+        large_blob_data = blob_pattern * repetitions
+
+        # Test data with large blobs
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'metadata': ['Large blob 1', 'Large blob 2', 'Large blob 3'],
+            'large_blob': [large_blob_data, large_blob_data, large_blob_data]
+        }, schema=pa_schema)
+
+        # Write data
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+
+        # Commit the data
+        commit_messages = writer.prepare_commit()
+        self.assertGreater(len(commit_messages), 0)
+
+        # Verify commit message structure
+        for commit_msg in commit_messages:
+            self.assertIsInstance(commit_msg.new_files, list)
+        self.assertGreater(len(commit_msg.new_files), 0)
+
+        # Should have both normal and blob files
+        file_names = [f.file_name for f in commit_msg.new_files]
+        parquet_files = [f for f in file_names if f.endswith('.parquet')]
+        blob_files = [f for f in file_names if f.endswith('.blob')]
+
+        self.assertGreater(len(parquet_files), 0, "Should have at least one parquet file")
+        self.assertGreater(len(blob_files), 0, "Should have at least one blob file")
+
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        # Read data back
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result = table_read.to_arrow(table_scan.plan().splits())
+
+        # Verify the data
+        self.assertEqual(result.num_rows, 3, "Should have 3 rows")
+        self.assertEqual(result.num_columns, 3, "Should have 3 columns")
+
+        # Verify normal columns
+        self.assertEqual(result.column('id').to_pylist(), [1, 2, 3], "ID column should match")
+        self.assertEqual(result.column('metadata').to_pylist(), ['Large blob 1', 'Large blob 2', 'Large blob 3'], "Metadata column should match")  # noqa: E501
+
+        # Verify blob data integrity
+        blob_data = result.column('large_blob').to_pylist()
+        self.assertEqual(len(blob_data), 3, "Should have 3 blob records")
+
+        for i, blob in enumerate(blob_data):
+            self.assertEqual(len(blob), len(large_blob_data), f"Blob {i+1} should be {large_blob_size} bytes")
+            self.assertEqual(blob, large_blob_data, f"Blob {i+1} content should match exactly")
+            print(f"✅ Verified large blob {i+1}: {len(blob)} bytes")
+
+        print(f"✅ Large blob end-to-end test passed: wrote and read back 
{len(blob_data)} large blob records correctly")  # noqa: E501
+
+    def test_blob_write_read_mixed_sizes_end_to_end(self):
+        """Test end-to-end blob functionality with mixed blob sizes."""
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('size_category', pa.string()),
+            ('blob_data', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.blob_mixed_sizes_write_read_e2e', schema, False)
+        table = self.catalog.get_table('test_db.blob_mixed_sizes_write_read_e2e')
+
+        # Create blobs of different sizes
+        tiny_blob = b'tiny'
+        small_blob = b'small_blob_data' * 10  # ~140 bytes
+        medium_blob = b'medium_blob_data' * 100  # ~1.4KB
+        large_blob = b'large_blob_data' * 1000  # ~14KB
+        huge_blob = b'huge_blob_data' * 10000  # ~140KB
+
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3, 4, 5],
+            'size_category': ['tiny', 'small', 'medium', 'large', 'huge'],
+            'blob_data': [tiny_blob, small_blob, medium_blob, large_blob, huge_blob]
+        }, schema=pa_schema)
+
+        # Write data
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+
+        # Commit
+        commit_messages = writer.prepare_commit()
+        self.assertGreater(len(commit_messages), 0)
+
+        # Verify commit message structure
+        for commit_msg in commit_messages:
+            self.assertIsInstance(commit_msg.new_files, list)
+        self.assertGreater(len(commit_msg.new_files), 0)
+
+        # Should have both normal and blob files
+        file_names = [f.file_name for f in commit_msg.new_files]
+        parquet_files = [f for f in file_names if f.endswith('.parquet')]
+        blob_files = [f for f in file_names if f.endswith('.blob')]
+
+        self.assertGreater(len(parquet_files), 0, "Should have at least one parquet file")
+        self.assertGreater(len(blob_files), 0, "Should have at least one blob file")
+
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        # Read back
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result = table_read.to_arrow(table_scan.plan().splits())
+
+        # Verify
+        self.assertEqual(result.num_rows, 5, "Should have 5 rows")
+        self.assertEqual(result.num_columns, 3, "Should have 3 columns")
+
+        # Verify normal columns
+        self.assertEqual(result.column('id').to_pylist(), [1, 2, 3, 4, 5], "ID column should match")
+        self.assertEqual(result.column('size_category').to_pylist(), ['tiny', 'small', 'medium', 'large', 'huge'], "Size category column should match")  # noqa: E501
+
+        # Verify blob data
+        blob_data = result.column('blob_data').to_pylist()
+        expected_blobs = [tiny_blob, small_blob, medium_blob, large_blob, huge_blob]
+
+        self.assertEqual(len(blob_data), 5, "Should have 5 blob records")
+        self.assertEqual(blob_data, expected_blobs, "Blob data should match exactly")
+
+        # Verify sizes
+        sizes = [len(blob) for blob in blob_data]
+        expected_sizes = [len(blob) for blob in expected_blobs]
+        self.assertEqual(sizes, expected_sizes, "Blob sizes should match")
+
+        # Verify individual blob content
+        for i, (actual_blob, expected_blob) in enumerate(zip(blob_data, expected_blobs)):
+            self.assertEqual(actual_blob, expected_blob, f"Blob {i+1} content should match exactly")
+
+        print(f"✅ Mixed sizes end-to-end test passed: wrote and read back 
blobs ranging from {min(sizes)} to {max(sizes)} bytes")  # noqa: E501
+
+    def test_blob_write_read_large_data_end_to_end_with_rolling(self):
+        """Test end-to-end blob functionality with large blob data (50MB per 
blob) and rolling behavior (40 blobs)."""
+        from pypaimon import Schema
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('batch_id', pa.int32()),
+            ('metadata', pa.string()),
+            ('large_blob', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true'
+            }
+        )
+        self.catalog.create_table('test_db.blob_large_rolling_e2e', schema, False)
+        table = self.catalog.get_table('test_db.blob_large_rolling_e2e')
+
+        # Create large blob data (50MB per blob)
+        large_blob_size = 50 * 1024 * 1024  # 50MB
+        blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024  # ~1KB pattern
+        pattern_size = len(blob_pattern)
+        repetitions = large_blob_size // pattern_size
+        large_blob_data = blob_pattern * repetitions
+
+        # Verify the blob size is exactly 50MB
+        actual_size = len(large_blob_data)
+        print(f"Created blob data: {actual_size:,} bytes ({actual_size / 
(1024*1024):.2f} MB)")
+
+        # Write 40 batches of data (each with 1 blob of 50MB)
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+
+        # Write all 40 batches first
+        for batch_id in range(40):
+            # Create test data for this batch
+            test_data = pa.Table.from_pydict({
+                'id': [batch_id + 1],
+                'batch_id': [batch_id],
+                'metadata': [f'Large blob batch {batch_id + 1}'],
+                'large_blob': [large_blob_data]
+            }, schema=pa_schema)
+
+            # Write data
+            writer.write_arrow(test_data)
+
+            # Print progress every 10 batches
+            if (batch_id + 1) % 10 == 0:
+                print(f"✅ Written batch {batch_id + 1}/40: 
{len(large_blob_data):,} bytes")
+
+        print("✅ Successfully wrote all 40 batches of 50MB blobs")
+
+        # Commit all data at once
+        commit_messages = writer.prepare_commit()
+        self.assertGreater(len(commit_messages), 0)
+
+        # Verify commit message structure
+        for commit_msg in commit_messages:
+            self.assertIsInstance(commit_msg.new_files, list)
+        self.assertGreater(len(commit_msg.new_files), 0)
+
+        # Should have both normal and blob files
+        file_names = [f.file_name for f in commit_msg.new_files]
+        parquet_files = [f for f in file_names if f.endswith('.parquet')]
+        blob_files = [f for f in file_names if f.endswith('.blob')]
+
+        self.assertGreater(len(parquet_files), 0, "Should have at least one parquet file")
+        self.assertGreater(len(blob_files), 0, "Should have at least one blob file")
+
+        # Commit the data
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        print(f"✅ Successfully committed {len(commit_messages)} commit 
messages with 40 batches of 50MB blobs")
+
+        # Read data back
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result = table_read.to_arrow(table_scan.plan().splits())
+
+        # Verify the data
+        self.assertEqual(result.num_rows, 40, "Should have 40 rows")
+        self.assertEqual(result.num_columns, 4, "Should have 4 columns")
+
+        # Verify normal columns
+        expected_ids = list(range(1, 41))
+        expected_batch_ids = list(range(40))
+        expected_metadata = [f'Large blob batch {i}' for i in range(1, 41)]
+
+        self.assertEqual(result.column('id').to_pylist(), expected_ids, "ID column should match")
+        self.assertEqual(result.column('batch_id').to_pylist(), expected_batch_ids, "Batch ID column should match")  # noqa: E501
+        self.assertEqual(result.column('metadata').to_pylist(), expected_metadata, "Metadata column should match")  # noqa: E501
+
+        # Verify blob data integrity
+        blob_data = result.column('large_blob').to_pylist()
+        self.assertEqual(len(blob_data), 40, "Should have 40 blob records")
+
+        # Verify each blob
+        for i, blob in enumerate(blob_data):
+            self.assertEqual(len(blob), len(large_blob_data), f"Blob {i+1} should be {large_blob_size:,} bytes")
+            self.assertEqual(blob, large_blob_data, f"Blob {i+1} content should match exactly")
+
+            # Print progress every 10 blobs
+            if (i + 1) % 10 == 0:
+                print(f"✅ Verified blob {i+1}/40: {len(blob):,} bytes")
+
+        # Verify total data size
+        total_blob_size = sum(len(blob) for blob in blob_data)
+        expected_total_size = 40 * len(large_blob_data)
+        self.assertEqual(total_blob_size, expected_total_size,
+                         f"Total blob size should be {expected_total_size:,} 
bytes")
+
+        print("✅ Large blob rolling end-to-end test passed:")
+        print("   - Wrote and read back 40 blobs of 50MB each")
+        print(f"   - Total data size: {total_blob_size:,} bytes 
({total_blob_size / (1024*1024*1024):.2f} GB)")  # noqa: E501
+        print("   - All blob content verified as correct")
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 4e5b4d723e..97804bbf6b 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -21,6 +21,7 @@ import uuid
 from pathlib import Path
 from typing import List
 
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate_builder import PredicateBuilder
 from pypaimon.manifest.manifest_file_manager import ManifestFileManager
 from pypaimon.manifest.manifest_list_manager import ManifestListManager
@@ -134,7 +135,7 @@ class FileStoreCommit:
         new_snapshot_id = self._generate_snapshot_id()
 
         # Check if row tracking is enabled
-        row_tracking_enabled = self.table.options.get('row-tracking.enabled', 'false').lower() == 'true'
+        row_tracking_enabled = self.table.options.get(CoreOptions.ROW_TRACKING_ENABLED, 'false').lower() == 'true'
 
         # Apply row tracking logic if enabled
         next_row_id = None
diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py
index 841fef3a65..35b4a7d980 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -21,6 +21,7 @@ import pyarrow as pa
 
 from pypaimon.write.commit_message import CommitMessage
 from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
+from pypaimon.write.writer.data_blob_writer import DataBlobWriter
 from pypaimon.write.writer.data_writer import DataWriter
 from pypaimon.write.writer.key_value_data_writer import KeyValueDataWriter
 
@@ -44,7 +45,15 @@ class FileStoreWrite:
         writer.write(data)
 
     def _create_data_writer(self, partition: Tuple, bucket: int) -> DataWriter:
-        if self.table.is_primary_key_table:
+        # Check if table has blob columns
+        if self._has_blob_columns():
+            return DataBlobWriter(
+                table=self.table,
+                partition=partition,
+                bucket=bucket,
+                max_seq_number=self.max_seq_numbers.get((partition, bucket), 1),
+            )
+        elif self.table.is_primary_key_table:
             return KeyValueDataWriter(
                 table=self.table,
                 partition=partition,
@@ -60,6 +69,17 @@ class FileStoreWrite:
                 write_cols=self.write_cols
             )
 
+    def _has_blob_columns(self) -> bool:
+        """Check if the table schema contains blob columns."""
+        for field in self.table.table_schema.fields:
+            # Check if field type is blob
+            if hasattr(field.type, 'type') and field.type.type == 'BLOB':
+                return True
+            # Alternative: check for specific blob type class
+            elif hasattr(field.type, '__class__') and 'blob' in field.type.__class__.__name__.lower():
+                return True
+        return False
+
     def prepare_commit(self) -> List[CommitMessage]:
         commit_messages = []
         for (partition, bucket), writer in self.data_writers.items():
diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/write/writer/blob_writer.py
similarity index 50%
copy from paimon-python/pypaimon/common/core_options.py
copy to paimon-python/pypaimon/write/writer/blob_writer.py
index 8ab26fe062..ff153da843 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/write/writer/blob_writer.py
@@ -16,36 +16,31 @@
 # limitations under the License.
 
################################################################################
 
-from enum import Enum
-
-
-class CoreOptions(str, Enum):
-    """Core options for paimon."""
-
-    def __str__(self):
-        return self.value
-
-    # Basic options
-    AUTO_CREATE = "auto-create"
-    PATH = "path"
-    TYPE = "type"
-    BRANCH = "branch"
-    BUCKET = "bucket"
-    BUCKET_KEY = "bucket-key"
-    WAREHOUSE = "warehouse"
-    # File format options
-    FILE_FORMAT = "file.format"
-    FILE_FORMAT_ORC = "orc"
-    FILE_FORMAT_AVRO = "avro"
-    FILE_FORMAT_PARQUET = "parquet"
-    FILE_FORMAT_BLOB = "blob"
-    FILE_COMPRESSION = "file.compression"
-    FILE_COMPRESSION_PER_LEVEL = "file.compression.per.level"
-    FILE_FORMAT_PER_LEVEL = "file.format.per.level"
-    FILE_BLOCK_SIZE = "file.block-size"
-    FILE_BLOB_AS_DESCRIPTOR = "blob-as-descriptor"
-    # Scan options
-    SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
-    INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp"
-    # Commit options
-    COMMIT_USER_PREFIX = "commit.user-prefix"
+import logging
+from typing import Tuple
+
+from pypaimon.common.core_options import CoreOptions
+from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
+
+logger = logging.getLogger(__name__)
+
+
+class BlobWriter(AppendOnlyDataWriter):
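+    """Append-only writer for the table's single blob column.
+
+    Forces the blob file format and skips min/max column stats for blob values.
+    """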
+
+    def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, blob_column: str):
+        super().__init__(table, partition, bucket, max_seq_number, [blob_column])
+
+        # Override file format to "blob"
+        self.file_format = CoreOptions.FILE_FORMAT_BLOB
+
+        logger.info("Initialized BlobWriter with blob file format")
+
+    @staticmethod
+    def _get_column_stats(record_batch, column_name: str):
+        column_array = record_batch.column(column_name)
+        # For blob data, don't generate min/max values
+        return {
+            "min_values": None,
+            "max_values": None,
+            "null_counts": column_array.null_count,
+        }
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py
new file mode 100644
index 0000000000..e34b9a5701
--- /dev/null
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -0,0 +1,321 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import logging
+import uuid
+from datetime import datetime
+from pathlib import Path
+from typing import List, Optional, Tuple
+
+import pyarrow as pa
+
+from pypaimon.common.core_options import CoreOptions
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.simple_stats import SimpleStats
+from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.write.writer.data_writer import DataWriter
+
+logger = logging.getLogger(__name__)
+
+
+class DataBlobWriter(DataWriter):
+    """
+    A rolling file writer that handles both normal data and blob data. This writer creates separate
+    files for normal columns and blob columns, managing their lifecycle independently.
+
+    For example, given a table schema with normal columns (id INT, name STRING) and a blob column
+    (data BLOB), this writer will create separate files for (id, name) and (data).
+
+    Key features:
+    - Blob data can roll independently when normal data doesn't need rolling
+    - When normal data rolls, blob data MUST also be closed (Java behavior)
+    - Blob data uses more aggressive rolling (smaller target size) to prevent memory issues
+    - One normal data file may correspond to multiple blob data files
+    - Blob data is written immediately to disk to prevent memory corruption
+    - Blob file metadata is stored as separate DataFileMeta objects after normal file metadata
+
+    Rolling behavior:
+    - Normal data rolls: Both normal and blob writers are closed together, blob metadata added after normal metadata
+    - Blob data rolls independently: Only blob writer is closed, blob metadata is cached until normal data rolls
+
+    Metadata organization:
+    - Normal file metadata is added first to committed_files
+    - Blob file metadata is added after normal file metadata in committed_files
+    - When blob rolls independently, metadata is cached until normal data rolls
+    - Result: [normal_meta, blob_meta1, blob_meta2, blob_meta3, ...]
+
+    Example file organization:
+    committed_files = [
+        normal_file1_meta,    # f1.parquet metadata
+        blob_file1_meta,      # b1.blob metadata
+        blob_file2_meta,      # b2.blob metadata
+        blob_file3_meta,      # b3.blob metadata
+        normal_file2_meta,    # f1-2.parquet metadata
+        blob_file4_meta,      # b4.blob metadata
+        blob_file5_meta,      # b5.blob metadata
+    ]
+
+    This matches the Java RollingBlobFileWriter behavior exactly.
+    """
+
+    # Constant for checking rolling condition periodically
+    CHECK_ROLLING_RECORD_CNT = 1000
+
+    def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int):
+        super().__init__(table, partition, bucket, max_seq_number)
+
+        # Determine blob column from table schema
+        self.blob_column_name = self._get_blob_columns_from_schema()
+
+        # Split schema into normal and blob columns
+        all_column_names = [field.name for field in self.table.table_schema.fields]
+        self.normal_column_names = [col for col in all_column_names if col != self.blob_column_name]
+        self.write_cols = self.normal_column_names
+
+        # State management for blob writer
+        self.record_count = 0
+        self.closed = False
+
+        # Track pending data for normal data only
+        self.pending_normal_data: Optional[pa.Table] = None
+
+        # Initialize blob writer with blob column name
+        from pypaimon.write.writer.blob_writer import BlobWriter
+        self.blob_writer = BlobWriter(
+            table=self.table,
+            partition=self.partition,
+            bucket=self.bucket,
+            max_seq_number=max_seq_number,
+            blob_column=self.blob_column_name
+        )
+
+        logger.info(f"Initialized DataBlobWriter with blob column: 
{self.blob_column_name}")
+
+    def _get_blob_columns_from_schema(self) -> str:
+        blob_columns = []
+        for field in self.table.table_schema.fields:
+            type_str = str(field.type).lower()
+            if 'blob' in type_str:
+                blob_columns.append(field.name)
+
+        # Validate blob column count (matching Java constraint)
+        if len(blob_columns) == 0:
+            raise ValueError("No blob field found in table schema.")
+        elif len(blob_columns) > 1:
+            raise ValueError("Limit exactly one blob field in one paimon table 
yet.")
+
+        return blob_columns[0]  # Return single blob column name
+
+    def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
+        normal_data, _ = self._split_data(data)
+        return normal_data
+
+    def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
+        return self._merge_normal_data(existing_data, new_data)
+
+    def write(self, data: pa.RecordBatch):
+        try:
+            # Split data into normal and blob parts
+            normal_data, blob_data = self._split_data(data)
+
+            # Process and accumulate normal data
+            processed_normal = self._process_normal_data(normal_data)
+            if self.pending_normal_data is None:
+                self.pending_normal_data = processed_normal
+            else:
+                self.pending_normal_data = self._merge_normal_data(self.pending_normal_data, processed_normal)
+
+            # Write blob data directly to blob writer (handles its own rolling)
+            if blob_data is not None and blob_data.num_rows > 0:
+                # Write blob data directly to blob writer
+                self.blob_writer.write(blob_data)
+
+            self.record_count += data.num_rows
+
+            # Check if normal data rolling is needed
+            if self._should_roll_normal():
+                # When normal data rolls, close both writers and fetch blob metadata
+                self._close_current_writers()
+
+        except Exception as e:
+            logger.error("Exception occurs when writing data. Cleaning up.", 
exc_info=e)
+            self.abort()
+            raise e
+
+    def prepare_commit(self) -> List[DataFileMeta]:
+        # Close any remaining data
+        self._close_current_writers()
+
+        return self.committed_files.copy()
+
+    def close(self):
+        if self.closed:
+            return
+
+        try:
+            if self.pending_normal_data is not None and self.pending_normal_data.num_rows > 0:
+                self._close_current_writers()
+        except Exception as e:
+            logger.error("Exception occurs when closing writer. Cleaning up.", 
exc_info=e)
+            self.abort()
+        finally:
+            self.closed = True
+            self.pending_normal_data = None
+
+    def abort(self):
+        """Abort all writers and clean up resources."""
+        self.blob_writer.abort()
+        self.pending_normal_data = None
+        self.committed_files.clear()
+
+    def _split_data(self, data: pa.RecordBatch) -> Tuple[pa.RecordBatch, pa.RecordBatch]:
+        """Split data into normal and blob parts based on column names."""
+        # Use the pre-computed column names
+        normal_columns = self.normal_column_names
+        blob_columns = [self.blob_column_name]  # Single blob column
+
+        # Create projected batches
+        normal_data = data.select(normal_columns) if normal_columns else None
+        blob_data = data.select(blob_columns) if blob_columns else None
+
+        return normal_data, blob_data
+
+    def _process_normal_data(self, data: pa.RecordBatch) -> pa.Table:
+        """Process normal data (similar to base DataWriter)."""
+        if data is None or data.num_rows == 0:
+            return pa.Table.from_batches([])
+        return pa.Table.from_batches([data])
+
+    def _merge_normal_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
+        return pa.concat_tables([existing_data, new_data])
+
+    def _should_roll_normal(self) -> bool:
+        if self.pending_normal_data is None:
+            return False
+
+        # Check rolling condition periodically (every CHECK_ROLLING_RECORD_CNT records)
+        if self.record_count % self.CHECK_ROLLING_RECORD_CNT != 0:
+            return False
+
+        # Check if normal data exceeds target size
+        current_size = self.pending_normal_data.nbytes
+        return current_size > self.target_file_size
+
+    def _close_current_writers(self):
+        """Close both normal and blob writers and add blob metadata after 
normal metadata (Java behavior)."""
+        if self.pending_normal_data is None or 
self.pending_normal_data.num_rows == 0:
+            return
+
+        # Close normal writer and get metadata
+        normal_meta = self._write_normal_data_to_file(self.pending_normal_data)
+
+        # Fetch blob metadata from blob writer
+        blob_metas = self.blob_writer.prepare_commit()
+
+        # Validate consistency between normal and blob files (Java behavior)
+        self._validate_consistency(normal_meta, blob_metas)
+
+        # Add normal file metadata first
+        self.committed_files.append(normal_meta)
+
+        # Add blob file metadata after normal metadata
+        self.committed_files.extend(blob_metas)
+
+        # Reset pending data
+        self.pending_normal_data = None
+
+        logger.info(f"Closed both writers - normal: {normal_meta.file_name}, "
+                    f"added {len(blob_metas)} blob file metadata after normal 
metadata")
+
+    def _write_normal_data_to_file(self, data: pa.Table) -> DataFileMeta:
+        if data.num_rows == 0:
+            return None
+
+        file_name = f"data-{uuid.uuid4()}-0.{self.file_format}"
+        file_path = self._generate_file_path(file_name)
+
+        # Write file based on format
+        if self.file_format == CoreOptions.FILE_FORMAT_PARQUET:
+            self.file_io.write_parquet(file_path, data, compression=self.compression)
+        elif self.file_format == CoreOptions.FILE_FORMAT_ORC:
+            self.file_io.write_orc(file_path, data, compression=self.compression)
+        elif self.file_format == CoreOptions.FILE_FORMAT_AVRO:
+            self.file_io.write_avro(file_path, data)
+        else:
+            raise ValueError(f"Unsupported file format: {self.file_format}")
+
+        # Generate metadata
+        return self._create_data_file_meta(file_name, file_path, data)
+
+    def _create_data_file_meta(self, file_name: str, file_path: Path, data: pa.Table) -> DataFileMeta:
+        # Column stats (only for normal columns)
+        column_stats = {
+            field.name: self._get_column_stats(data, field.name)
+            for field in self.table.table_schema.fields
+            if field.name != self.blob_column_name
+        }
+
+        # Get normal fields only
+        normal_fields = [field for field in self.table.table_schema.fields
+                         if field.name != self.blob_column_name]
+
+        min_value_stats = [column_stats[field.name]['min_values'] for field in normal_fields]
+        max_value_stats = [column_stats[field.name]['max_values'] for field in normal_fields]
+        value_null_counts = [column_stats[field.name]['null_counts'] for field in normal_fields]
+
+        self.sequence_generator.start = self.sequence_generator.current
+
+        return DataFileMeta(
+            file_name=file_name,
+            file_size=self.file_io.get_file_size(file_path),
+            row_count=data.num_rows,
+            min_key=GenericRow([], []),
+            max_key=GenericRow([], []),
+            key_stats=SimpleStats(
+                GenericRow([], []),
+                GenericRow([], []),
+                []),
+            value_stats=SimpleStats(
+                GenericRow(min_value_stats, normal_fields),
+                GenericRow(max_value_stats, normal_fields),
+                value_null_counts),
+            min_sequence_number=-1,
+            max_sequence_number=-1,
+            schema_id=self.table.table_schema.id,
+            level=0,
+            extra_files=[],
+            creation_time=datetime.now(),
+            delete_row_count=0,
+            file_source="APPEND",
+            value_stats_cols=self.normal_column_names,
+            file_path=str(file_path),
+            write_cols=self.write_cols)
+
+    def _validate_consistency(self, normal_meta: DataFileMeta, blob_metas: List[DataFileMeta]):
+        if normal_meta is None:
+            return
+
+        normal_row_count = normal_meta.row_count
+        blob_row_count = sum(meta.row_count for meta in blob_metas)
+
+        if normal_row_count != blob_row_count:
+            raise RuntimeError(
+                f"This is a bug: The row count of main file and blob files 
does not match. "
+                f"Main file: {normal_meta.file_name} (row count: 
{normal_row_count}), "
+                f"blob files: {[meta.file_name for meta in blob_metas]} (total 
row count: {blob_row_count})"
+            )
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 0a063fb2f4..502d196ae6 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -61,14 +61,21 @@ class DataWriter(ABC):
         self.blob_as_descriptor = options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False)
 
     def write(self, data: pa.RecordBatch):
-        processed_data = self._process_data(data)
+        try:
+            processed_data = self._process_data(data)
 
-        if self.pending_data is None:
-            self.pending_data = processed_data
-        else:
-            self.pending_data = self._merge_data(self.pending_data, processed_data)
+            if self.pending_data is None:
+                self.pending_data = processed_data
+            else:
+                self.pending_data = self._merge_data(self.pending_data, processed_data)
 
-        self._check_and_roll_if_needed()
+            self._check_and_roll_if_needed()
+        except Exception as e:
+            import logging
+            logger = logging.getLogger(__name__)
+            logger.warning("Exception occurs when writing data. Cleaning up.", 
exc_info=e)
+            self.abort()
+            raise e
 
     def prepare_commit(self) -> List[DataFileMeta]:
         if self.pending_data is not None and self.pending_data.num_rows > 0:
@@ -78,6 +85,36 @@ class DataWriter(ABC):
         return self.committed_files.copy()
 
     def close(self):
+        try:
+            if self.pending_data is not None and self.pending_data.num_rows > 0:
+                self._write_data_to_file(self.pending_data)
+        except Exception as e:
+            import logging
+            logger = logging.getLogger(__name__)
+            logger.warning("Exception occurs when closing writer. Cleaning 
up.", exc_info=e)
+            self.abort()
+            raise e
+        finally:
+            self.pending_data = None
+            # Note: Don't clear committed_files in close() - they should be returned by prepare_commit()
+
+    def abort(self):
+        """
+        Abort all writers and clean up resources. This method should be called when an error occurs
+        during writing. It deletes any files that were written and cleans up resources.
+        """
+        # Delete any files that were written
+        for file_meta in self.committed_files:
+            try:
+                if file_meta.file_path:
+                    self.file_io.delete_quietly(file_meta.file_path)
+            except Exception as e:
+                # Log but don't raise - we want to clean up as much as possible
+                import logging
+                logger = logging.getLogger(__name__)
+                logger.warning(f"Failed to delete file {file_meta.file_path} 
during abort: {e}")
+
+        # Clean up resources
         self.pending_data = None
         self.committed_files.clear()
 

