This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 40eadc2f01 [python] Support BlobView feature (#8021)
40eadc2f01 is described below

commit 40eadc2f0133d2859f85cc6b1e0de35433234846
Author: umi <[email protected]>
AuthorDate: Thu Jun 4 13:52:19 2026 +0800

    [python] Support BlobView feature (#8021)
    
    - Add Python BlobViewStruct / BlobView wire-format support and the
    blob-view-field option.
    - Store descriptor/view BLOB fields inline, validate bad inline field
    configuration and payloads, and avoid writing new .blob files for view
    fields.
    - Resolve blob-view fields during reads through catalog-aware lookup,
    returning bytes by default or upstream BlobDescriptor bytes when
    blob-as-descriptor=true.
---
 .../pypaimon/catalog/filesystem_catalog.py         |   6 +-
 .../pypaimon/common/options/core_options.py        |  26 ++
 .../read/reader/blob_descriptor_convert_reader.py  | 179 ++++++--
 .../pypaimon/read/reader/data_file_batch_reader.py |  61 ---
 paimon-python/pypaimon/read/split_read.py          |  55 ++-
 paimon-python/pypaimon/schema/schema.py            |  34 --
 paimon-python/pypaimon/schema/schema_manager.py    |  73 ++-
 paimon-python/pypaimon/table/file_store_table.py   |   5 +-
 paimon-python/pypaimon/table/row/blob.py           | 155 +++++++
 paimon-python/pypaimon/table/special_fields.py     |  19 +
 paimon-python/pypaimon/tests/blob_table_test.py    | 499 ++++++++++++++++++++-
 paimon-python/pypaimon/tests/blob_test.py          |  21 +-
 .../pypaimon/tests/external_storage_blob_test.py   |   2 +-
 paimon-python/pypaimon/utils/blob_view_lookup.py   | 274 +++++++++++
 .../write/writer/dedicated_format_writer.py        |  49 +-
 15 files changed, 1299 insertions(+), 159 deletions(-)

diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index 86e2f775e7..b7356b4595 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -142,11 +142,11 @@ class FileSystemCatalog(Catalog):
         table_schema = self.get_table_schema(identifier)
 
         # Create catalog environment for filesystem catalog
-        # Filesystem catalog doesn't support version management by default
+        from pypaimon.catalog.filesystem_catalog_loader import FileSystemCatalogLoader
         catalog_environment = CatalogEnvironment(
             identifier=identifier,
-            uuid=None,  # Filesystem catalog doesn't track table UUIDs
-            catalog_loader=None,  # No catalog loader for filesystem
+            uuid=None,
+            catalog_loader=FileSystemCatalogLoader(self.catalog_context),
             supports_version_management=False
         )
 
diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py
index c46fb64be9..6013019aba 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -274,6 +274,17 @@ class CoreOptions:
         .with_description("Comma-separated field names to treat as BLOB view fields.")
     )
 
+    BLOB_VIEW_RESOLVE_ENABLED: ConfigOption[bool] = (
+        ConfigOptions.key("blob-view.resolve.enabled")
+        .boolean_type()
+        .default_value(True)
+        .with_description(
+            "Whether to resolve blob-view-field values from upstream tables at "
+            "read time. Set to false to preserve BlobViewStruct references when "
+            "forwarding blob view values to another blob-view table."
+        )
+    )
+
     VECTOR_FIELD: ConfigOption[str] = (
         ConfigOptions.key("vector-field")
         .string_type()
@@ -744,6 +755,21 @@ class CoreOptions:
 
     def blob_descriptor_fields(self, default=None):
         value = self.options.get(CoreOptions.BLOB_DESCRIPTOR_FIELD, default)
+        return CoreOptions._parse_field_set(value)
+
+    def blob_view_fields(self, default=None):
+        value = self.options.get(CoreOptions.BLOB_VIEW_FIELD, default)
+        return CoreOptions._parse_field_set(value)
+
+    def blob_field(self, default=None):
+        value = self.options.get(CoreOptions.BLOB_FIELD, default)
+        return CoreOptions._parse_field_set(value)
+
+    def blob_view_resolve_enabled(self, default=True):
+        return self.options.get(CoreOptions.BLOB_VIEW_RESOLVE_ENABLED, default)
+
+    @staticmethod
+    def _parse_field_set(value):
         if value is None:
             return set()
         if isinstance(value, str):
diff --git a/paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py b/paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py
index 35fe046a03..12e975bae5 100644
--- a/paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py
+++ b/paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py
@@ -15,68 +15,189 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Optional
+from typing import Callable, Optional, Set
 
+import pyarrow
 from pyarrow import RecordBatch
 
 from pypaimon.common.options.core_options import CoreOptions
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.table.row.blob import Blob, BlobViewStruct
 
 
-class BlobDescriptorConvertReader(RecordBatchReader):
-    def __init__(self, inner: RecordBatchReader, table):
+class BlobInlineConvertReader(RecordBatchReader):
+    """Resolves BlobView and BlobDescriptor fields in record batches.
+
+    Processing is split into two clear stages:
+      Stage 1 (BlobView resolution): If view fields exist, use a lightweight
+               prescan reader (only projecting view columns) to collect
+               BlobViewStructs, bulk-preload their descriptors, then read
+               full data from the main reader and replace view field values
+               with the corresponding BlobDescriptor serialized bytes.
+      Stage 2 (BlobData resolution): Controlled by blob-as-descriptor option.
+               If false, resolve all BlobDescriptor bytes (from both descriptor
+               fields and view fields) into real blob data bytes.
+               If true, return as-is.
+    """
+
+    def __init__(self, inner: RecordBatchReader, table,
+                 prescan_reader_factory: Optional[Callable[[Set[str]], RecordBatchReader]] = None):
+        """
+        Args:
+            inner: The main data reader (reads all columns).
+            table: The table instance.
+            prescan_reader_factory: Optional factory that creates a lightweight
+                reader projecting only the specified field names. Used for
+                prescan to collect BlobViewStructs without reading all columns.
+                Signature: (field_names: Set[str]) -> RecordBatchReader
+        """
         self._inner = inner
         self._table = table
-        self._descriptor_fields = CoreOptions.blob_descriptor_fields(table.options)
+        self._prescan_reader_factory = prescan_reader_factory
         self.file_io = inner.file_io
         self.blob_field_indices = inner.blob_field_indices
+        # Preserve original BlobViewStruct bytes when resolve disabled: skip both
+        # view resolution (Stage 1) and descriptor-to-data resolution (Stage 2).
+        resolve_enabled = CoreOptions.blob_view_resolve_enabled(
+            table.options) and self._table.catalog_environment.catalog_loader is not None
+        self._view_fields = CoreOptions.blob_view_fields(table.options) if resolve_enabled else set()
+        self._descriptor_fields = CoreOptions.blob_descriptor_fields(table.options)
+        self._blob_as_descriptor = CoreOptions.blob_as_descriptor(table.options)
+        self._prescan_done = False
+        self._blob_view_lookup = None
 
     def read_arrow_batch(self) -> Optional[RecordBatch]:
-        import pyarrow
+        # Align with Java: only enter blob view resolution when catalog_loader is available
+        # If catalog_loader is None, skip both Stage 1 (view resolution) and Stage 2 (descriptor resolution)
+        # This matches Java's behavior in DataEvolutionTableRead.createReader where blob view reader
+        # is only created when catalogContext != null
+        if self._view_fields and not self._prescan_done:
+            self._prescan_view_structs()
+
         batch = self._inner.read_arrow_batch()
         if batch is None:
             return None
-        return self._convert_batch(batch, pyarrow)
+        # Resolve view fields using the preloaded lookup
+        if self._view_fields and self._blob_view_lookup is not None:
+            batch = self._resolve_view_fields(batch, self._blob_view_lookup)
+        # Resolve BlobDescriptor -> real bytes (if blob-as-descriptor=false)
+        return self._resolve_descriptor_fields(batch)
+
+    # ------------------------------------------------------------------
+    # Stage 1: BlobView prescan (lightweight, only reads view columns)
+    # ------------------------------------------------------------------
+
+    def _prescan_view_structs(self):
+        """Use a lightweight prescan reader (projecting only view columns) to
+        collect all BlobViewStructs and bulk-preload their descriptors."""
+        from pypaimon.table.row.blob import BlobViewStruct
+        from pypaimon.utils.blob_view_lookup import BlobViewLookup
 
-    def _convert_batch(self, batch, pyarrow):
-        from pypaimon.table.row.blob import Blob, BlobDescriptor
+        all_view_structs = []
 
-        result = batch
-        for field_name in self._descriptor_fields:
-            if field_name not in result.schema.names:
+        prescan_reader = self._prescan_reader_factory(self._view_fields)
+        try:
+            while True:
+                batch = prescan_reader.read_arrow_batch()
+                if batch is None:
+                    break
+                for field_name in self._view_fields:
+                    if field_name not in batch.schema.names:
+                        continue
+                    for value in batch.column(field_name).to_pylist():
+                        value = self._normalize_blob_to_bytes(value)
+                        if value is None:
+                            continue
+                        if isinstance(value, bytes) and BlobViewStruct.is_blob_view_struct(value):
+                            all_view_structs.append(BlobViewStruct.deserialize(value))
+                        else:
+                            raise ValueError(
+                                f"Expected BlobViewStruct bytes in view field '{field_name}', "
+                                f"but got non-BlobViewStruct bytes."
+                            )
+        finally:
+            prescan_reader.close()
+
+        # Bulk-preload BlobViewStruct -> BlobDescriptor mapping
+        if all_view_structs:
+            self._blob_view_lookup = BlobViewLookup(self._table)
+            self._blob_view_lookup.preload(all_view_structs)
+        self._prescan_done = True
+
+    def _resolve_view_fields(self, batch, blob_view_lookup):
+        """Replace BlobViewStruct bytes in view fields with the corresponding
+        BlobDescriptor serialized bytes."""
+        for field_name in self._view_fields:
+            if field_name not in batch.schema.names:
                 continue
-            values = result.column(field_name).to_pylist()
+            values = [self._normalize_blob_to_bytes(v) for v in batch.column(field_name).to_pylist()]
             converted_values = []
             for value in values:
                 if value is None:
                     converted_values.append(None)
                     continue
-                if hasattr(value, 'as_py'):
-                    value = value.as_py()
-                if isinstance(value, str):
-                    value = value.encode('utf-8')
-                if isinstance(value, bytearray):
-                    value = bytes(value)
                 if not isinstance(value, bytes):
                     converted_values.append(value)
                     continue
-                try:
-                    descriptor = BlobDescriptor.deserialize(value)
-                    if descriptor.serialize() != value:
-                        converted_values.append(value)
-                        continue
-                    uri_reader = self._table.file_io.uri_reader_factory.create(descriptor.uri)
-                    converted_values.append(Blob.from_descriptor(uri_reader, descriptor).to_data())
-                except Exception:
+                if not BlobViewStruct.is_blob_view_struct(value):
                     converted_values.append(value)
+                    continue
+                view_struct = BlobViewStruct.deserialize(value)
+                if blob_view_lookup.resolve_to_null(view_struct):
+                    converted_values.append(None)
+                else:
+                    descriptor = blob_view_lookup.resolve_descriptor(view_struct)
+                    converted_values.append(descriptor.serialize())
 
-            column_idx = result.schema.names.index(field_name)
-            result = result.set_column(
+            column_idx = batch.schema.names.index(field_name)
+            batch = batch.set_column(
                 column_idx,
                 pyarrow.field(field_name, pyarrow.large_binary(), nullable=True),
                 pyarrow.array(converted_values, type=pyarrow.large_binary()),
             )
-        return result
+        return batch
+
+    # ------------------------------------------------------------------
+    # Stage 2: BlobData resolution (unified exit)
+    # ------------------------------------------------------------------
+
+    def _resolve_descriptor_fields(self, batch):
+        if self._blob_as_descriptor:
+            return batch
+
+        all_inline_blob_fields = self._descriptor_fields | self._view_fields
+        for field_name in all_inline_blob_fields:
+            if field_name not in batch.schema.names:
+                continue
+            values = [self._normalize_blob_to_bytes(v) for v in batch.column(field_name).to_pylist()]
+            converted_values = []
+            for value in values:
+                blob = Blob.from_bytes(value, self._table.file_io)
+                converted_values.append(blob.to_data() if blob else None)
+
+            column_idx = batch.schema.names.index(field_name)
+            batch = batch.set_column(
+                column_idx,
+                pyarrow.field(field_name, pyarrow.large_binary(), nullable=True),
+                pyarrow.array(converted_values, type=pyarrow.large_binary()),
+            )
+        return batch
+
+    # ------------------------------------------------------------------
+    # Utilities
+    # ------------------------------------------------------------------
+
+    @staticmethod
+    def _normalize_blob_to_bytes(value):
+        if value is None:
+            return None
+        if hasattr(value, 'as_py'):
+            value = value.as_py()
+        if isinstance(value, str):
+            value = value.encode('utf-8')
+        if isinstance(value, bytearray):
+            value = bytes(value)
+        return value
 
     def close(self):
         self._inner.close()
diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
index 12e6990e13..21d1b2a911 100644
--- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
@@ -25,7 +25,6 @@ from pypaimon.read.partition_info import PartitionInfo
 from pypaimon.read.reader.format_blob_reader import FormatBlobReader
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
 from pypaimon.schema.data_types import DataField, PyarrowFieldParser
-from pypaimon.table.row.blob import Blob
 from pypaimon.table.special_fields import SpecialFields
 
 
@@ -40,8 +39,6 @@ class DataFileBatchReader(RecordBatchReader):
                  first_row_id: int,
                  row_tracking_enabled: bool,
                  system_fields: dict,
-                 blob_as_descriptor: bool = False,
-                 blob_descriptor_fields: Optional[set] = None,
                  file_io: Optional[FileIO] = None,
                  row_id_offsets: Optional[List[int]] = None):
         self.format_reader = format_reader
@@ -55,19 +52,7 @@ class DataFileBatchReader(RecordBatchReader):
         self._row_id_cursor = 0
         self.max_sequence_number = max_sequence_number
         self.system_fields = system_fields
-        self.blob_as_descriptor = blob_as_descriptor
-        self.blob_descriptor_fields = blob_descriptor_fields or set()
         self.file_io = file_io
-        self.blob_field_names = {
-            field.name
-            for field in fields
-            if hasattr(field.type, 'type') and field.type.type == 'BLOB'
-        }
-        self.descriptor_blob_fields = {
-            field_name
-            for field_name in self.blob_descriptor_fields
-            if field_name in self.blob_field_names
-        }
 
     def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch]:
         if isinstance(self.format_reader, FormatBlobReader):
@@ -140,8 +125,6 @@ class DataFileBatchReader(RecordBatchReader):
         if self.row_tracking_enabled and self.system_fields:
             record_batch = self._assign_row_tracking(record_batch)
 
-        record_batch = self._convert_descriptor_stored_blob_columns(record_batch)
-
         return record_batch
 
     def _align_batch_to_read_schema(self, names: List[str], arrays: list) -> RecordBatch:
@@ -170,50 +153,6 @@ class DataFileBatchReader(RecordBatchReader):
             out_fields.append(target_field)
         return pa.RecordBatch.from_arrays(out_arrays, schema=pa.schema(out_fields))
 
-    def _convert_descriptor_stored_blob_columns(self, record_batch: RecordBatch) -> RecordBatch:
-        if isinstance(self.format_reader, FormatBlobReader):
-            return record_batch
-        if not self.descriptor_blob_fields:
-            return record_batch
-
-        schema_names = set(record_batch.schema.names)
-        target_fields = [f for f in self.descriptor_blob_fields if f in schema_names]
-        if not target_fields:
-            return record_batch
-
-        arrays = list(record_batch.columns)
-        for field_name in target_fields:
-            field_idx = record_batch.schema.get_field_index(field_name)
-            values = record_batch.column(field_idx).to_pylist()
-
-            if self.blob_as_descriptor:
-                converted = [self._normalize_blob_cell(v) for v in values]
-            else:
-                converted = [self._blob_cell_to_data(v) for v in values]
-            arrays[field_idx] = pa.array(converted, type=pa.large_binary())
-
-        return pa.RecordBatch.from_arrays(arrays, schema=record_batch.schema)
-
-    @staticmethod
-    def _normalize_blob_cell(value):
-        if value is None:
-            return None
-        if hasattr(value, 'as_py'):
-            value = value.as_py()
-        if isinstance(value, str):
-            value = value.encode('utf-8')
-        if isinstance(value, bytearray):
-            value = bytes(value)
-        return value
-
-    def _blob_cell_to_data(self, value):
-        value = self._normalize_blob_cell(value)
-        if value is None:
-            return None
-        if not isinstance(value, bytes):
-            return value
-        return Blob.from_bytes(value, self.file_io).to_data()
-
     def _assign_row_tracking(self, record_batch: RecordBatch) -> RecordBatch:
         """Assign row tracking meta fields (_ROW_ID and _SEQUENCE_NUMBER)."""
         arrays = list(record_batch.columns)
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 5a78cb07be..fe76527431 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -41,7 +41,7 @@ from pypaimon.read.reader.empty_record_reader import EmptyFileRecordReader
 from pypaimon.read.reader.field_bunch import BlobBunch, DataBunch, FieldBunch, VectorBunch
 from pypaimon.read.reader.filter_record_reader import FilterRecordReader
 from pypaimon.read.reader.format_avro_reader import FormatAvroReader
-from pypaimon.read.reader.blob_descriptor_convert_reader import BlobDescriptorConvertReader
+from pypaimon.read.reader.blob_descriptor_convert_reader import BlobInlineConvertReader
 from pypaimon.read.reader.filter_record_batch_reader import FilterRecordBatchReader
 from pypaimon.read.reader.limited_record_reader import LimitedRecordBatchReader, LimitedRecordReader
 from pypaimon.read.reader.row_range_filter_record_reader import RowIdFilterRecordBatchReader
@@ -314,9 +314,6 @@ class SplitRead(ABC):
         else:
             raise ValueError(f"Unexpected file format: {file_format}")
 
-        blob_as_descriptor = CoreOptions.blob_as_descriptor(self.table.options)
-        blob_descriptor_fields = CoreOptions.blob_descriptor_fields(self.table.options)
-
         index_mapping = self.create_index_mapping()
         partition_info = self._create_partition_info()
         system_fields = SpecialFields.find_system_fields(self.read_fields)
@@ -344,8 +341,6 @@ class SplitRead(ABC):
                 effective_first_row_id,
                 row_tracking_enabled,
                 system_fields,
-                blob_as_descriptor=blob_as_descriptor,
-                blob_descriptor_fields=blob_descriptor_fields,
                 file_io=self.table.file_io,
                 row_id_offsets=row_indices)
         else:
@@ -359,8 +354,6 @@ class SplitRead(ABC):
                 effective_first_row_id,
                 row_tracking_enabled,
                 system_fields,
-                blob_as_descriptor=blob_as_descriptor,
-                blob_descriptor_fields=blob_descriptor_fields,
                 file_io=self.table.file_io,
                 row_id_offsets=row_indices)
 
@@ -799,6 +792,20 @@ class DataEvolutionSplitRead(SplitRead):
         return None
 
     def create_reader(self) -> RecordReader:
+        reader = self._create_raw_reader()
+
+        if ((CoreOptions.blob_view_fields(self.table.options) and CoreOptions.blob_view_resolve_enabled(
+                self.table.options))
+                or (not CoreOptions.blob_as_descriptor(self.table.options)
+                    and CoreOptions.blob_descriptor_fields(self.table.options))):
+            reader = BlobInlineConvertReader(
+                reader, self.table,
+                prescan_reader_factory=lambda names: self._create_prescan_reader(names))
+
+        return reader
+
+    def _create_raw_reader(self) -> RecordReader:
+        """Core read logic: split_by_row_id -> suppliers -> ConcatBatchReader -> filter."""
         files = self.split.files
         suppliers = []
 
@@ -830,15 +837,39 @@ class DataEvolutionSplitRead(SplitRead):
         else:
             reader = merge_reader
 
-        if (not CoreOptions.blob_as_descriptor(self.table.options)
-                and CoreOptions.blob_descriptor_fields(self.table.options)):
-            reader = BlobDescriptorConvertReader(reader, self.table)
-
         if self.limit is not None:
             reader = LimitedRecordBatchReader(reader, self.limit)
 
         return reader
 
+    def _create_prescan_reader(self, field_names):
+        """Create a prescan reader by constructing a new DataEvolutionSplitRead
+        instance that only projects the specified field names.
+        
+        Align with Java's configureBlobViewPrescanRead: pass limit to prescan reader
+        to avoid scanning entire split when there's a LIMIT clause.
+        """
+        from pypaimon.read.reader.iface.record_batch_reader import EmptyRecordBatchReader
+
+        prescan_fields = [f for f in self.read_fields if f.name in field_names]
+        if not prescan_fields:
+            return EmptyRecordBatchReader()
+
+        # When there's a normal field predicate, don't push down limit to prescan reader
+        # because the outer reader will apply predicate+limit filtering,
+        # while prescan reader would only apply limit without normal field predicate
+        # TODO support limit+predicate push down
+        prescan_read = DataEvolutionSplitRead(
+            table=self.table,
+            predicate=self.predicate,
+            read_type=prescan_fields,
+            split=self.split,
+            row_tracking_enabled=False,
+            limit=None if self.predicate else self.limit,
+        )
+        prescan_read.row_ranges = self.row_ranges
+        return prescan_read._create_raw_reader()
+
     def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
         """Split files by firstRowId for data evolution."""
 
diff --git a/paimon-python/pypaimon/schema/schema.py b/paimon-python/pypaimon/schema/schema.py
index 9129667326..f3a63c88e1 100644
--- a/paimon-python/pypaimon/schema/schema.py
+++ b/paimon-python/pypaimon/schema/schema.py
@@ -62,40 +62,6 @@ class Schema:
                 if field.name in pk_set:
                     field.type.nullable = False
 
-        # Check if Blob type exists in the schema
-        blob_names = [
-            field.name for field in fields
-            if 'blob' in str(field.type).lower()
-        ]
-
-        if blob_names:
-            if options is None:
-                options = {}
-
-            if len(fields) <= len(blob_names):
-                raise ValueError(
-                    "Table with BLOB type column must have other normal columns."
-                )
-
-            required_options = {
-                CoreOptions.ROW_TRACKING_ENABLED.key(): 'true',
-                CoreOptions.DATA_EVOLUTION_ENABLED.key(): 'true'
-            }
-
-            missing_options = []
-            for key, expected_value in required_options.items():
-                if key not in options or options[key] != expected_value:
-                    missing_options.append(f"{key}='{expected_value}'")
-
-            if missing_options:
-                raise ValueError(
-                    f"Schema contains Blob type but is missing required options: {', '.join(missing_options)}. "
-                    f"Please add these options to the schema."
-                )
-
-            if primary_keys is not None:
-                raise ValueError("Blob type is not supported with primary key.")
-
         # Check if Vector type with dedicated file format
         vector_names = [
             field.name for field in fields
diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py
index 645d2f4328..d01549c71b 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -53,7 +53,7 @@ def _get_rename_mappings(changes: List[SchemaChange]) -> dict:
 
 
 def _handle_update_column_comment(
-        change: UpdateColumnComment, new_fields: List[DataField]
+    change: UpdateColumnComment, new_fields: List[DataField]
 ):
     field_name = change.field_names[-1]
     field_index = _find_field_index(new_fields, field_name)
@@ -66,7 +66,7 @@ def _handle_update_column_comment(
 
 
 def _handle_update_column_nullability(
-        change: UpdateColumnNullability, new_fields: List[DataField]
+    change: UpdateColumnNullability, new_fields: List[DataField]
 ):
     field_name = change.field_names[-1]
     field_index = _find_field_index(new_fields, field_name)
@@ -83,7 +83,7 @@ def _handle_update_column_nullability(
 
 
 def _handle_update_column_type(
-        change: UpdateColumnType, new_fields: List[DataField]
+    change: UpdateColumnType, new_fields: List[DataField]
 ):
     field_name = change.field_names[-1]
     field_index = _find_field_index(new_fields, field_name)
@@ -166,6 +166,71 @@ def _assert_not_renaming_blob_column(
             )
 
 
+def _validate_blob_fields(fields: List[DataField], options: dict, primary_keys: List[str]):
+    """Validate blob field configurations in the schema."""
+    if options is None:
+        options = {}
+
+    blob_field_names = {
+        field.name for field in fields
+        if getattr(field.type, 'type', None) == 'BLOB'
+    }
+
+    if len(fields) <= len(blob_field_names):
+        raise ValueError(
+            "Table with BLOB type column must have other normal columns."
+        )
+
+    core_options = CoreOptions(Options(options))
+
+    configured_blob_fields = core_options.blob_field()
+    for field in configured_blob_fields:
+        if field not in blob_field_names:
+            raise ValueError(
+                "Field '{}' in '{}' must be a BLOB field in table schema.".format(
+                    field, CoreOptions.BLOB_FIELD.key()
+                )
+            )
+
+    descriptor_fields = core_options.blob_descriptor_fields()
+    view_fields = core_options.blob_view_fields()
+
+    all_inline_fields = descriptor_fields.union(view_fields)
+    non_blob_inline_fields = all_inline_fields.difference(blob_field_names)
+    if non_blob_inline_fields:
+        raise ValueError(
+            "Fields in 'blob-descriptor-field' or 'blob-view-field' must be blob fields "
+            "in schema. Non-BLOB fields: {}".format(sorted(non_blob_inline_fields))
+        )
+
+    overlapping_inline_fields = descriptor_fields.intersection(view_fields)
+    if overlapping_inline_fields:
+        raise ValueError(
+            "Fields in 'blob-descriptor-field' and 'blob-view-field' must not overlap. "
+            "Overlapping fields: {}".format(sorted(overlapping_inline_fields))
+        )
+
+    if blob_field_names:
+        required_options = {
+            CoreOptions.ROW_TRACKING_ENABLED.key(): 'true',
+            CoreOptions.DATA_EVOLUTION_ENABLED.key(): 'true'
+        }
+
+        missing_options = []
+        for key, expected_value in required_options.items():
+            if key not in options or options[key] != expected_value:
+                missing_options.append(f"{key}='{expected_value}'")
+
+        if missing_options:
+            raise ValueError(
+                f"Schema contains Blob type but is missing required options: {', '.join(missing_options)}. "
+                f"Please add these options to the schema."
+            )
+
+        if primary_keys:
+            raise ValueError("Blob type is not supported with primary key.")
+
+
 def _validate_blob_external_storage_fields(fields: List[DataField], options: dict):
     """Validate blob-external-storage-field configuration.
 
@@ -364,6 +429,7 @@ class SchemaManager:
                 comment=schema.comment,
             )
 
+            _validate_blob_fields(schema.fields, schema.options, schema.primary_keys)
             _validate_blob_external_storage_fields(schema.fields, schema.options)
             table_schema = TableSchema.from_schema(schema_id=0, schema=schema)
             success = self.commit(table_schema)
@@ -371,6 +437,7 @@ class SchemaManager:
                 return table_schema
 
     def commit(self, new_schema: TableSchema) -> bool:
+        _validate_blob_fields(new_schema.fields, new_schema.options, new_schema.primary_keys)
         schema_path = self._to_schema_path(new_schema.id)
         try:
             result = self.file_io.try_to_write_atomic(schema_path, JSON.to_json(new_schema, indent=2))
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 1fee68615a..9fd61abed4 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -117,9 +117,8 @@ class FileStoreTable(Table):
         """Get the branch manager for this table."""
         # If catalog environment has a catalog loader, use CatalogBranchManager
         catalog_loader = self.catalog_environment.catalog_loader
-        if catalog_loader is not None:
-            from pypaimon.branch.catalog_branch_manager import \
-                CatalogBranchManager
+        if catalog_loader is not None and 
self.catalog_environment.supports_version_management:
+            from pypaimon.branch.catalog_branch_manager import 
CatalogBranchManager
             return CatalogBranchManager(
                 catalog_loader,
                 self.identifier
diff --git a/paimon-python/pypaimon/table/row/blob.py 
b/paimon-python/pypaimon/table/row/blob.py
index 056316d55f..eb2f00b764 100644
--- a/paimon-python/pypaimon/table/row/blob.py
+++ b/paimon-python/pypaimon/table/row/blob.py
@@ -21,6 +21,7 @@ from abc import ABC, abstractmethod
 from typing import BinaryIO, Callable, Optional, Union
 from urllib.parse import urlparse
 
+from pypaimon.common.identifier import Identifier
 from pypaimon.common.uri_reader import UriReader, FileUriReader
 
 
@@ -162,6 +163,115 @@ class BlobDescriptor:
         return self.__str__()
 
 
+class BlobViewStruct:
+    CURRENT_VERSION = 1
+    MAGIC = 0x424C4F4256494557  # "BLOBVIEW"
+
+    def __init__(self, identifier: Union[Identifier, str], field_id: int, 
row_id: int):
+        if isinstance(identifier, str):
+            identifier = Identifier.from_string(identifier)
+        if not isinstance(identifier, Identifier):
+            raise TypeError("BlobViewStruct identifier must be Identifier or 
str.")
+        self._identifier = identifier
+        self._field_id = field_id
+        self._row_id = row_id
+
+    @property
+    def identifier(self) -> Identifier:
+        return self._identifier
+
+    @property
+    def field_id(self) -> int:
+        return self._field_id
+
+    @property
+    def row_id(self) -> int:
+        return self._row_id
+
+    def serialize(self) -> bytes:
+        identifier_bytes = self._identifier.get_full_name().encode('utf-8')
+        data = struct.pack('<B', self.CURRENT_VERSION)
+        data += struct.pack('<Q', self.MAGIC)
+        data += struct.pack('<I', len(identifier_bytes))
+        data += identifier_bytes
+        data += struct.pack('<i', self._field_id)
+        data += struct.pack('<q', self._row_id)
+        return data
+
+    @classmethod
+    def deserialize(cls, data: bytes) -> 'BlobViewStruct':
+        if len(data) < 25:
+            raise ValueError("Invalid BlobViewStruct data: too short")
+
+        offset = 0
+        version = struct.unpack('<B', data[offset:offset + 1])[0]
+        offset += 1
+        if version != cls.CURRENT_VERSION:
+            raise ValueError(
+                f"Expecting BlobViewStruct version to be 
{cls.CURRENT_VERSION}, "
+                f"but found {version}."
+            )
+
+        magic = struct.unpack('<Q', data[offset:offset + 8])[0]
+        offset += 8
+        if magic != cls.MAGIC:
+            raise ValueError(
+                f"Invalid BlobViewStruct: missing magic header. Expected 
magic: "
+                f"{cls.MAGIC}, but found: {magic}"
+            )
+
+        identifier_length = struct.unpack('<I', data[offset:offset + 4])[0]
+        offset += 4
+        if offset + identifier_length + 12 > len(data):
+            raise ValueError("Invalid BlobViewStruct data: identifier length 
exceeds data size")
+
+        identifier = data[offset:offset + identifier_length].decode('utf-8')
+        offset += identifier_length
+        field_id = struct.unpack('<i', data[offset:offset + 4])[0]
+        offset += 4
+        row_id = struct.unpack('<q', data[offset:offset + 8])[0]
+        offset += 8
+        if offset != len(data):
+            raise ValueError("Invalid BlobViewStruct data: trailing bytes")
+
+        return cls(Identifier.from_string(identifier), field_id, row_id)
+
+    @classmethod
+    def is_blob_view_struct(cls, data: bytes) -> bool:
+        if not isinstance(data, (bytes, bytearray)):
+            return False
+        raw = bytes(data)
+        if len(raw) < 9:
+            return False
+        version = raw[0]
+        if version != cls.CURRENT_VERSION:
+            return False
+        try:
+            magic = struct.unpack('<Q', raw[1:9])[0]
+            return magic == cls.MAGIC
+        except Exception:
+            return False
+
+    def __eq__(self, other) -> bool:
+        if not isinstance(other, BlobViewStruct):
+            return False
+        return (self._identifier == other._identifier
+                and self._field_id == other._field_id
+                and self._row_id == other._row_id)
+
+    def __hash__(self) -> int:
+        return hash((self._identifier.get_full_name(), self._field_id, 
self._row_id))
+
+    def __str__(self) -> str:
+        return (
+            f"BlobViewStruct(identifier={self._identifier.get_full_name()}, "
+            f"field_id={self._field_id}, row_id={self._row_id})"
+        )
+
+    def __repr__(self) -> str:
+        return self.__str__()
+
+
 class OffsetInputStream(io.RawIOBase):
 
     def __init__(self, wrapped, offset: int, length: int):
@@ -276,6 +386,10 @@ class Blob(ABC):
     def from_descriptor(uri_reader: UriReader, descriptor: BlobDescriptor) -> 
'Blob':
         return BlobRef(uri_reader, descriptor)
 
+    @staticmethod
+    def from_view(view_struct: BlobViewStruct) -> 'BlobView':
+        return BlobView(view_struct)
+
     @staticmethod
     def from_bytes(data: Optional[bytes], file_io=None, allow_blob_data: bool 
= True) -> Optional['Blob']:
         if data is None:
@@ -283,6 +397,8 @@ class Blob(ABC):
         if not isinstance(data, (bytes, bytearray)):
             raise TypeError(f"Blob.from_bytes expects bytes, got {type(data)}")
         data = bytes(data)
+        if BlobViewStruct.is_blob_view_struct(data):
+            return Blob.from_view(BlobViewStruct.deserialize(data))
         is_descriptor = BlobDescriptor.is_blob_descriptor(data)
         if not allow_blob_data and not is_descriptor:
             raise ValueError(
@@ -385,3 +501,42 @@ class BlobRef(Blob):
 
 
 BlobConsumer = Callable[[str, Optional[BlobDescriptor]], bool]
+
+
+class BlobView(Blob):
+
+    def __init__(self, view_struct: BlobViewStruct):
+        self._view_struct: BlobViewStruct = view_struct
+        self._resolved_blob: Optional[BlobRef] = None
+
+    @property
+    def view_struct(self) -> BlobViewStruct:
+        return self._view_struct
+
+    def is_resolved(self) -> bool:
+        return self._resolved_blob is not None
+
+    def resolve(self, uri_reader: UriReader, descriptor: BlobDescriptor):
+        self._resolved_blob = BlobRef(uri_reader, descriptor)
+
+    def to_data(self) -> bytes:
+        return self._resolved().to_data()
+
+    def to_descriptor(self) -> BlobDescriptor:
+        return self._resolved().to_descriptor()
+
+    def new_input_stream(self) -> BinaryIO:
+        return self._resolved().new_input_stream()
+
+    def _resolved(self) -> BlobRef:
+        if self._resolved_blob is None:
+            raise RuntimeError("BlobView is not resolved.")
+        return self._resolved_blob
+
+    def __eq__(self, other) -> bool:
+        if not isinstance(other, BlobView):
+            return False
+        return self._view_struct == other._view_struct
+
+    def __hash__(self) -> int:
+        return hash(self._view_struct)
diff --git a/paimon-python/pypaimon/table/special_fields.py 
b/paimon-python/pypaimon/table/special_fields.py
index 5c578ec85f..64d2429bef 100644
--- a/paimon-python/pypaimon/table/special_fields.py
+++ b/paimon-python/pypaimon/table/special_fields.py
@@ -81,3 +81,22 @@ class SpecialFields:
             fields_with_row_tracking.append(SpecialFields.SEQUENCE_NUMBER)
 
         return fields_with_row_tracking
+
+    @staticmethod
+    def row_type_with_row_id(table_fields: List[DataField]) -> List[DataField]:
+        """Add ROW_ID field to the given fields list.
+
+        Args:
+            table_fields: The original table fields
+        """
+        fields_with_row_id = list(table_fields)
+
+        for field in fields_with_row_id:
+            if SpecialFields.ROW_ID.name == field.name:
+                raise ValueError(
+                    "Row tracking field name '{}' conflicts with existing 
field names."
+                    .format(field.name)
+                )
+
+        fields_with_row_id.append(SpecialFields.ROW_ID)
+        return fields_with_row_id
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py 
b/paimon-python/pypaimon/tests/blob_table_test.py
index b6a12fe973..3d33594c4a 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -1390,6 +1390,483 @@ class DedicatedFormatWriterTest(unittest.TestCase):
         self.assertEqual(result.column('pic1').to_pylist()[0], pic1_data)
         self.assertEqual(result.column('pic2').to_pylist()[0], pic2_data)
 
+    def test_blob_view_fields_resolve_upstream_blob(self):
+        from pypaimon import Schema
+        from pypaimon.common.options.core_options import CoreOptions
+        from pypaimon.table.row.blob import BlobViewStruct
+
+        source_schema = pa.schema([
+            ('id', pa.int32()),
+            ('picture', pa.large_binary()),
+        ])
+        source = Schema.from_pyarrow_schema(
+            source_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+            }
+        )
+        self.catalog.create_table('test_db.blob_view_source', source, False)
+        source_table = self.catalog.get_table('test_db.blob_view_source')
+        payloads = [b'view-source-0', b'view-source-1']
+
+        write_builder = source_table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(pa.Table.from_pydict({
+            'id': [1, 2],
+            'picture': payloads,
+        }, schema=source_schema))
+        commit_messages = writer.prepare_commit()
+        write_builder.new_commit().commit(commit_messages)
+        writer.close()
+
+        picture_field_id = next(
+            field.id for field in source_table.table_schema.fields if 
field.name == 'picture'
+        )
+        view_values = [
+            BlobViewStruct('test_db.blob_view_source', picture_field_id, 
0).serialize(),
+            BlobViewStruct('test_db.blob_view_source', picture_field_id, 
1).serialize(),
+        ]
+
+        target_schema = pa.schema([
+            ('id', pa.int32()),
+            ('picture', pa.large_binary()),
+        ])
+        target = Schema.from_pyarrow_schema(
+            target_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'blob-view-field': 'picture',
+            }
+        )
+        self.catalog.create_table('test_db.blob_view_target', target, False)
+        target_table = self.catalog.get_table('test_db.blob_view_target')
+
+        target_write_builder = target_table.new_batch_write_builder()
+        target_writer = target_write_builder.new_write()
+        target_writer.write_arrow(pa.Table.from_pydict({
+            'id': [10, 11],
+            'picture': view_values,
+        }, schema=target_schema))
+        target_commit_messages = target_writer.prepare_commit()
+        target_write_builder.new_commit().commit(target_commit_messages)
+        target_writer.close()
+
+        all_target_files = [f for msg in target_commit_messages for f in 
msg.new_files]
+        self.assertFalse(
+            any(f.file_name.endswith('.blob') for f in all_target_files),
+            "Blob view fields should be stored inline without writing new blob 
files",
+        )
+
+        result = target_table.new_read_builder().new_read().to_arrow(
+            target_table.new_read_builder().new_scan().plan().splits()
+        ).sort_by('id')
+        self.assertEqual(result.column('picture').to_pylist(), payloads)
+
+        descriptor_table = 
target_table.copy({CoreOptions.BLOB_AS_DESCRIPTOR.key(): 'true'})
+        descriptor_result = 
descriptor_table.new_read_builder().new_read().to_arrow(
+            descriptor_table.new_read_builder().new_scan().plan().splits()
+        ).sort_by('id')
+        # With blob-as-descriptor=true, view fields return BlobDescriptor bytes
+        from pypaimon.table.row.blob import BlobDescriptor
+        for value in descriptor_result.column('picture').to_pylist():
+            self.assertTrue(
+                BlobDescriptor.is_blob_descriptor(value),
+                "Expected BlobDescriptor bytes when blob-as-descriptor=true"
+            )
+
+    def test_blob_view_resolve_disabled_preserves_references(self):
+        from pypaimon import Schema
+        from pypaimon.common.options.core_options import CoreOptions
+        from pypaimon.table.row.blob import BlobViewStruct
+
+        source_schema = pa.schema([
+            ('id', pa.int32()),
+            ('picture', pa.large_binary()),
+        ])
+        source = Schema.from_pyarrow_schema(
+            source_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+            }
+        )
+        self.catalog.create_table('test_db.blob_view_resolve_source', source, 
False)
+        source_table = 
self.catalog.get_table('test_db.blob_view_resolve_source')
+        payloads = [b'resolve-source-0', b'resolve-source-1']
+
+        write_builder = source_table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(pa.Table.from_pydict({
+            'id': [1, 2],
+            'picture': payloads,
+        }, schema=source_schema))
+        commit_messages = writer.prepare_commit()
+        write_builder.new_commit().commit(commit_messages)
+        writer.close()
+
+        picture_field_id = next(
+            field.id for field in source_table.table_schema.fields if 
field.name == 'picture'
+        )
+        view_values = [
+            BlobViewStruct('test_db.blob_view_resolve_source', 
picture_field_id, 0).serialize(),
+            BlobViewStruct('test_db.blob_view_resolve_source', 
picture_field_id, 1).serialize(),
+        ]
+
+        target_schema = pa.schema([
+            ('id', pa.int32()),
+            ('picture', pa.large_binary()),
+        ])
+        target = Schema.from_pyarrow_schema(
+            target_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'blob-view-field': 'picture',
+            }
+        )
+        self.catalog.create_table('test_db.blob_view_resolve_target', target, 
False)
+        target_table = 
self.catalog.get_table('test_db.blob_view_resolve_target')
+
+        target_write_builder = target_table.new_batch_write_builder()
+        target_writer = target_write_builder.new_write()
+        target_writer.write_arrow(pa.Table.from_pydict({
+            'id': [10, 11],
+            'picture': view_values,
+        }, schema=target_schema))
+        target_commit_messages = target_writer.prepare_commit()
+        target_write_builder.new_commit().commit(target_commit_messages)
+        target_writer.close()
+
+        # Default (resolve enabled): view fields are resolved to real blob 
data.
+        resolved_result = target_table.new_read_builder().new_read().to_arrow(
+            target_table.new_read_builder().new_scan().plan().splits()
+        ).sort_by('id')
+        self.assertEqual(resolved_result.column('picture').to_pylist(), 
payloads)
+
+        # resolve disabled: view fields keep the original BlobViewStruct bytes.
+        preserve_table = target_table.copy(
+            {CoreOptions.BLOB_VIEW_RESOLVE_ENABLED.key(): 'false'}
+        )
+        preserve_result = 
preserve_table.new_read_builder().new_read().to_arrow(
+            preserve_table.new_read_builder().new_scan().plan().splits()
+        ).sort_by('id')
+        preserved_values = preserve_result.column('picture').to_pylist()
+        self.assertEqual(preserved_values, view_values)
+        for value in preserved_values:
+            self.assertTrue(
+                BlobViewStruct.is_blob_view_struct(value),
+                "Expected original BlobViewStruct bytes when resolve disabled"
+            )
+
+    def test_blob_view_resolves_null_upstream_value(self):
+        from pypaimon import Schema
+        from pypaimon.table.row.blob import BlobViewStruct
+
+        source_schema = pa.schema([
+            ('id', pa.int32()),
+            ('picture', pa.large_binary()),
+        ])
+        source = Schema.from_pyarrow_schema(
+            source_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+            }
+        )
+        self.catalog.create_table('test_db.blob_view_null_source', source, 
False)
+        source_table = self.catalog.get_table('test_db.blob_view_null_source')
+        # Row 0 has a real blob value, row 1 has a null blob value.
+        payloads = [b'null-source-0', None]
+
+        write_builder = source_table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(pa.Table.from_pydict({
+            'id': [1, 2],
+            'picture': payloads,
+        }, schema=source_schema))
+        commit_messages = writer.prepare_commit()
+        write_builder.new_commit().commit(commit_messages)
+        writer.close()
+
+        picture_field_id = next(
+            field.id for field in source_table.table_schema.fields if 
field.name == 'picture'
+        )
+        view_values = [
+            BlobViewStruct('test_db.blob_view_null_source', picture_field_id, 
0).serialize(),
+            BlobViewStruct('test_db.blob_view_null_source', picture_field_id, 
1).serialize(),
+        ]
+
+        target_schema = pa.schema([
+            ('id', pa.int32()),
+            ('picture', pa.large_binary()),
+        ])
+        target = Schema.from_pyarrow_schema(
+            target_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'blob-view-field': 'picture',
+            }
+        )
+        self.catalog.create_table('test_db.blob_view_null_target', target, 
False)
+        target_table = self.catalog.get_table('test_db.blob_view_null_target')
+
+        target_write_builder = target_table.new_batch_write_builder()
+        target_writer = target_write_builder.new_write()
+        target_writer.write_arrow(pa.Table.from_pydict({
+            'id': [10, 11],
+            'picture': view_values,
+        }, schema=target_schema))
+        target_commit_messages = target_writer.prepare_commit()
+        target_write_builder.new_commit().commit(target_commit_messages)
+        target_writer.close()
+
+        # View referencing a real upstream value resolves to data; view
+        # referencing a null upstream value resolves to None (not an error).
+        result = target_table.new_read_builder().new_read().to_arrow(
+            target_table.new_read_builder().new_scan().plan().splits()
+        ).sort_by('id')
+        self.assertEqual(result.column('picture').to_pylist(), 
[b'null-source-0', None])
+
+    def test_blob_view_fields_rejects_non_view_input(self):
+        from pypaimon import Schema
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('picture', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'blob-view-field': 'picture',
+            }
+        )
+        self.catalog.create_table('test_db.blob_view_reject_test', schema, 
False)
+        table = self.catalog.get_table('test_db.blob_view_reject_test')
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        bad_data = pa.Table.from_pydict({
+            'id': [1],
+            'picture': [b'not-a-view-struct'],
+        }, schema=pa_schema)
+
+        with self.assertRaises(ValueError) as context:
+            writer.write_arrow(bad_data)
+        self.assertIn("blob-view-field", str(context.exception))
+
+    def test_blob_inline_fields_reject_overlap_and_unknown_fields(self):
+        from pypaimon import Schema
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('picture', pa.large_binary()),
+        ])
+        base_options = {
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        }
+
+        overlap_options = dict(base_options)
+        overlap_options.update({
+            'blob-descriptor-field': 'picture',
+            'blob-view-field': 'picture',
+        })
+        overlap_schema = Schema.from_pyarrow_schema(pa_schema, 
options=overlap_options)
+        with self.assertRaises(ValueError) as overlap_context:
+            self.catalog.create_table(
+                'test_db.blob_overlap_reject', overlap_schema, False)
+        self.assertIn("must not overlap", str(overlap_context.exception))
+
+        unknown_options = dict(base_options)
+        unknown_options.update({'blob-view-field': 'missing_picture'})
+        unknown_schema = Schema.from_pyarrow_schema(pa_schema, 
options=unknown_options)
+        with self.assertRaises(ValueError) as unknown_context:
+            self.catalog.create_table(
+                'test_db.blob_unknown_reject', unknown_schema, False)
+        self.assertIn("must be blob fields", str(unknown_context.exception))
+
+    def test_blob_view_prescan_with_limit(self):
+        """Test that limit is correctly pushed down to prescan reader.
+        
+        Regression test for: prescan should only scan up to limit rows,
+        not the entire split.
+        """
+        from pypaimon import Schema
+        from pypaimon.table.row.blob import BlobViewStruct
+
+        # Create source table with multiple rows
+        source_schema = pa.schema([
+            ('id', pa.int32()),
+            ('picture', pa.large_binary()),
+        ])
+        source = Schema.from_pyarrow_schema(
+            source_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+            }
+        )
+        self.catalog.create_table('test_db.blob_view_limit_source', source, 
False)
+        source_table = self.catalog.get_table('test_db.blob_view_limit_source')
+
+        # Write 10 rows
+        num_rows = 10
+        payloads = [f'payload-{i}'.encode() for i in range(num_rows)]
+        write_builder = source_table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(pa.Table.from_pydict({
+            'id': list(range(num_rows)),
+            'picture': payloads,
+        }, schema=source_schema))
+        commit_messages = writer.prepare_commit()
+        write_builder.new_commit().commit(commit_messages)
+        writer.close()
+
+        picture_field_id = next(
+            field.id for field in source_table.table_schema.fields if 
field.name == 'picture'
+        )
+        view_values = [
+            BlobViewStruct('test_db.blob_view_limit_source', picture_field_id, 
i).serialize()
+            for i in range(num_rows)
+        ]
+
+        # Create target table with blob-view-field
+        target_schema = pa.schema([
+            ('id', pa.int32()),
+            ('picture', pa.large_binary()),
+        ])
+        target = Schema.from_pyarrow_schema(
+            target_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'blob-view-field': 'picture',
+            }
+        )
+        self.catalog.create_table('test_db.blob_view_limit_target', target, 
False)
+        target_table = self.catalog.get_table('test_db.blob_view_limit_target')
+
+        target_write_builder = target_table.new_batch_write_builder()
+        target_writer = target_write_builder.new_write()
+        target_writer.write_arrow(pa.Table.from_pydict({
+            'id': list(range(num_rows)),
+            'picture': view_values,
+        }, schema=target_schema))
+        target_commit_messages = target_writer.prepare_commit()
+        target_write_builder.new_commit().commit(target_commit_messages)
+        target_writer.close()
+
+        # Test with limit: should only return first 3 rows
+        read_builder = target_table.new_read_builder()
+        read_builder.with_limit(3)
+        result = read_builder.new_read().to_arrow(
+            read_builder.new_scan().plan().splits()
+        )
+        self.assertEqual(result.num_rows, 3, "LIMIT should be respected in 
blob view prescan")
+        self.assertEqual(result.column('id').to_pylist(), [0, 1, 2])
+
+    def test_blob_view_prescan_only_collects_limited_view_structs(self):
+        """Verify that the prescan stage only collects as many BlobViewStructs 
as
+        the limit allows, instead of scanning the entire split.
+
+        Unlike test_blob_view_prescan_with_limit (which only checks the final
+        output), this test patches BlobViewLookup.preload to capture the exact
+        list of view structs collected during prescan and asserts its length
+        equals the limit.
+        """
+        from unittest import mock
+
+        from pypaimon import Schema
+        from pypaimon.table.row.blob import BlobViewStruct
+        from pypaimon.utils.blob_view_lookup import BlobViewLookup
+
+        source_schema = pa.schema([
+            ('id', pa.int32()),
+            ('picture', pa.large_binary()),
+        ])
+        source = Schema.from_pyarrow_schema(
+            source_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+            }
+        )
+        self.catalog.create_table('test_db.blob_view_prescan_count_source', 
source, False)
+        source_table = 
self.catalog.get_table('test_db.blob_view_prescan_count_source')
+
+        num_rows = 10
+        payloads = [f'payload-{i}'.encode() for i in range(num_rows)]
+        write_builder = source_table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(pa.Table.from_pydict({
+            'id': list(range(num_rows)),
+            'picture': payloads,
+        }, schema=source_schema))
+        commit_messages = writer.prepare_commit()
+        write_builder.new_commit().commit(commit_messages)
+        writer.close()
+
+        picture_field_id = next(
+            field.id for field in source_table.table_schema.fields if 
field.name == 'picture'
+        )
+        view_values = [
+            BlobViewStruct('test_db.blob_view_prescan_count_source', 
picture_field_id, i).serialize()
+            for i in range(num_rows)
+        ]
+
+        target_schema = pa.schema([
+            ('id', pa.int32()),
+            ('picture', pa.large_binary()),
+        ])
+        target = Schema.from_pyarrow_schema(
+            target_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'blob-view-field': 'picture',
+            }
+        )
+        self.catalog.create_table('test_db.blob_view_prescan_count_target', 
target, False)
+        target_table = 
self.catalog.get_table('test_db.blob_view_prescan_count_target')
+
+        target_write_builder = target_table.new_batch_write_builder()
+        target_writer = target_write_builder.new_write()
+        target_writer.write_arrow(pa.Table.from_pydict({
+            'id': list(range(num_rows)),
+            'picture': view_values,
+        }, schema=target_schema))
+        target_commit_messages = target_writer.prepare_commit()
+        target_write_builder.new_commit().commit(target_commit_messages)
+        target_writer.close()
+
+        captured_view_structs = []
+        original_preload = BlobViewLookup.preload
+
+        def capturing_preload(lookup_self, view_structs):
+            captured_view_structs.append(list(view_structs))
+            return original_preload(lookup_self, view_structs)
+
+        limit = 3
+        read_builder = target_table.new_read_builder()
+        read_builder.with_limit(limit)
+        with mock.patch.object(BlobViewLookup, 'preload', autospec=True,
+                               side_effect=capturing_preload):
+            result = read_builder.new_read().to_arrow(
+                read_builder.new_scan().plan().splits()
+            )
+
+        self.assertEqual(result.num_rows, limit)
+        self.assertEqual(len(captured_view_structs), 1,
+                         "preload should be invoked exactly once during 
prescan")
+        self.assertEqual(
+            len(captured_view_structs[0]), limit,
+            "prescan should only collect as many view structs as the limit 
allows")
+
     def test_to_arrow_batch_reader(self):
         import random
         from pypaimon import Schema
@@ -3204,7 +3681,7 @@ class DedicatedFormatWriterTest(unittest.TestCase):
         total_split_row_count = sum([s.row_count for s in splits])
         self.assertEqual(total_split_row_count, num_rows * 2,
                          f"Total split row count should be {num_rows}, got 
{total_split_row_count}")
-        
+
         total_merged_count = 0
         for split in splits:
             merged_count = split.merged_row_count()
@@ -3213,7 +3690,7 @@ class DedicatedFormatWriterTest(unittest.TestCase):
                 self.assertLessEqual(
                     merged_count, split.row_count,
                     f"merged_row_count ({merged_count}) should be <= row_count 
({split.row_count})")
-        
+
         if total_merged_count > 0:
             self.assertEqual(
                 total_merged_count, num_rows,
@@ -3312,6 +3789,24 @@ class DedicatedFormatWriterTest(unittest.TestCase):
             )
         self.assertIn('Cannot rename BLOB column', str(ctx.exception))
 
+    def test_nested_field_named_blob_not_treated_as_blob(self):
+        """Regression: a ROW field with a nested column whose name contains
+        'blob' must NOT be treated as a top-level BLOB column.  Previously
+        the substring match would falsely classify such fields, causing
+        create_table to require row-tracking and data-evolution options."""
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('payload', pa.struct([
+                ('blob_name', pa.string()),
+                ('value', pa.int64()),
+            ])),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema)
+        self.catalog.create_table(
+            'test_db.nested_blob_name_no_error', schema, False)
+        table = self.catalog.get_table('test_db.nested_blob_name_no_error')
+        self.assertIsNotNone(table)
+
 
 class GetBlobTest(unittest.TestCase):
 
diff --git a/paimon-python/pypaimon/tests/blob_test.py 
b/paimon-python/pypaimon/tests/blob_test.py
index b91ffdaf43..37217f8b7c 100644
--- a/paimon-python/pypaimon/tests/blob_test.py
+++ b/paimon-python/pypaimon/tests/blob_test.py
@@ -31,7 +31,7 @@ from pypaimon.filesystem.local_file_io import LocalFileIO
 from pypaimon.common.options import Options
 from pypaimon.read.reader.format_blob_reader import BlobRecordIterator, 
FormatBlobReader
 from pypaimon.schema.data_types import AtomicType, DataField
-from pypaimon.table.row.blob import Blob, BlobData, BlobRef, BlobDescriptor
+from pypaimon.table.row.blob import Blob, BlobData, BlobRef, BlobDescriptor, 
BlobViewStruct, BlobView
 from pypaimon.table.row.generic_row import GenericRowDeserializer, 
GenericRowSerializer, GenericRow
 from pypaimon.table.row.row_kind import RowKind
 
@@ -166,6 +166,25 @@ class BlobTest(unittest.TestCase):
         with self.assertRaises(TypeError):
             Blob.from_bytes(12345)
 
+    def test_blob_view_struct_roundtrip(self):
+        """Test BlobViewStruct serialization compatibility."""
+        view_struct = BlobViewStruct("test_db.source_table", 7, 42)
+        serialized = view_struct.serialize()
+
+        self.assertTrue(BlobViewStruct.is_blob_view_struct(serialized))
+        self.assertFalse(BlobDescriptor.is_blob_descriptor(serialized))
+
+        restored = BlobViewStruct.deserialize(serialized)
+        self.assertEqual(restored, view_struct)
+        self.assertEqual(restored.identifier.get_full_name(), 
"test_db.source_table")
+        self.assertEqual(restored.field_id, 7)
+        self.assertEqual(restored.row_id, 42)
+
+        blob = Blob.from_bytes(view_struct.serialize())
+        self.assertIsInstance(blob, BlobView)
+        self.assertFalse(blob.is_resolved())
+        self.assertEqual(blob.view_struct, view_struct)
+
     def test_blob_data_interface_compliance(self):
         """Test that BlobData properly implements Blob interface."""
         test_data = b"interface test data"
diff --git a/paimon-python/pypaimon/tests/external_storage_blob_test.py 
b/paimon-python/pypaimon/tests/external_storage_blob_test.py
index 505e4407a9..e7ad8273d9 100644
--- a/paimon-python/pypaimon/tests/external_storage_blob_test.py
+++ b/paimon-python/pypaimon/tests/external_storage_blob_test.py
@@ -94,7 +94,7 @@ class ExternalStorageBlobValidationTest(unittest.TestCase):
         })
         with self.assertRaises(ValueError) as ctx:
             self.catalog.create_table('test_db.not_blob_type_test', schema, 
False)
-        self.assertIn('must be a BLOB type field', str(ctx.exception))
+        self.assertIn('must be blob fields', str(ctx.exception))
 
     def test_validation_blob_not_null_field_passes(self):
         """BLOB NOT NULL fields should pass validation (not be rejected by str 
comparison)."""
diff --git a/paimon-python/pypaimon/utils/blob_view_lookup.py 
b/paimon-python/pypaimon/utils/blob_view_lookup.py
new file mode 100644
index 0000000000..1acff6bcc8
--- /dev/null
+++ b/paimon-python/pypaimon/utils/blob_view_lookup.py
@@ -0,0 +1,274 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from typing import Dict, List, Tuple, Set
+
+from pypaimon.common.identifier import Identifier
+from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.table.row.blob import BlobDescriptor, BlobViewStruct
+from pypaimon.table.special_fields import SpecialFields
+from pypaimon.utils.range import Range
+
+_PRELOAD_THREAD_NUM = 100
+_MIN_ROWS_PER_TASK = 100
+
+
+class TableReferences:
+    """Groups BlobViewStruct references by upstream table."""
+
+    def __init__(self, identifier: Identifier):
+        self.identifier: Identifier = identifier
+        self.references_by_field: Dict[int, List[BlobViewStruct]] = {}
+        self.row_ids: List[int] = []
+
+    def add(self, view_struct: BlobViewStruct) -> None:
+        self.references_by_field.setdefault(view_struct.field_id, 
[]).append(view_struct)
+        self.row_ids.append(int(view_struct.row_id))
+
+
+class TableReadPlan:
+    """A plan for reading blob descriptors from one upstream table."""
+
+    def __init__(self, identifier: Identifier, upstream_table,
+                 read_fields: List, row_ranges: List[Range]):
+        self.identifier: Identifier = identifier
+        self.upstream_table = upstream_table
+        self.read_fields: List = read_fields
+        self.row_ranges: List[Range] = row_ranges
+
+
+class BlobViewLookup:
+    """Resolve BlobViewStruct references by reading upstream blob 
descriptors."""
+
+    def __init__(self, table):
+        self._table = table
+        self._descriptor_cache: Dict[BlobViewStruct, BlobDescriptor] = {}
+        self._null_value_cache: Set[BlobViewStruct] = set()
+
+    def preload(self, view_structs: List[BlobViewStruct]):
+        if not view_structs:
+            return
+
+        grouped: Dict[str, TableReferences] = 
self._group_by_table(view_structs)
+        plans: List[TableReadPlan] = []
+        for table_refs in grouped.values():
+            plans.append(self._create_table_read_plan(table_refs))
+
+        target_rows: int = self._target_rows_per_task(plans)
+        tasks: List[Tuple[TableReadPlan, List[Range]]] = []
+        for plan in plans:
+            for range_chunk in self._split_row_ranges(plan.row_ranges, 
target_rows):
+                tasks.append((plan, range_chunk))
+
+        if len(tasks) <= 1:
+            for plan, range_chunk in tasks:
+                descriptors, null_values = self._load_descriptor_chunk(plan, 
range_chunk)
+                self._descriptor_cache.update(descriptors)
+                self._null_value_cache.update(null_values)
+            return
+
+        with ThreadPoolExecutor(max_workers=min(_PRELOAD_THREAD_NUM, 
len(tasks))) as executor:
+            futures = {
+                executor.submit(self._load_descriptor_chunk, plan, 
range_chunk): (plan, range_chunk)
+                for plan, range_chunk in tasks
+            }
+            for future in as_completed(futures):
+                try:
+                    descriptors, null_values = future.result()
+                    self._descriptor_cache.update(descriptors)
+                    self._null_value_cache.update(null_values)
+                except Exception as exc:
+                    # Cancel remaining futures that have not started yet so a 
single
+                    # failure can abort the rest of the preload work as early 
as possible.
+                    for pending_future in futures:
+                        pending_future.cancel()
+                    raise RuntimeError("Failed to preload blob descriptors.") 
from exc
+
+    def resolve_descriptor(self, view_struct: BlobViewStruct) -> 
BlobDescriptor:
+        descriptor: BlobDescriptor = self._descriptor_cache.get(view_struct)
+        if descriptor is None:
+            if view_struct in self._null_value_cache:
+                raise ValueError(
+                    "BlobViewStruct {} resolves to a null blob 
value.".format(view_struct)
+                )
+            raise ValueError(
+                "Cannot resolve BlobViewStruct {} because row id {} was not 
found "
+                "in upstream table.".format(view_struct, view_struct.row_id)
+            )
+        return descriptor
+
+    def resolve_to_null(self, view_struct: BlobViewStruct) -> bool:
+        if view_struct in self._null_value_cache:
+            return True
+        if view_struct not in self._descriptor_cache:
+            raise ValueError(
+                "Cannot resolve BlobViewStruct {} because row id {} was not 
found "
+                "in upstream table.".format(view_struct, view_struct.row_id)
+            )
+        return False
+
+    def _group_by_table(
+            self, view_structs: List[BlobViewStruct]
+    ) -> Dict[str, TableReferences]:
+        grouped: Dict[str, TableReferences] = {}
+        for view_struct in view_structs:
+            key = view_struct.identifier.get_full_name()
+            if key not in grouped:
+                grouped[key] = TableReferences(view_struct.identifier)
+            grouped[key].add(view_struct)
+        return grouped
+
+    def _create_table_read_plan(self, table_refs: TableReferences) -> 
TableReadPlan:
+        upstream_table = self._load_table(table_refs.identifier)
+
+        fields: List = []
+        for field_id in table_refs.references_by_field:
+            fields.append(self._field_by_id(upstream_table, field_id))
+
+        read_fields = SpecialFields.row_type_with_row_id(fields)
+        return TableReadPlan(
+            table_refs.identifier, upstream_table, read_fields,
+            Range.to_ranges(table_refs.row_ids))
+
+    def _load_descriptor_chunk(
+        self, plan: TableReadPlan, row_ranges: List[Range]
+    ) -> Tuple[Dict[BlobViewStruct, BlobDescriptor], set]:
+        identifier: Identifier = plan.identifier
+        upstream_table = plan.upstream_table
+        read_fields = plan.read_fields
+
+        projection_field_names: List[str] = [f.name for f in read_fields]
+
+        descriptor_table = 
upstream_table.copy({CoreOptions.BLOB_AS_DESCRIPTOR.key(): "true"})
+        read_builder = 
descriptor_table.new_read_builder().with_projection(projection_field_names)
+
+        if SpecialFields.ROW_ID.name not in [
+            data_field.name for data_field in read_builder.read_type()
+        ]:
+            raise ValueError(
+                "Cannot resolve blob view for table {} because row tracking is 
not readable."
+                .format(identifier.get_full_name())
+            )
+
+        predicate_builder = read_builder.new_predicate_builder()
+        range_predicates: List = []
+        for r in row_ranges:
+            if r.from_ == r.to:
+                range_predicates.append(
+                    predicate_builder.equal(SpecialFields.ROW_ID.name, 
r.from_))
+            else:
+                range_predicates.append(
+                    predicate_builder.between(SpecialFields.ROW_ID.name, 
r.from_, r.to))
+        if len(range_predicates) == 1:
+            predicate = range_predicates[0]
+        else:
+            predicate = predicate_builder.or_predicates(range_predicates)
+        read_builder.with_filter(predicate)
+        result = 
read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())
+
+        if SpecialFields.ROW_ID.name not in result.schema.names:
+            raise ValueError(
+                "Cannot resolve blob view for table {} because row tracking is 
not readable."
+                .format(identifier.get_full_name())
+            )
+
+        row_id_values: List = 
result.column(SpecialFields.ROW_ID.name).to_pylist()
+        resolved: Dict[BlobViewStruct, BlobDescriptor] = {}
+        null_values: set = set()
+        for field in read_fields:
+            if field.name == SpecialFields.ROW_ID.name:
+                continue
+            if field.name not in result.schema.names:
+                continue
+            values = result.column(field.name).to_pylist()
+            for row_id, value in zip(row_id_values, values):
+                view_struct = BlobViewStruct(
+                    identifier.get_full_name(), field.id, int(row_id))
+                if value is None:
+                    null_values.add(view_struct)
+                    continue
+                descriptor = BlobDescriptor.deserialize(value)
+                resolved[view_struct] = descriptor
+        return resolved, null_values
+
+    @staticmethod
+    def _split_row_ranges(
+        row_ranges: List[Range], target_rows_per_task: int
+    ) -> List[List[Range]]:
+        """
+        Split row ranges into multiple chunks for parallel task processing.
+        """
+        if not row_ranges:
+            return []
+
+        chunks: List[List[Range]] = []
+        current_chunk: List[Range] = []
+        current_chunk_rows: int = 0
+
+        for r in row_ranges:
+            next_from = r.from_
+            # Process current range until all rows are allocated
+            while next_from <= r.to:
+                # If current chunk is full, save it and start a new one
+                if current_chunk_rows == target_rows_per_task:
+                    chunks.append(current_chunk)
+                    current_chunk = []
+                    current_chunk_rows = 0
+
+                # Calculate remaining capacity in current chunk
+                remaining = target_rows_per_task - current_chunk_rows
+                # Determine the end position for this allocation (don't exceed 
range boundary)
+                next_to = min(r.to, next_from + remaining - 1)
+
+                # Add the allocated range to current chunk
+                current_chunk.append(Range(next_from, next_to))
+                current_chunk_rows += next_to - next_from + 1
+
+                # Move to next unallocated position
+                next_from = next_to + 1
+
+        # Don't forget the last chunk if it has any ranges
+        if current_chunk:
+            chunks.append(current_chunk)
+
+        return chunks
+
+    @staticmethod
+    def _target_rows_per_task(plans: List[TableReadPlan]) -> int:
+        total_rows: int = 0
+        for plan in plans:
+            for r in plan.row_ranges:
+                total_rows += r.count()
+        if total_rows <= 0:
+            return _MIN_ROWS_PER_TASK
+
+        return max(_MIN_ROWS_PER_TASK, (total_rows + _PRELOAD_THREAD_NUM - 1) 
// _PRELOAD_THREAD_NUM)
+
+    def _load_table(self, identifier: Identifier):
+        catalog = self._table.catalog_environment.catalog_loader.load()
+        return catalog.get_table(identifier)
+
+    @staticmethod
+    def _field_by_id(table, field_id: int) -> 'DataField':
+        for field in table.table_schema.fields:
+            if field.id == field_id:
+                return field
+        raise ValueError(
+            "Cannot find blob fieldId {} in upstream table {}."
+            .format(field_id, table.identifier.get_full_name())
+        )
diff --git a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py 
b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
index 2fd0ec878e..01216b36cd 100644
--- a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
+++ b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
@@ -57,6 +57,8 @@ class DedicatedFormatWriter(DataWriter):
         # Determine blob columns from table schema
         self.blob_column_names = self._get_blob_columns_from_schema()
         self.blob_descriptor_fields = 
CoreOptions.blob_descriptor_fields(self.options)
+        self.blob_view_fields = CoreOptions.blob_view_fields(self.options)
+        self.blob_inline_fields = 
self.blob_descriptor_fields.union(self.blob_view_fields)
 
         unknown_descriptor_fields = self.blob_descriptor_fields.difference(
             set(self.blob_column_names)
@@ -68,10 +70,10 @@ class DedicatedFormatWriter(DataWriter):
             )
 
         # Blob fields that should still be written to `.blob` files.
-        full_blob_file_column_names = [
-            col for col in self.blob_column_names if col not in 
self.blob_descriptor_fields
+        self.blob_file_column_names = [
+            col for col in self.blob_column_names if col not in 
self.blob_inline_fields
         ]
-        full_blob_file_set = set(full_blob_file_column_names)
+        full_blob_file_set = set(self.blob_file_column_names)
         all_column_names = self.table.field_names
 
         # Detect vector columns that should be written to dedicated files.
@@ -87,7 +89,7 @@ class DedicatedFormatWriter(DataWriter):
         if write_cols is not None:
             write_col_set = set(write_cols)
             self.blob_file_column_names = [
-                col for col in full_blob_file_column_names if col in 
write_col_set
+                col for col in self.blob_file_column_names if col in 
write_col_set
             ]
             self.vector_write_columns = [
                 col for col in full_vector_column_names if col in write_col_set
@@ -96,7 +98,6 @@ class DedicatedFormatWriter(DataWriter):
                 col for col in write_cols if col not in dedicated_set
             ]
         else:
-            self.blob_file_column_names = list(full_blob_file_column_names)
             self.vector_write_columns = list(full_vector_column_names) if 
has_dedicated_vector else []
             self.normal_column_names = [
                 col for col in all_column_names if col not in dedicated_set
@@ -159,12 +160,13 @@ class DedicatedFormatWriter(DataWriter):
 
         logger.info(
             "Initialized DedicatedFormatWriter with blob columns: %s, blob 
file columns: %s, "
-            "vector columns: %s, descriptor stored columns: %s, external 
storage fields: %s",
+            "vector columns: %s, descriptor stored columns: %s, external 
storage fields: %s, view stored columns: %s",
             self.blob_column_names,
             self.blob_file_column_names,
             self.vector_write_columns,
             sorted(self.blob_descriptor_fields),
             sorted(external_storage_fields) if external_storage_fields else [],
+            sorted(self.blob_view_fields)
         )
 
     def _get_blob_columns_from_schema(self) -> List[str]:
@@ -200,7 +202,7 @@ class DedicatedFormatWriter(DataWriter):
 
             # Split data into normal, blob, and vector parts
             normal_data, blob_data_map, vector_data = self._split_data(data)
-            self._validate_descriptor_stored_fields_input(data)
+            self._validate_inline_stored_fields_input(data)
 
             # Process and accumulate normal data (may be None for partial 
writes)
             processed_normal = self._process_normal_data(normal_data)
@@ -278,11 +280,11 @@ class DedicatedFormatWriter(DataWriter):
         )
         return normal_data, blob_data_map, vector_data
 
-    def _validate_descriptor_stored_fields_input(self, data: pa.RecordBatch):
-        if not self.blob_descriptor_fields:
+    def _validate_inline_stored_fields_input(self, data: pa.RecordBatch):
+        if not self.blob_inline_fields:
             return
 
-        from pypaimon.table.row.blob import BlobDescriptor
+        from pypaimon.table.row.blob import BlobDescriptor, BlobViewStruct
 
         for field_name in self.blob_descriptor_fields:
             if field_name not in data.schema.names:
@@ -311,6 +313,33 @@ class DedicatedFormatWriter(DataWriter):
                         "BlobDescriptor."
                     ) from e
 
+        for field_name in self.blob_view_fields:
+            if field_name not in data.schema.names:
+                continue
+            values = 
data.column(data.schema.get_field_index(field_name)).to_pylist()
+            for value in values:
+                if value is None:
+                    continue
+                if hasattr(value, 'as_py'):
+                    value = value.as_py()
+                if isinstance(value, str):
+                    value = value.encode('utf-8')
+                if not isinstance(value, (bytes, bytearray)):
+                    raise ValueError(
+                        "blob-view-field requires blob field value to be a 
serialized "
+                        "BlobViewStruct."
+                    )
+                try:
+                    view_bytes = bytes(value)
+                    view_struct = BlobViewStruct.deserialize(view_bytes)
+                    if view_struct.serialize() != view_bytes:
+                        raise ValueError("BlobViewStruct payload contains 
trailing bytes.")
+                except Exception as e:
+                    raise ValueError(
+                        "blob-view-field requires blob field value to be a 
serialized "
+                        "BlobViewStruct."
+                    ) from e
+
     @staticmethod
     def _process_normal_data(data: pa.RecordBatch) -> Optional[pa.Table]:
         """Process normal data (similar to base DataWriter)."""

Reply via email to