This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 40eadc2f01 [python] Support BlobView feature (#8021)
40eadc2f01 is described below
commit 40eadc2f0133d2859f85cc6b1e0de35433234846
Author: umi <[email protected]>
AuthorDate: Thu Jun 4 13:52:19 2026 +0800
[python] Support BlobView feature (#8021)
- Add Python BlobViewStruct / BlobView wire-format support and the
blob-view-field option.
- Store descriptor/view BLOB fields inline, validate bad inline field
configuration and payloads, and avoid writing new .blob files for view
fields.
- Resolve blob-view fields during reads through catalog-aware lookup,
returning bytes by default or upstream BlobDescriptor bytes when
blob-as-descriptor=true.
---
.../pypaimon/catalog/filesystem_catalog.py | 6 +-
.../pypaimon/common/options/core_options.py | 26 ++
.../read/reader/blob_descriptor_convert_reader.py | 179 ++++++--
.../pypaimon/read/reader/data_file_batch_reader.py | 61 ---
paimon-python/pypaimon/read/split_read.py | 55 ++-
paimon-python/pypaimon/schema/schema.py | 34 --
paimon-python/pypaimon/schema/schema_manager.py | 73 ++-
paimon-python/pypaimon/table/file_store_table.py | 5 +-
paimon-python/pypaimon/table/row/blob.py | 155 +++++++
paimon-python/pypaimon/table/special_fields.py | 19 +
paimon-python/pypaimon/tests/blob_table_test.py | 499 ++++++++++++++++++++-
paimon-python/pypaimon/tests/blob_test.py | 21 +-
.../pypaimon/tests/external_storage_blob_test.py | 2 +-
paimon-python/pypaimon/utils/blob_view_lookup.py | 274 +++++++++++
.../write/writer/dedicated_format_writer.py | 49 +-
15 files changed, 1299 insertions(+), 159 deletions(-)
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py
b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index 86e2f775e7..b7356b4595 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -142,11 +142,11 @@ class FileSystemCatalog(Catalog):
table_schema = self.get_table_schema(identifier)
# Create catalog environment for filesystem catalog
- # Filesystem catalog doesn't support version management by default
+ from pypaimon.catalog.filesystem_catalog_loader import
FileSystemCatalogLoader
catalog_environment = CatalogEnvironment(
identifier=identifier,
- uuid=None, # Filesystem catalog doesn't track table UUIDs
- catalog_loader=None, # No catalog loader for filesystem
+ uuid=None,
+ catalog_loader=FileSystemCatalogLoader(self.catalog_context),
supports_version_management=False
)
diff --git a/paimon-python/pypaimon/common/options/core_options.py
b/paimon-python/pypaimon/common/options/core_options.py
index c46fb64be9..6013019aba 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -274,6 +274,17 @@ class CoreOptions:
.with_description("Comma-separated field names to treat as BLOB view
fields.")
)
+ BLOB_VIEW_RESOLVE_ENABLED: ConfigOption[bool] = (
+ ConfigOptions.key("blob-view.resolve.enabled")
+ .boolean_type()
+ .default_value(True)
+ .with_description(
+ "Whether to resolve blob-view-field values from upstream tables at
"
+ "read time. Set to false to preserve BlobViewStruct references
when "
+ "forwarding blob view values to another blob-view table."
+ )
+ )
+
VECTOR_FIELD: ConfigOption[str] = (
ConfigOptions.key("vector-field")
.string_type()
@@ -744,6 +755,21 @@ class CoreOptions:
def blob_descriptor_fields(self, default=None):
value = self.options.get(CoreOptions.BLOB_DESCRIPTOR_FIELD, default)
+ return CoreOptions._parse_field_set(value)
+
+ def blob_view_fields(self, default=None):
+ value = self.options.get(CoreOptions.BLOB_VIEW_FIELD, default)
+ return CoreOptions._parse_field_set(value)
+
+ def blob_field(self, default=None):
+ value = self.options.get(CoreOptions.BLOB_FIELD, default)
+ return CoreOptions._parse_field_set(value)
+
+ def blob_view_resolve_enabled(self, default=True):
+ return self.options.get(CoreOptions.BLOB_VIEW_RESOLVE_ENABLED, default)
+
+ @staticmethod
+ def _parse_field_set(value):
if value is None:
return set()
if isinstance(value, str):
diff --git
a/paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py
b/paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py
index 35fe046a03..12e975bae5 100644
--- a/paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py
+++ b/paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py
@@ -15,68 +15,189 @@
# specific language governing permissions and limitations
# under the License.
-from typing import Optional
+from typing import Callable, Optional, Set
+import pyarrow
from pyarrow import RecordBatch
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.table.row.blob import Blob, BlobViewStruct
-class BlobDescriptorConvertReader(RecordBatchReader):
- def __init__(self, inner: RecordBatchReader, table):
+class BlobInlineConvertReader(RecordBatchReader):
+ """Resolves BlobView and BlobDescriptor fields in record batches.
+
+ Processing is split into two clear stages:
+ Stage 1 (BlobView resolution): If view fields exist, use a lightweight
+ prescan reader (only projecting view columns) to collect
+ BlobViewStructs, bulk-preload their descriptors, then read
+ full data from the main reader and replace view field values
+ with the corresponding BlobDescriptor serialized bytes.
+ Stage 2 (BlobData resolution): Controlled by blob-as-descriptor option.
+ If false, resolve all BlobDescriptor bytes (from both descriptor
+ fields and view fields) into real blob data bytes.
+ If true, return as-is.
+ """
+
+ def __init__(self, inner: RecordBatchReader, table,
+ prescan_reader_factory: Optional[Callable[[Set[str]],
RecordBatchReader]] = None):
+ """
+ Args:
+ inner: The main data reader (reads all columns).
+ table: The table instance.
+ prescan_reader_factory: Optional factory that creates a lightweight
+ reader projecting only the specified field names. Used for
+ prescan to collect BlobViewStructs without reading all columns.
+ Signature: (field_names: Set[str]) -> RecordBatchReader
+ """
self._inner = inner
self._table = table
- self._descriptor_fields =
CoreOptions.blob_descriptor_fields(table.options)
+ self._prescan_reader_factory = prescan_reader_factory
self.file_io = inner.file_io
self.blob_field_indices = inner.blob_field_indices
+ # Preserve original BlobViewStruct bytes when resolve disabled: skip
both
+ # view resolution (Stage 1) and descriptor-to-data resolution (Stage
2).
+ resolve_enabled = CoreOptions.blob_view_resolve_enabled(
+ table.options) and self._table.catalog_environment.catalog_loader
is not None
+ self._view_fields = CoreOptions.blob_view_fields(table.options) if
resolve_enabled else set()
+ self._descriptor_fields =
CoreOptions.blob_descriptor_fields(table.options)
+ self._blob_as_descriptor =
CoreOptions.blob_as_descriptor(table.options)
+ self._prescan_done = False
+ self._blob_view_lookup = None
def read_arrow_batch(self) -> Optional[RecordBatch]:
- import pyarrow
+ # Align with Java: only enter blob view resolution when catalog_loader
is available
+ # If catalog_loader is None, skip both Stage 1 (view resolution) and
Stage 2 (descriptor resolution)
+ # This matches Java's behavior in DataEvolutionTableRead.createReader
where blob view reader
+ # is only created when catalogContext != null
+ if self._view_fields and not self._prescan_done:
+ self._prescan_view_structs()
+
batch = self._inner.read_arrow_batch()
if batch is None:
return None
- return self._convert_batch(batch, pyarrow)
+ # Resolve view fields using the preloaded lookup
+ if self._view_fields and self._blob_view_lookup is not None:
+ batch = self._resolve_view_fields(batch, self._blob_view_lookup)
+ # Resolve BlobDescriptor -> real bytes (if blob-as-descriptor=false)
+ return self._resolve_descriptor_fields(batch)
+
+ # ------------------------------------------------------------------
+ # Stage 1: BlobView prescan (lightweight, only reads view columns)
+ # ------------------------------------------------------------------
+
+ def _prescan_view_structs(self):
+ """Use a lightweight prescan reader (projecting only view columns) to
+ collect all BlobViewStructs and bulk-preload their descriptors."""
+ from pypaimon.table.row.blob import BlobViewStruct
+ from pypaimon.utils.blob_view_lookup import BlobViewLookup
- def _convert_batch(self, batch, pyarrow):
- from pypaimon.table.row.blob import Blob, BlobDescriptor
+ all_view_structs = []
- result = batch
- for field_name in self._descriptor_fields:
- if field_name not in result.schema.names:
+ prescan_reader = self._prescan_reader_factory(self._view_fields)
+ try:
+ while True:
+ batch = prescan_reader.read_arrow_batch()
+ if batch is None:
+ break
+ for field_name in self._view_fields:
+ if field_name not in batch.schema.names:
+ continue
+ for value in batch.column(field_name).to_pylist():
+ value = self._normalize_blob_to_bytes(value)
+ if value is None:
+ continue
+ if isinstance(value, bytes) and
BlobViewStruct.is_blob_view_struct(value):
+
all_view_structs.append(BlobViewStruct.deserialize(value))
+ else:
+ raise ValueError(
+ f"Expected BlobViewStruct bytes in view field
'{field_name}', "
+ f"but got non-BlobViewStruct bytes."
+ )
+ finally:
+ prescan_reader.close()
+
+ # Bulk-preload BlobViewStruct -> BlobDescriptor mapping
+ if all_view_structs:
+ self._blob_view_lookup = BlobViewLookup(self._table)
+ self._blob_view_lookup.preload(all_view_structs)
+ self._prescan_done = True
+
+ def _resolve_view_fields(self, batch, blob_view_lookup):
+ """Replace BlobViewStruct bytes in view fields with the corresponding
+ BlobDescriptor serialized bytes."""
+ for field_name in self._view_fields:
+ if field_name not in batch.schema.names:
continue
- values = result.column(field_name).to_pylist()
+ values = [self._normalize_blob_to_bytes(v) for v in
batch.column(field_name).to_pylist()]
converted_values = []
for value in values:
if value is None:
converted_values.append(None)
continue
- if hasattr(value, 'as_py'):
- value = value.as_py()
- if isinstance(value, str):
- value = value.encode('utf-8')
- if isinstance(value, bytearray):
- value = bytes(value)
if not isinstance(value, bytes):
converted_values.append(value)
continue
- try:
- descriptor = BlobDescriptor.deserialize(value)
- if descriptor.serialize() != value:
- converted_values.append(value)
- continue
- uri_reader =
self._table.file_io.uri_reader_factory.create(descriptor.uri)
- converted_values.append(Blob.from_descriptor(uri_reader,
descriptor).to_data())
- except Exception:
+ if not BlobViewStruct.is_blob_view_struct(value):
converted_values.append(value)
+ continue
+ view_struct = BlobViewStruct.deserialize(value)
+ if blob_view_lookup.resolve_to_null(view_struct):
+ converted_values.append(None)
+ else:
+ descriptor =
blob_view_lookup.resolve_descriptor(view_struct)
+ converted_values.append(descriptor.serialize())
- column_idx = result.schema.names.index(field_name)
- result = result.set_column(
+ column_idx = batch.schema.names.index(field_name)
+ batch = batch.set_column(
column_idx,
pyarrow.field(field_name, pyarrow.large_binary(),
nullable=True),
pyarrow.array(converted_values, type=pyarrow.large_binary()),
)
- return result
+ return batch
+
+ # ------------------------------------------------------------------
+ # Stage 2: BlobData resolution (unified exit)
+ # ------------------------------------------------------------------
+
+ def _resolve_descriptor_fields(self, batch):
+ if self._blob_as_descriptor:
+ return batch
+
+ all_inline_blob_fields = self._descriptor_fields | self._view_fields
+ for field_name in all_inline_blob_fields:
+ if field_name not in batch.schema.names:
+ continue
+ values = [self._normalize_blob_to_bytes(v) for v in
batch.column(field_name).to_pylist()]
+ converted_values = []
+ for value in values:
+ blob = Blob.from_bytes(value, self._table.file_io)
+ converted_values.append(blob.to_data() if blob else None)
+
+ column_idx = batch.schema.names.index(field_name)
+ batch = batch.set_column(
+ column_idx,
+ pyarrow.field(field_name, pyarrow.large_binary(),
nullable=True),
+ pyarrow.array(converted_values, type=pyarrow.large_binary()),
+ )
+ return batch
+
+ # ------------------------------------------------------------------
+ # Utilities
+ # ------------------------------------------------------------------
+
+ @staticmethod
+ def _normalize_blob_to_bytes(value):
+ if value is None:
+ return None
+ if hasattr(value, 'as_py'):
+ value = value.as_py()
+ if isinstance(value, str):
+ value = value.encode('utf-8')
+ if isinstance(value, bytearray):
+ value = bytes(value)
+ return value
def close(self):
self._inner.close()
diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
index 12e6990e13..21d1b2a911 100644
--- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
@@ -25,7 +25,6 @@ from pypaimon.read.partition_info import PartitionInfo
from pypaimon.read.reader.format_blob_reader import FormatBlobReader
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.schema.data_types import DataField, PyarrowFieldParser
-from pypaimon.table.row.blob import Blob
from pypaimon.table.special_fields import SpecialFields
@@ -40,8 +39,6 @@ class DataFileBatchReader(RecordBatchReader):
first_row_id: int,
row_tracking_enabled: bool,
system_fields: dict,
- blob_as_descriptor: bool = False,
- blob_descriptor_fields: Optional[set] = None,
file_io: Optional[FileIO] = None,
row_id_offsets: Optional[List[int]] = None):
self.format_reader = format_reader
@@ -55,19 +52,7 @@ class DataFileBatchReader(RecordBatchReader):
self._row_id_cursor = 0
self.max_sequence_number = max_sequence_number
self.system_fields = system_fields
- self.blob_as_descriptor = blob_as_descriptor
- self.blob_descriptor_fields = blob_descriptor_fields or set()
self.file_io = file_io
- self.blob_field_names = {
- field.name
- for field in fields
- if hasattr(field.type, 'type') and field.type.type == 'BLOB'
- }
- self.descriptor_blob_fields = {
- field_name
- for field_name in self.blob_descriptor_fields
- if field_name in self.blob_field_names
- }
def read_arrow_batch(self, start_idx=None, end_idx=None) ->
Optional[RecordBatch]:
if isinstance(self.format_reader, FormatBlobReader):
@@ -140,8 +125,6 @@ class DataFileBatchReader(RecordBatchReader):
if self.row_tracking_enabled and self.system_fields:
record_batch = self._assign_row_tracking(record_batch)
- record_batch =
self._convert_descriptor_stored_blob_columns(record_batch)
-
return record_batch
def _align_batch_to_read_schema(self, names: List[str], arrays: list) ->
RecordBatch:
@@ -170,50 +153,6 @@ class DataFileBatchReader(RecordBatchReader):
out_fields.append(target_field)
return pa.RecordBatch.from_arrays(out_arrays,
schema=pa.schema(out_fields))
- def _convert_descriptor_stored_blob_columns(self, record_batch:
RecordBatch) -> RecordBatch:
- if isinstance(self.format_reader, FormatBlobReader):
- return record_batch
- if not self.descriptor_blob_fields:
- return record_batch
-
- schema_names = set(record_batch.schema.names)
- target_fields = [f for f in self.descriptor_blob_fields if f in
schema_names]
- if not target_fields:
- return record_batch
-
- arrays = list(record_batch.columns)
- for field_name in target_fields:
- field_idx = record_batch.schema.get_field_index(field_name)
- values = record_batch.column(field_idx).to_pylist()
-
- if self.blob_as_descriptor:
- converted = [self._normalize_blob_cell(v) for v in values]
- else:
- converted = [self._blob_cell_to_data(v) for v in values]
- arrays[field_idx] = pa.array(converted, type=pa.large_binary())
-
- return pa.RecordBatch.from_arrays(arrays, schema=record_batch.schema)
-
- @staticmethod
- def _normalize_blob_cell(value):
- if value is None:
- return None
- if hasattr(value, 'as_py'):
- value = value.as_py()
- if isinstance(value, str):
- value = value.encode('utf-8')
- if isinstance(value, bytearray):
- value = bytes(value)
- return value
-
- def _blob_cell_to_data(self, value):
- value = self._normalize_blob_cell(value)
- if value is None:
- return None
- if not isinstance(value, bytes):
- return value
- return Blob.from_bytes(value, self.file_io).to_data()
-
def _assign_row_tracking(self, record_batch: RecordBatch) -> RecordBatch:
"""Assign row tracking meta fields (_ROW_ID and _SEQUENCE_NUMBER)."""
arrays = list(record_batch.columns)
diff --git a/paimon-python/pypaimon/read/split_read.py
b/paimon-python/pypaimon/read/split_read.py
index 5a78cb07be..fe76527431 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -41,7 +41,7 @@ from pypaimon.read.reader.empty_record_reader import
EmptyFileRecordReader
from pypaimon.read.reader.field_bunch import BlobBunch, DataBunch, FieldBunch,
VectorBunch
from pypaimon.read.reader.filter_record_reader import FilterRecordReader
from pypaimon.read.reader.format_avro_reader import FormatAvroReader
-from pypaimon.read.reader.blob_descriptor_convert_reader import
BlobDescriptorConvertReader
+from pypaimon.read.reader.blob_descriptor_convert_reader import
BlobInlineConvertReader
from pypaimon.read.reader.filter_record_batch_reader import
FilterRecordBatchReader
from pypaimon.read.reader.limited_record_reader import
LimitedRecordBatchReader, LimitedRecordReader
from pypaimon.read.reader.row_range_filter_record_reader import
RowIdFilterRecordBatchReader
@@ -314,9 +314,6 @@ class SplitRead(ABC):
else:
raise ValueError(f"Unexpected file format: {file_format}")
- blob_as_descriptor = CoreOptions.blob_as_descriptor(self.table.options)
- blob_descriptor_fields =
CoreOptions.blob_descriptor_fields(self.table.options)
-
index_mapping = self.create_index_mapping()
partition_info = self._create_partition_info()
system_fields = SpecialFields.find_system_fields(self.read_fields)
@@ -344,8 +341,6 @@ class SplitRead(ABC):
effective_first_row_id,
row_tracking_enabled,
system_fields,
- blob_as_descriptor=blob_as_descriptor,
- blob_descriptor_fields=blob_descriptor_fields,
file_io=self.table.file_io,
row_id_offsets=row_indices)
else:
@@ -359,8 +354,6 @@ class SplitRead(ABC):
effective_first_row_id,
row_tracking_enabled,
system_fields,
- blob_as_descriptor=blob_as_descriptor,
- blob_descriptor_fields=blob_descriptor_fields,
file_io=self.table.file_io,
row_id_offsets=row_indices)
@@ -799,6 +792,20 @@ class DataEvolutionSplitRead(SplitRead):
return None
def create_reader(self) -> RecordReader:
+ reader = self._create_raw_reader()
+
+ if ((CoreOptions.blob_view_fields(self.table.options) and
CoreOptions.blob_view_resolve_enabled(
+ self.table.options))
+ or (not CoreOptions.blob_as_descriptor(self.table.options)
+ and
CoreOptions.blob_descriptor_fields(self.table.options))):
+ reader = BlobInlineConvertReader(
+ reader, self.table,
+ prescan_reader_factory=lambda names:
self._create_prescan_reader(names))
+
+ return reader
+
+ def _create_raw_reader(self) -> RecordReader:
+ """Core read logic: split_by_row_id -> suppliers -> ConcatBatchReader
-> filter."""
files = self.split.files
suppliers = []
@@ -830,15 +837,39 @@ class DataEvolutionSplitRead(SplitRead):
else:
reader = merge_reader
- if (not CoreOptions.blob_as_descriptor(self.table.options)
- and CoreOptions.blob_descriptor_fields(self.table.options)):
- reader = BlobDescriptorConvertReader(reader, self.table)
-
if self.limit is not None:
reader = LimitedRecordBatchReader(reader, self.limit)
return reader
+ def _create_prescan_reader(self, field_names):
+ """Create a prescan reader by constructing a new DataEvolutionSplitRead
+ instance that only projects the specified field names.
+
+ Align with Java's configureBlobViewPrescanRead: pass limit to prescan
reader
+ to avoid scanning entire split when there's a LIMIT clause.
+ """
+ from pypaimon.read.reader.iface.record_batch_reader import
EmptyRecordBatchReader
+
+ prescan_fields = [f for f in self.read_fields if f.name in field_names]
+ if not prescan_fields:
+ return EmptyRecordBatchReader()
+
+ # When there's a normal field predicate, don't push down limit to
prescan reader
+ # because the outer reader will apply predicate+limit filtering,
+ # while prescan reader would only apply limit without normal field
predicate
+ # TODO support limit+predicate push down
+ prescan_read = DataEvolutionSplitRead(
+ table=self.table,
+ predicate=self.predicate,
+ read_type=prescan_fields,
+ split=self.split,
+ row_tracking_enabled=False,
+ limit=None if self.predicate else self.limit,
+ )
+ prescan_read.row_ranges = self.row_ranges
+ return prescan_read._create_raw_reader()
+
def _split_by_row_id(self, files: List[DataFileMeta]) ->
List[List[DataFileMeta]]:
"""Split files by firstRowId for data evolution."""
diff --git a/paimon-python/pypaimon/schema/schema.py
b/paimon-python/pypaimon/schema/schema.py
index 9129667326..f3a63c88e1 100644
--- a/paimon-python/pypaimon/schema/schema.py
+++ b/paimon-python/pypaimon/schema/schema.py
@@ -62,40 +62,6 @@ class Schema:
if field.name in pk_set:
field.type.nullable = False
- # Check if Blob type exists in the schema
- blob_names = [
- field.name for field in fields
- if 'blob' in str(field.type).lower()
- ]
-
- if blob_names:
- if options is None:
- options = {}
-
- if len(fields) <= len(blob_names):
- raise ValueError(
- "Table with BLOB type column must have other normal
columns."
- )
-
- required_options = {
- CoreOptions.ROW_TRACKING_ENABLED.key(): 'true',
- CoreOptions.DATA_EVOLUTION_ENABLED.key(): 'true'
- }
-
- missing_options = []
- for key, expected_value in required_options.items():
- if key not in options or options[key] != expected_value:
- missing_options.append(f"{key}='{expected_value}'")
-
- if missing_options:
- raise ValueError(
- f"Schema contains Blob type but is missing required
options: {', '.join(missing_options)}. "
- f"Please add these options to the schema."
- )
-
- if primary_keys is not None:
- raise ValueError("Blob type is not supported with primary
key.")
-
# Check if Vector type with dedicated file format
vector_names = [
field.name for field in fields
diff --git a/paimon-python/pypaimon/schema/schema_manager.py
b/paimon-python/pypaimon/schema/schema_manager.py
index 645d2f4328..d01549c71b 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -53,7 +53,7 @@ def _get_rename_mappings(changes: List[SchemaChange]) -> dict:
def _handle_update_column_comment(
- change: UpdateColumnComment, new_fields: List[DataField]
+ change: UpdateColumnComment, new_fields: List[DataField]
):
field_name = change.field_names[-1]
field_index = _find_field_index(new_fields, field_name)
@@ -66,7 +66,7 @@ def _handle_update_column_comment(
def _handle_update_column_nullability(
- change: UpdateColumnNullability, new_fields: List[DataField]
+ change: UpdateColumnNullability, new_fields: List[DataField]
):
field_name = change.field_names[-1]
field_index = _find_field_index(new_fields, field_name)
@@ -83,7 +83,7 @@ def _handle_update_column_nullability(
def _handle_update_column_type(
- change: UpdateColumnType, new_fields: List[DataField]
+ change: UpdateColumnType, new_fields: List[DataField]
):
field_name = change.field_names[-1]
field_index = _find_field_index(new_fields, field_name)
@@ -166,6 +166,71 @@ def _assert_not_renaming_blob_column(
)
+def _validate_blob_fields(fields: List[DataField], options: dict,
primary_keys: List[str]):
+ """Validate blob field configurations in the schema."""
+ if options is None:
+ options = {}
+
+ blob_field_names = {
+ field.name for field in fields
+ if getattr(field.type, 'type', None) == 'BLOB'
+ }
+
+ if len(fields) <= len(blob_field_names):
+ raise ValueError(
+ "Table with BLOB type column must have other normal columns."
+ )
+
+ core_options = CoreOptions(Options(options))
+
+ configured_blob_fields = core_options.blob_field()
+ for field in configured_blob_fields:
+ if field not in blob_field_names:
+ raise ValueError(
+ "Field '{}' in '{}' must be a BLOB field in table
schema.".format(
+ field, CoreOptions.BLOB_FIELD.key()
+ )
+ )
+
+ descriptor_fields = core_options.blob_descriptor_fields()
+ view_fields = core_options.blob_view_fields()
+
+ all_inline_fields = descriptor_fields.union(view_fields)
+ non_blob_inline_fields = all_inline_fields.difference(blob_field_names)
+ if non_blob_inline_fields:
+ raise ValueError(
+ "Fields in 'blob-descriptor-field' or 'blob-view-field' must be
blob fields "
+ "in schema. Non-BLOB fields:
{}".format(sorted(non_blob_inline_fields))
+ )
+
+ overlapping_inline_fields = descriptor_fields.intersection(view_fields)
+ if overlapping_inline_fields:
+ raise ValueError(
+ "Fields in 'blob-descriptor-field' and 'blob-view-field' must not
overlap. "
+ "Overlapping fields: {}".format(sorted(overlapping_inline_fields))
+ )
+
+ if blob_field_names:
+ required_options = {
+ CoreOptions.ROW_TRACKING_ENABLED.key(): 'true',
+ CoreOptions.DATA_EVOLUTION_ENABLED.key(): 'true'
+ }
+
+ missing_options = []
+ for key, expected_value in required_options.items():
+ if key not in options or options[key] != expected_value:
+ missing_options.append(f"{key}='{expected_value}'")
+
+ if missing_options:
+ raise ValueError(
+ f"Schema contains Blob type but is missing required options:
{', '.join(missing_options)}. "
+ f"Please add these options to the schema."
+ )
+
+ if primary_keys:
+ raise ValueError("Blob type is not supported with primary key.")
+
+
def _validate_blob_external_storage_fields(fields: List[DataField], options:
dict):
"""Validate blob-external-storage-field configuration.
@@ -364,6 +429,7 @@ class SchemaManager:
comment=schema.comment,
)
+ _validate_blob_fields(schema.fields, schema.options,
schema.primary_keys)
_validate_blob_external_storage_fields(schema.fields,
schema.options)
table_schema = TableSchema.from_schema(schema_id=0, schema=schema)
success = self.commit(table_schema)
@@ -371,6 +437,7 @@ class SchemaManager:
return table_schema
def commit(self, new_schema: TableSchema) -> bool:
+ _validate_blob_fields(new_schema.fields, new_schema.options,
new_schema.primary_keys)
schema_path = self._to_schema_path(new_schema.id)
try:
result = self.file_io.try_to_write_atomic(schema_path,
JSON.to_json(new_schema, indent=2))
diff --git a/paimon-python/pypaimon/table/file_store_table.py
b/paimon-python/pypaimon/table/file_store_table.py
index 1fee68615a..9fd61abed4 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -117,9 +117,8 @@ class FileStoreTable(Table):
"""Get the branch manager for this table."""
# If catalog environment has a catalog loader, use CatalogBranchManager
catalog_loader = self.catalog_environment.catalog_loader
- if catalog_loader is not None:
- from pypaimon.branch.catalog_branch_manager import \
- CatalogBranchManager
+ if catalog_loader is not None and
self.catalog_environment.supports_version_management:
+ from pypaimon.branch.catalog_branch_manager import
CatalogBranchManager
return CatalogBranchManager(
catalog_loader,
self.identifier
diff --git a/paimon-python/pypaimon/table/row/blob.py
b/paimon-python/pypaimon/table/row/blob.py
index 056316d55f..eb2f00b764 100644
--- a/paimon-python/pypaimon/table/row/blob.py
+++ b/paimon-python/pypaimon/table/row/blob.py
@@ -21,6 +21,7 @@ from abc import ABC, abstractmethod
from typing import BinaryIO, Callable, Optional, Union
from urllib.parse import urlparse
+from pypaimon.common.identifier import Identifier
from pypaimon.common.uri_reader import UriReader, FileUriReader
@@ -162,6 +163,115 @@ class BlobDescriptor:
return self.__str__()
+class BlobViewStruct:
+ CURRENT_VERSION = 1
+ MAGIC = 0x424C4F4256494557 # "BLOBVIEW"
+
+ def __init__(self, identifier: Union[Identifier, str], field_id: int,
row_id: int):
+ if isinstance(identifier, str):
+ identifier = Identifier.from_string(identifier)
+ if not isinstance(identifier, Identifier):
+ raise TypeError("BlobViewStruct identifier must be Identifier or
str.")
+ self._identifier = identifier
+ self._field_id = field_id
+ self._row_id = row_id
+
+ @property
+ def identifier(self) -> Identifier:
+ return self._identifier
+
+ @property
+ def field_id(self) -> int:
+ return self._field_id
+
+ @property
+ def row_id(self) -> int:
+ return self._row_id
+
+ def serialize(self) -> bytes:
+ identifier_bytes = self._identifier.get_full_name().encode('utf-8')
+ data = struct.pack('<B', self.CURRENT_VERSION)
+ data += struct.pack('<Q', self.MAGIC)
+ data += struct.pack('<I', len(identifier_bytes))
+ data += identifier_bytes
+ data += struct.pack('<i', self._field_id)
+ data += struct.pack('<q', self._row_id)
+ return data
+
+ @classmethod
+ def deserialize(cls, data: bytes) -> 'BlobViewStruct':
+ if len(data) < 25:
+ raise ValueError("Invalid BlobViewStruct data: too short")
+
+ offset = 0
+ version = struct.unpack('<B', data[offset:offset + 1])[0]
+ offset += 1
+ if version != cls.CURRENT_VERSION:
+ raise ValueError(
+ f"Expecting BlobViewStruct version to be
{cls.CURRENT_VERSION}, "
+ f"but found {version}."
+ )
+
+ magic = struct.unpack('<Q', data[offset:offset + 8])[0]
+ offset += 8
+ if magic != cls.MAGIC:
+ raise ValueError(
+ f"Invalid BlobViewStruct: missing magic header. Expected
magic: "
+ f"{cls.MAGIC}, but found: {magic}"
+ )
+
+ identifier_length = struct.unpack('<I', data[offset:offset + 4])[0]
+ offset += 4
+ if offset + identifier_length + 12 > len(data):
+ raise ValueError("Invalid BlobViewStruct data: identifier length
exceeds data size")
+
+ identifier = data[offset:offset + identifier_length].decode('utf-8')
+ offset += identifier_length
+ field_id = struct.unpack('<i', data[offset:offset + 4])[0]
+ offset += 4
+ row_id = struct.unpack('<q', data[offset:offset + 8])[0]
+ offset += 8
+ if offset != len(data):
+ raise ValueError("Invalid BlobViewStruct data: trailing bytes")
+
+ return cls(Identifier.from_string(identifier), field_id, row_id)
+
+ @classmethod
+ def is_blob_view_struct(cls, data: bytes) -> bool:
+ if not isinstance(data, (bytes, bytearray)):
+ return False
+ raw = bytes(data)
+ if len(raw) < 9:
+ return False
+ version = raw[0]
+ if version != cls.CURRENT_VERSION:
+ return False
+ try:
+ magic = struct.unpack('<Q', raw[1:9])[0]
+ return magic == cls.MAGIC
+ except Exception:
+ return False
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, BlobViewStruct):
+ return False
+ return (self._identifier == other._identifier
+ and self._field_id == other._field_id
+ and self._row_id == other._row_id)
+
+ def __hash__(self) -> int:
+ return hash((self._identifier.get_full_name(), self._field_id,
self._row_id))
+
+ def __str__(self) -> str:
+ return (
+ f"BlobViewStruct(identifier={self._identifier.get_full_name()}, "
+ f"field_id={self._field_id}, row_id={self._row_id})"
+ )
+
+ def __repr__(self) -> str:
+ return self.__str__()
+
+
class OffsetInputStream(io.RawIOBase):
def __init__(self, wrapped, offset: int, length: int):
@@ -276,6 +386,10 @@ class Blob(ABC):
def from_descriptor(uri_reader: UriReader, descriptor: BlobDescriptor) ->
'Blob':
return BlobRef(uri_reader, descriptor)
+ @staticmethod
+ def from_view(view_struct: BlobViewStruct) -> 'BlobView':
+ return BlobView(view_struct)
+
@staticmethod
def from_bytes(data: Optional[bytes], file_io=None, allow_blob_data: bool
= True) -> Optional['Blob']:
if data is None:
@@ -283,6 +397,8 @@ class Blob(ABC):
if not isinstance(data, (bytes, bytearray)):
raise TypeError(f"Blob.from_bytes expects bytes, got {type(data)}")
data = bytes(data)
+ if BlobViewStruct.is_blob_view_struct(data):
+ return Blob.from_view(BlobViewStruct.deserialize(data))
is_descriptor = BlobDescriptor.is_blob_descriptor(data)
if not allow_blob_data and not is_descriptor:
raise ValueError(
@@ -385,3 +501,42 @@ class BlobRef(Blob):
BlobConsumer = Callable[[str, Optional[BlobDescriptor]], bool]
+
+
+class BlobView(Blob):
+
+ def __init__(self, view_struct: BlobViewStruct):
+ self._view_struct: BlobViewStruct = view_struct
+ self._resolved_blob: Optional[BlobRef] = None
+
+ @property
+ def view_struct(self) -> BlobViewStruct:
+ return self._view_struct
+
+ def is_resolved(self) -> bool:
+ return self._resolved_blob is not None
+
+ def resolve(self, uri_reader: UriReader, descriptor: BlobDescriptor):
+ self._resolved_blob = BlobRef(uri_reader, descriptor)
+
+ def to_data(self) -> bytes:
+ return self._resolved().to_data()
+
+ def to_descriptor(self) -> BlobDescriptor:
+ return self._resolved().to_descriptor()
+
+ def new_input_stream(self) -> BinaryIO:
+ return self._resolved().new_input_stream()
+
+ def _resolved(self) -> BlobRef:
+ if self._resolved_blob is None:
+ raise RuntimeError("BlobView is not resolved.")
+ return self._resolved_blob
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, BlobView):
+ return False
+ return self._view_struct == other._view_struct
+
+ def __hash__(self) -> int:
+ return hash(self._view_struct)
diff --git a/paimon-python/pypaimon/table/special_fields.py
b/paimon-python/pypaimon/table/special_fields.py
index 5c578ec85f..64d2429bef 100644
--- a/paimon-python/pypaimon/table/special_fields.py
+++ b/paimon-python/pypaimon/table/special_fields.py
@@ -81,3 +81,22 @@ class SpecialFields:
fields_with_row_tracking.append(SpecialFields.SEQUENCE_NUMBER)
return fields_with_row_tracking
+
+ @staticmethod
+ def row_type_with_row_id(table_fields: List[DataField]) -> List[DataField]:
+ """Add ROW_ID field to the given fields list.
+
+ Args:
+ table_fields: The original table fields
+ """
+ fields_with_row_id = list(table_fields)
+
+ for field in fields_with_row_id:
+ if SpecialFields.ROW_ID.name == field.name:
+ raise ValueError(
+ "Row tracking field name '{}' conflicts with existing
field names."
+ .format(field.name)
+ )
+
+ fields_with_row_id.append(SpecialFields.ROW_ID)
+ return fields_with_row_id
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py
b/paimon-python/pypaimon/tests/blob_table_test.py
index b6a12fe973..3d33594c4a 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -1390,6 +1390,483 @@ class DedicatedFormatWriterTest(unittest.TestCase):
self.assertEqual(result.column('pic1').to_pylist()[0], pic1_data)
self.assertEqual(result.column('pic2').to_pylist()[0], pic2_data)
+ def test_blob_view_fields_resolve_upstream_blob(self):
+ from pypaimon import Schema
+ from pypaimon.common.options.core_options import CoreOptions
+ from pypaimon.table.row.blob import BlobViewStruct
+
+ source_schema = pa.schema([
+ ('id', pa.int32()),
+ ('picture', pa.large_binary()),
+ ])
+ source = Schema.from_pyarrow_schema(
+ source_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ }
+ )
+ self.catalog.create_table('test_db.blob_view_source', source, False)
+ source_table = self.catalog.get_table('test_db.blob_view_source')
+ payloads = [b'view-source-0', b'view-source-1']
+
+ write_builder = source_table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(pa.Table.from_pydict({
+ 'id': [1, 2],
+ 'picture': payloads,
+ }, schema=source_schema))
+ commit_messages = writer.prepare_commit()
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
+ picture_field_id = next(
+ field.id for field in source_table.table_schema.fields if
field.name == 'picture'
+ )
+ view_values = [
+ BlobViewStruct('test_db.blob_view_source', picture_field_id,
0).serialize(),
+ BlobViewStruct('test_db.blob_view_source', picture_field_id,
1).serialize(),
+ ]
+
+ target_schema = pa.schema([
+ ('id', pa.int32()),
+ ('picture', pa.large_binary()),
+ ])
+ target = Schema.from_pyarrow_schema(
+ target_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob-view-field': 'picture',
+ }
+ )
+ self.catalog.create_table('test_db.blob_view_target', target, False)
+ target_table = self.catalog.get_table('test_db.blob_view_target')
+
+ target_write_builder = target_table.new_batch_write_builder()
+ target_writer = target_write_builder.new_write()
+ target_writer.write_arrow(pa.Table.from_pydict({
+ 'id': [10, 11],
+ 'picture': view_values,
+ }, schema=target_schema))
+ target_commit_messages = target_writer.prepare_commit()
+ target_write_builder.new_commit().commit(target_commit_messages)
+ target_writer.close()
+
+ all_target_files = [f for msg in target_commit_messages for f in
msg.new_files]
+ self.assertFalse(
+ any(f.file_name.endswith('.blob') for f in all_target_files),
+ "Blob view fields should be stored inline without writing new blob
files",
+ )
+
+ result = target_table.new_read_builder().new_read().to_arrow(
+ target_table.new_read_builder().new_scan().plan().splits()
+ ).sort_by('id')
+ self.assertEqual(result.column('picture').to_pylist(), payloads)
+
+ descriptor_table =
target_table.copy({CoreOptions.BLOB_AS_DESCRIPTOR.key(): 'true'})
+ descriptor_result =
descriptor_table.new_read_builder().new_read().to_arrow(
+ descriptor_table.new_read_builder().new_scan().plan().splits()
+ ).sort_by('id')
+ # With blob-as-descriptor=true, view fields return BlobDescriptor bytes
+ from pypaimon.table.row.blob import BlobDescriptor
+ for value in descriptor_result.column('picture').to_pylist():
+ self.assertTrue(
+ BlobDescriptor.is_blob_descriptor(value),
+ "Expected BlobDescriptor bytes when blob-as-descriptor=true"
+ )
+
+ def test_blob_view_resolve_disabled_preserves_references(self):
+ from pypaimon import Schema
+ from pypaimon.common.options.core_options import CoreOptions
+ from pypaimon.table.row.blob import BlobViewStruct
+
+ source_schema = pa.schema([
+ ('id', pa.int32()),
+ ('picture', pa.large_binary()),
+ ])
+ source = Schema.from_pyarrow_schema(
+ source_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ }
+ )
+ self.catalog.create_table('test_db.blob_view_resolve_source', source,
False)
+ source_table =
self.catalog.get_table('test_db.blob_view_resolve_source')
+ payloads = [b'resolve-source-0', b'resolve-source-1']
+
+ write_builder = source_table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(pa.Table.from_pydict({
+ 'id': [1, 2],
+ 'picture': payloads,
+ }, schema=source_schema))
+ commit_messages = writer.prepare_commit()
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
+ picture_field_id = next(
+ field.id for field in source_table.table_schema.fields if
field.name == 'picture'
+ )
+ view_values = [
+ BlobViewStruct('test_db.blob_view_resolve_source',
picture_field_id, 0).serialize(),
+ BlobViewStruct('test_db.blob_view_resolve_source',
picture_field_id, 1).serialize(),
+ ]
+
+ target_schema = pa.schema([
+ ('id', pa.int32()),
+ ('picture', pa.large_binary()),
+ ])
+ target = Schema.from_pyarrow_schema(
+ target_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob-view-field': 'picture',
+ }
+ )
+ self.catalog.create_table('test_db.blob_view_resolve_target', target,
False)
+ target_table =
self.catalog.get_table('test_db.blob_view_resolve_target')
+
+ target_write_builder = target_table.new_batch_write_builder()
+ target_writer = target_write_builder.new_write()
+ target_writer.write_arrow(pa.Table.from_pydict({
+ 'id': [10, 11],
+ 'picture': view_values,
+ }, schema=target_schema))
+ target_commit_messages = target_writer.prepare_commit()
+ target_write_builder.new_commit().commit(target_commit_messages)
+ target_writer.close()
+
+ # Default (resolve enabled): view fields are resolved to real blob
data.
+ resolved_result = target_table.new_read_builder().new_read().to_arrow(
+ target_table.new_read_builder().new_scan().plan().splits()
+ ).sort_by('id')
+ self.assertEqual(resolved_result.column('picture').to_pylist(),
payloads)
+
+ # resolve disabled: view fields keep the original BlobViewStruct bytes.
+ preserve_table = target_table.copy(
+ {CoreOptions.BLOB_VIEW_RESOLVE_ENABLED.key(): 'false'}
+ )
+ preserve_result =
preserve_table.new_read_builder().new_read().to_arrow(
+ preserve_table.new_read_builder().new_scan().plan().splits()
+ ).sort_by('id')
+ preserved_values = preserve_result.column('picture').to_pylist()
+ self.assertEqual(preserved_values, view_values)
+ for value in preserved_values:
+ self.assertTrue(
+ BlobViewStruct.is_blob_view_struct(value),
+ "Expected original BlobViewStruct bytes when resolve disabled"
+ )
+
+ def test_blob_view_resolves_null_upstream_value(self):
+ from pypaimon import Schema
+ from pypaimon.table.row.blob import BlobViewStruct
+
+ source_schema = pa.schema([
+ ('id', pa.int32()),
+ ('picture', pa.large_binary()),
+ ])
+ source = Schema.from_pyarrow_schema(
+ source_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ }
+ )
+ self.catalog.create_table('test_db.blob_view_null_source', source,
False)
+ source_table = self.catalog.get_table('test_db.blob_view_null_source')
+ # Row 0 has a real blob value, row 1 has a null blob value.
+ payloads = [b'null-source-0', None]
+
+ write_builder = source_table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(pa.Table.from_pydict({
+ 'id': [1, 2],
+ 'picture': payloads,
+ }, schema=source_schema))
+ commit_messages = writer.prepare_commit()
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
+ picture_field_id = next(
+ field.id for field in source_table.table_schema.fields if
field.name == 'picture'
+ )
+ view_values = [
+ BlobViewStruct('test_db.blob_view_null_source', picture_field_id,
0).serialize(),
+ BlobViewStruct('test_db.blob_view_null_source', picture_field_id,
1).serialize(),
+ ]
+
+ target_schema = pa.schema([
+ ('id', pa.int32()),
+ ('picture', pa.large_binary()),
+ ])
+ target = Schema.from_pyarrow_schema(
+ target_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob-view-field': 'picture',
+ }
+ )
+ self.catalog.create_table('test_db.blob_view_null_target', target,
False)
+ target_table = self.catalog.get_table('test_db.blob_view_null_target')
+
+ target_write_builder = target_table.new_batch_write_builder()
+ target_writer = target_write_builder.new_write()
+ target_writer.write_arrow(pa.Table.from_pydict({
+ 'id': [10, 11],
+ 'picture': view_values,
+ }, schema=target_schema))
+ target_commit_messages = target_writer.prepare_commit()
+ target_write_builder.new_commit().commit(target_commit_messages)
+ target_writer.close()
+
+ # View referencing a real upstream value resolves to data; view
+ # referencing a null upstream value resolves to None (not an error).
+ result = target_table.new_read_builder().new_read().to_arrow(
+ target_table.new_read_builder().new_scan().plan().splits()
+ ).sort_by('id')
+ self.assertEqual(result.column('picture').to_pylist(),
[b'null-source-0', None])
+
+ def test_blob_view_fields_rejects_non_view_input(self):
+ from pypaimon import Schema
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('picture', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob-view-field': 'picture',
+ }
+ )
+ self.catalog.create_table('test_db.blob_view_reject_test', schema,
False)
+ table = self.catalog.get_table('test_db.blob_view_reject_test')
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ bad_data = pa.Table.from_pydict({
+ 'id': [1],
+ 'picture': [b'not-a-view-struct'],
+ }, schema=pa_schema)
+
+ with self.assertRaises(ValueError) as context:
+ writer.write_arrow(bad_data)
+ self.assertIn("blob-view-field", str(context.exception))
+
+ def test_blob_inline_fields_reject_overlap_and_unknown_fields(self):
+ from pypaimon import Schema
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('picture', pa.large_binary()),
+ ])
+ base_options = {
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ }
+
+ overlap_options = dict(base_options)
+ overlap_options.update({
+ 'blob-descriptor-field': 'picture',
+ 'blob-view-field': 'picture',
+ })
+ overlap_schema = Schema.from_pyarrow_schema(pa_schema,
options=overlap_options)
+ with self.assertRaises(ValueError) as overlap_context:
+ self.catalog.create_table(
+ 'test_db.blob_overlap_reject', overlap_schema, False)
+ self.assertIn("must not overlap", str(overlap_context.exception))
+
+ unknown_options = dict(base_options)
+ unknown_options.update({'blob-view-field': 'missing_picture'})
+ unknown_schema = Schema.from_pyarrow_schema(pa_schema,
options=unknown_options)
+ with self.assertRaises(ValueError) as unknown_context:
+ self.catalog.create_table(
+ 'test_db.blob_unknown_reject', unknown_schema, False)
+ self.assertIn("must be blob fields", str(unknown_context.exception))
+
+ def test_blob_view_prescan_with_limit(self):
+ """Test that limit is correctly pushed down to prescan reader.
+
+ Regression test for: prescan should only scan up to limit rows,
+ not the entire split.
+ """
+ from pypaimon import Schema
+ from pypaimon.table.row.blob import BlobViewStruct
+
+ # Create source table with multiple rows
+ source_schema = pa.schema([
+ ('id', pa.int32()),
+ ('picture', pa.large_binary()),
+ ])
+ source = Schema.from_pyarrow_schema(
+ source_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ }
+ )
+ self.catalog.create_table('test_db.blob_view_limit_source', source,
False)
+ source_table = self.catalog.get_table('test_db.blob_view_limit_source')
+
+ # Write 10 rows
+ num_rows = 10
+ payloads = [f'payload-{i}'.encode() for i in range(num_rows)]
+ write_builder = source_table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(pa.Table.from_pydict({
+ 'id': list(range(num_rows)),
+ 'picture': payloads,
+ }, schema=source_schema))
+ commit_messages = writer.prepare_commit()
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
+ picture_field_id = next(
+ field.id for field in source_table.table_schema.fields if
field.name == 'picture'
+ )
+ view_values = [
+ BlobViewStruct('test_db.blob_view_limit_source', picture_field_id,
i).serialize()
+ for i in range(num_rows)
+ ]
+
+ # Create target table with blob-view-field
+ target_schema = pa.schema([
+ ('id', pa.int32()),
+ ('picture', pa.large_binary()),
+ ])
+ target = Schema.from_pyarrow_schema(
+ target_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob-view-field': 'picture',
+ }
+ )
+ self.catalog.create_table('test_db.blob_view_limit_target', target,
False)
+ target_table = self.catalog.get_table('test_db.blob_view_limit_target')
+
+ target_write_builder = target_table.new_batch_write_builder()
+ target_writer = target_write_builder.new_write()
+ target_writer.write_arrow(pa.Table.from_pydict({
+ 'id': list(range(num_rows)),
+ 'picture': view_values,
+ }, schema=target_schema))
+ target_commit_messages = target_writer.prepare_commit()
+ target_write_builder.new_commit().commit(target_commit_messages)
+ target_writer.close()
+
+ # Test with limit: should only return first 3 rows
+ read_builder = target_table.new_read_builder()
+ read_builder.with_limit(3)
+ result = read_builder.new_read().to_arrow(
+ read_builder.new_scan().plan().splits()
+ )
+ self.assertEqual(result.num_rows, 3, "LIMIT should be respected in
blob view prescan")
+ self.assertEqual(result.column('id').to_pylist(), [0, 1, 2])
+
+ def test_blob_view_prescan_only_collects_limited_view_structs(self):
+ """Verify that the prescan stage only collects as many BlobViewStructs
as
+ the limit allows, instead of scanning the entire split.
+
+ Unlike test_blob_view_prescan_with_limit (which only checks the final
+ output), this test patches BlobViewLookup.preload to capture the exact
+ list of view structs collected during prescan and asserts its length
+ equals the limit.
+ """
+ from unittest import mock
+
+ from pypaimon import Schema
+ from pypaimon.table.row.blob import BlobViewStruct
+ from pypaimon.utils.blob_view_lookup import BlobViewLookup
+
+ source_schema = pa.schema([
+ ('id', pa.int32()),
+ ('picture', pa.large_binary()),
+ ])
+ source = Schema.from_pyarrow_schema(
+ source_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ }
+ )
+ self.catalog.create_table('test_db.blob_view_prescan_count_source',
source, False)
+ source_table =
self.catalog.get_table('test_db.blob_view_prescan_count_source')
+
+ num_rows = 10
+ payloads = [f'payload-{i}'.encode() for i in range(num_rows)]
+ write_builder = source_table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(pa.Table.from_pydict({
+ 'id': list(range(num_rows)),
+ 'picture': payloads,
+ }, schema=source_schema))
+ commit_messages = writer.prepare_commit()
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
+ picture_field_id = next(
+ field.id for field in source_table.table_schema.fields if
field.name == 'picture'
+ )
+ view_values = [
+ BlobViewStruct('test_db.blob_view_prescan_count_source',
picture_field_id, i).serialize()
+ for i in range(num_rows)
+ ]
+
+ target_schema = pa.schema([
+ ('id', pa.int32()),
+ ('picture', pa.large_binary()),
+ ])
+ target = Schema.from_pyarrow_schema(
+ target_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob-view-field': 'picture',
+ }
+ )
+ self.catalog.create_table('test_db.blob_view_prescan_count_target',
target, False)
+ target_table =
self.catalog.get_table('test_db.blob_view_prescan_count_target')
+
+ target_write_builder = target_table.new_batch_write_builder()
+ target_writer = target_write_builder.new_write()
+ target_writer.write_arrow(pa.Table.from_pydict({
+ 'id': list(range(num_rows)),
+ 'picture': view_values,
+ }, schema=target_schema))
+ target_commit_messages = target_writer.prepare_commit()
+ target_write_builder.new_commit().commit(target_commit_messages)
+ target_writer.close()
+
+ captured_view_structs = []
+ original_preload = BlobViewLookup.preload
+
+ def capturing_preload(lookup_self, view_structs):
+ captured_view_structs.append(list(view_structs))
+ return original_preload(lookup_self, view_structs)
+
+ limit = 3
+ read_builder = target_table.new_read_builder()
+ read_builder.with_limit(limit)
+ with mock.patch.object(BlobViewLookup, 'preload', autospec=True,
+ side_effect=capturing_preload):
+ result = read_builder.new_read().to_arrow(
+ read_builder.new_scan().plan().splits()
+ )
+
+ self.assertEqual(result.num_rows, limit)
+ self.assertEqual(len(captured_view_structs), 1,
+ "preload should be invoked exactly once during
prescan")
+ self.assertEqual(
+ len(captured_view_structs[0]), limit,
+ "prescan should only collect as many view structs as the limit
allows")
+
def test_to_arrow_batch_reader(self):
import random
from pypaimon import Schema
@@ -3204,7 +3681,7 @@ class DedicatedFormatWriterTest(unittest.TestCase):
total_split_row_count = sum([s.row_count for s in splits])
self.assertEqual(total_split_row_count, num_rows * 2,
f"Total split row count should be {num_rows}, got
{total_split_row_count}")
-
+
total_merged_count = 0
for split in splits:
merged_count = split.merged_row_count()
@@ -3213,7 +3690,7 @@ class DedicatedFormatWriterTest(unittest.TestCase):
self.assertLessEqual(
merged_count, split.row_count,
f"merged_row_count ({merged_count}) should be <= row_count
({split.row_count})")
-
+
if total_merged_count > 0:
self.assertEqual(
total_merged_count, num_rows,
@@ -3312,6 +3789,24 @@ class DedicatedFormatWriterTest(unittest.TestCase):
)
self.assertIn('Cannot rename BLOB column', str(ctx.exception))
+ def test_nested_field_named_blob_not_treated_as_blob(self):
+ """Regression: a ROW field with a nested column whose name contains
+ 'blob' must NOT be treated as a top-level BLOB column. Previously
+ the substring match would falsely classify such fields, causing
+ create_table to require row-tracking and data-evolution options."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('payload', pa.struct([
+ ('blob_name', pa.string()),
+ ('value', pa.int64()),
+ ])),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema)
+ self.catalog.create_table(
+ 'test_db.nested_blob_name_no_error', schema, False)
+ table = self.catalog.get_table('test_db.nested_blob_name_no_error')
+ self.assertIsNotNone(table)
+
class GetBlobTest(unittest.TestCase):
diff --git a/paimon-python/pypaimon/tests/blob_test.py
b/paimon-python/pypaimon/tests/blob_test.py
index b91ffdaf43..37217f8b7c 100644
--- a/paimon-python/pypaimon/tests/blob_test.py
+++ b/paimon-python/pypaimon/tests/blob_test.py
@@ -31,7 +31,7 @@ from pypaimon.filesystem.local_file_io import LocalFileIO
from pypaimon.common.options import Options
from pypaimon.read.reader.format_blob_reader import BlobRecordIterator,
FormatBlobReader
from pypaimon.schema.data_types import AtomicType, DataField
-from pypaimon.table.row.blob import Blob, BlobData, BlobRef, BlobDescriptor
+from pypaimon.table.row.blob import Blob, BlobData, BlobRef, BlobDescriptor,
BlobViewStruct, BlobView
from pypaimon.table.row.generic_row import GenericRowDeserializer,
GenericRowSerializer, GenericRow
from pypaimon.table.row.row_kind import RowKind
@@ -166,6 +166,25 @@ class BlobTest(unittest.TestCase):
with self.assertRaises(TypeError):
Blob.from_bytes(12345)
+ def test_blob_view_struct_roundtrip(self):
+ """Test BlobViewStruct serialization compatibility."""
+ view_struct = BlobViewStruct("test_db.source_table", 7, 42)
+ serialized = view_struct.serialize()
+
+ self.assertTrue(BlobViewStruct.is_blob_view_struct(serialized))
+ self.assertFalse(BlobDescriptor.is_blob_descriptor(serialized))
+
+ restored = BlobViewStruct.deserialize(serialized)
+ self.assertEqual(restored, view_struct)
+ self.assertEqual(restored.identifier.get_full_name(),
"test_db.source_table")
+ self.assertEqual(restored.field_id, 7)
+ self.assertEqual(restored.row_id, 42)
+
+ blob = Blob.from_bytes(view_struct.serialize())
+ self.assertIsInstance(blob, BlobView)
+ self.assertFalse(blob.is_resolved())
+ self.assertEqual(blob.view_struct, view_struct)
+
def test_blob_data_interface_compliance(self):
"""Test that BlobData properly implements Blob interface."""
test_data = b"interface test data"
diff --git a/paimon-python/pypaimon/tests/external_storage_blob_test.py
b/paimon-python/pypaimon/tests/external_storage_blob_test.py
index 505e4407a9..e7ad8273d9 100644
--- a/paimon-python/pypaimon/tests/external_storage_blob_test.py
+++ b/paimon-python/pypaimon/tests/external_storage_blob_test.py
@@ -94,7 +94,7 @@ class ExternalStorageBlobValidationTest(unittest.TestCase):
})
with self.assertRaises(ValueError) as ctx:
self.catalog.create_table('test_db.not_blob_type_test', schema,
False)
- self.assertIn('must be a BLOB type field', str(ctx.exception))
+ self.assertIn('must be blob fields', str(ctx.exception))
def test_validation_blob_not_null_field_passes(self):
"""BLOB NOT NULL fields should pass validation (not be rejected by str
comparison)."""
diff --git a/paimon-python/pypaimon/utils/blob_view_lookup.py
b/paimon-python/pypaimon/utils/blob_view_lookup.py
new file mode 100644
index 0000000000..1acff6bcc8
--- /dev/null
+++ b/paimon-python/pypaimon/utils/blob_view_lookup.py
@@ -0,0 +1,274 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from typing import Dict, List, Tuple, Set
+
+from pypaimon.common.identifier import Identifier
+from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.table.row.blob import BlobDescriptor, BlobViewStruct
+from pypaimon.table.special_fields import SpecialFields
+from pypaimon.utils.range import Range
+
+_PRELOAD_THREAD_NUM = 100
+_MIN_ROWS_PER_TASK = 100
+
+
+class TableReferences:
+ """Groups BlobViewStruct references by upstream table."""
+
+ def __init__(self, identifier: Identifier):
+ self.identifier: Identifier = identifier
+ self.references_by_field: Dict[int, List[BlobViewStruct]] = {}
+ self.row_ids: List[int] = []
+
+ def add(self, view_struct: BlobViewStruct) -> None:
+ self.references_by_field.setdefault(view_struct.field_id,
[]).append(view_struct)
+ self.row_ids.append(int(view_struct.row_id))
+
+
+class TableReadPlan:
+ """A plan for reading blob descriptors from one upstream table."""
+
+ def __init__(self, identifier: Identifier, upstream_table,
+ read_fields: List, row_ranges: List[Range]):
+ self.identifier: Identifier = identifier
+ self.upstream_table = upstream_table
+ self.read_fields: List = read_fields
+ self.row_ranges: List[Range] = row_ranges
+
+
+class BlobViewLookup:
+ """Resolve BlobViewStruct references by reading upstream blob
descriptors."""
+
+ def __init__(self, table):
+ self._table = table
+ self._descriptor_cache: Dict[BlobViewStruct, BlobDescriptor] = {}
+ self._null_value_cache: Set[BlobViewStruct] = set()
+
+ def preload(self, view_structs: List[BlobViewStruct]):
+ if not view_structs:
+ return
+
+ grouped: Dict[str, TableReferences] =
self._group_by_table(view_structs)
+ plans: List[TableReadPlan] = []
+ for table_refs in grouped.values():
+ plans.append(self._create_table_read_plan(table_refs))
+
+ target_rows: int = self._target_rows_per_task(plans)
+ tasks: List[Tuple[TableReadPlan, List[Range]]] = []
+ for plan in plans:
+ for range_chunk in self._split_row_ranges(plan.row_ranges,
target_rows):
+ tasks.append((plan, range_chunk))
+
+ if len(tasks) <= 1:
+ for plan, range_chunk in tasks:
+ descriptors, null_values = self._load_descriptor_chunk(plan,
range_chunk)
+ self._descriptor_cache.update(descriptors)
+ self._null_value_cache.update(null_values)
+ return
+
+ with ThreadPoolExecutor(max_workers=min(_PRELOAD_THREAD_NUM,
len(tasks))) as executor:
+ futures = {
+ executor.submit(self._load_descriptor_chunk, plan,
range_chunk): (plan, range_chunk)
+ for plan, range_chunk in tasks
+ }
+ for future in as_completed(futures):
+ try:
+ descriptors, null_values = future.result()
+ self._descriptor_cache.update(descriptors)
+ self._null_value_cache.update(null_values)
+ except Exception as exc:
+ # Cancel remaining futures that have not started yet so a
single
+ # failure can abort the rest of the preload work as early
as possible.
+ for pending_future in futures:
+ pending_future.cancel()
+ raise RuntimeError("Failed to preload blob descriptors.")
from exc
+
+ def resolve_descriptor(self, view_struct: BlobViewStruct) ->
BlobDescriptor:
+ descriptor: BlobDescriptor = self._descriptor_cache.get(view_struct)
+ if descriptor is None:
+ if view_struct in self._null_value_cache:
+ raise ValueError(
+ "BlobViewStruct {} resolves to a null blob
value.".format(view_struct)
+ )
+ raise ValueError(
+ "Cannot resolve BlobViewStruct {} because row id {} was not
found "
+ "in upstream table.".format(view_struct, view_struct.row_id)
+ )
+ return descriptor
+
+ def resolve_to_null(self, view_struct: BlobViewStruct) -> bool:
+ if view_struct in self._null_value_cache:
+ return True
+ if view_struct not in self._descriptor_cache:
+ raise ValueError(
+ "Cannot resolve BlobViewStruct {} because row id {} was not
found "
+ "in upstream table.".format(view_struct, view_struct.row_id)
+ )
+ return False
+
+ def _group_by_table(
+ self, view_structs: List[BlobViewStruct]
+ ) -> Dict[str, TableReferences]:
+ grouped: Dict[str, TableReferences] = {}
+ for view_struct in view_structs:
+ key = view_struct.identifier.get_full_name()
+ if key not in grouped:
+ grouped[key] = TableReferences(view_struct.identifier)
+ grouped[key].add(view_struct)
+ return grouped
+
+ def _create_table_read_plan(self, table_refs: TableReferences) ->
TableReadPlan:
+ upstream_table = self._load_table(table_refs.identifier)
+
+ fields: List = []
+ for field_id in table_refs.references_by_field:
+ fields.append(self._field_by_id(upstream_table, field_id))
+
+ read_fields = SpecialFields.row_type_with_row_id(fields)
+ return TableReadPlan(
+ table_refs.identifier, upstream_table, read_fields,
+ Range.to_ranges(table_refs.row_ids))
+
+ def _load_descriptor_chunk(
+ self, plan: TableReadPlan, row_ranges: List[Range]
+ ) -> Tuple[Dict[BlobViewStruct, BlobDescriptor], set]:
+ identifier: Identifier = plan.identifier
+ upstream_table = plan.upstream_table
+ read_fields = plan.read_fields
+
+ projection_field_names: List[str] = [f.name for f in read_fields]
+
+ descriptor_table =
upstream_table.copy({CoreOptions.BLOB_AS_DESCRIPTOR.key(): "true"})
+ read_builder =
descriptor_table.new_read_builder().with_projection(projection_field_names)
+
+ if SpecialFields.ROW_ID.name not in [
+ data_field.name for data_field in read_builder.read_type()
+ ]:
+ raise ValueError(
+ "Cannot resolve blob view for table {} because row tracking is
not readable."
+ .format(identifier.get_full_name())
+ )
+
+ predicate_builder = read_builder.new_predicate_builder()
+ range_predicates: List = []
+ for r in row_ranges:
+ if r.from_ == r.to:
+ range_predicates.append(
+ predicate_builder.equal(SpecialFields.ROW_ID.name,
r.from_))
+ else:
+ range_predicates.append(
+ predicate_builder.between(SpecialFields.ROW_ID.name,
r.from_, r.to))
+ if len(range_predicates) == 1:
+ predicate = range_predicates[0]
+ else:
+ predicate = predicate_builder.or_predicates(range_predicates)
+ read_builder.with_filter(predicate)
+ result =
read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())
+
+ if SpecialFields.ROW_ID.name not in result.schema.names:
+ raise ValueError(
+ "Cannot resolve blob view for table {} because row tracking is
not readable."
+ .format(identifier.get_full_name())
+ )
+
+ row_id_values: List =
result.column(SpecialFields.ROW_ID.name).to_pylist()
+ resolved: Dict[BlobViewStruct, BlobDescriptor] = {}
+ null_values: set = set()
+ for field in read_fields:
+ if field.name == SpecialFields.ROW_ID.name:
+ continue
+ if field.name not in result.schema.names:
+ continue
+ values = result.column(field.name).to_pylist()
+ for row_id, value in zip(row_id_values, values):
+ view_struct = BlobViewStruct(
+ identifier.get_full_name(), field.id, int(row_id))
+ if value is None:
+ null_values.add(view_struct)
+ continue
+ descriptor = BlobDescriptor.deserialize(value)
+ resolved[view_struct] = descriptor
+ return resolved, null_values
+
+ @staticmethod
+ def _split_row_ranges(
+ row_ranges: List[Range], target_rows_per_task: int
+ ) -> List[List[Range]]:
+ """
+ Split row ranges into multiple chunks for parallel task processing.
+ """
+ if not row_ranges:
+ return []
+
+ chunks: List[List[Range]] = []
+ current_chunk: List[Range] = []
+ current_chunk_rows: int = 0
+
+ for r in row_ranges:
+ next_from = r.from_
+ # Process current range until all rows are allocated
+ while next_from <= r.to:
+ # If current chunk is full, save it and start a new one
+ if current_chunk_rows == target_rows_per_task:
+ chunks.append(current_chunk)
+ current_chunk = []
+ current_chunk_rows = 0
+
+ # Calculate remaining capacity in current chunk
+ remaining = target_rows_per_task - current_chunk_rows
+ # Determine the end position for this allocation (don't exceed
range boundary)
+ next_to = min(r.to, next_from + remaining - 1)
+
+ # Add the allocated range to current chunk
+ current_chunk.append(Range(next_from, next_to))
+ current_chunk_rows += next_to - next_from + 1
+
+ # Move to next unallocated position
+ next_from = next_to + 1
+
+ # Don't forget the last chunk if it has any ranges
+ if current_chunk:
+ chunks.append(current_chunk)
+
+ return chunks
+
+    @staticmethod
+    def _target_rows_per_task(plans: List[TableReadPlan]) -> int:
+        """
+        Compute the chunk size, in rows, used when splitting the plans' rows.
+
+        Sums ``count()`` over every range in every plan's ``row_ranges``.
+        A non-positive total yields ``_MIN_ROWS_PER_TASK``. Otherwise the
+        total is divided by ``_PRELOAD_THREAD_NUM`` (rounded up), and the
+        result is raised to ``_MIN_ROWS_PER_TASK`` if it is smaller.
+        """
+        total_rows = sum(
+            row_range.count()
+            for plan in plans
+            for row_range in plan.row_ranges
+        )
+        if total_rows <= 0:
+            return _MIN_ROWS_PER_TASK
+
+        # Ceiling division of the total over the preload thread count.
+        per_thread = (
+            (total_rows + _PRELOAD_THREAD_NUM - 1) // _PRELOAD_THREAD_NUM
+        )
+        return max(_MIN_ROWS_PER_TASK, per_thread)
+
+    def _load_table(self, identifier: Identifier):
+        """
+        Fetch the table named by ``identifier`` from a catalog.
+
+        The catalog is obtained by calling ``load()`` on this table's
+        ``catalog_environment.catalog_loader`` each time this is invoked.
+        """
+        loader = self._table.catalog_environment.catalog_loader
+        return loader.load().get_table(identifier)
+
+    @staticmethod
+    def _field_by_id(table, field_id: int) -> 'DataField':
+        """
+        Return the first field in ``table.table_schema.fields`` whose ``id``
+        equals ``field_id``.
+
+        :raises ValueError: if no field in the schema has that id.
+        """
+        match = next(
+            (f for f in table.table_schema.fields if f.id == field_id),
+            None,
+        )
+        if match is None:
+            raise ValueError(
+                "Cannot find blob fieldId {} in upstream table {}."
+                .format(field_id, table.identifier.get_full_name())
+            )
+        return match
diff --git a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
index 2fd0ec878e..01216b36cd 100644
--- a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
+++ b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
@@ -57,6 +57,8 @@ class DedicatedFormatWriter(DataWriter):
# Determine blob columns from table schema
self.blob_column_names = self._get_blob_columns_from_schema()
self.blob_descriptor_fields =
CoreOptions.blob_descriptor_fields(self.options)
+ self.blob_view_fields = CoreOptions.blob_view_fields(self.options)
+ self.blob_inline_fields =
self.blob_descriptor_fields.union(self.blob_view_fields)
unknown_descriptor_fields = self.blob_descriptor_fields.difference(
set(self.blob_column_names)
@@ -68,10 +70,10 @@ class DedicatedFormatWriter(DataWriter):
)
# Blob fields that should still be written to `.blob` files.
- full_blob_file_column_names = [
- col for col in self.blob_column_names if col not in
self.blob_descriptor_fields
+ self.blob_file_column_names = [
+ col for col in self.blob_column_names if col not in
self.blob_inline_fields
]
- full_blob_file_set = set(full_blob_file_column_names)
+ full_blob_file_set = set(self.blob_file_column_names)
all_column_names = self.table.field_names
# Detect vector columns that should be written to dedicated files.
@@ -87,7 +89,7 @@ class DedicatedFormatWriter(DataWriter):
if write_cols is not None:
write_col_set = set(write_cols)
self.blob_file_column_names = [
- col for col in full_blob_file_column_names if col in
write_col_set
+ col for col in self.blob_file_column_names if col in
write_col_set
]
self.vector_write_columns = [
col for col in full_vector_column_names if col in write_col_set
@@ -96,7 +98,6 @@ class DedicatedFormatWriter(DataWriter):
col for col in write_cols if col not in dedicated_set
]
else:
- self.blob_file_column_names = list(full_blob_file_column_names)
self.vector_write_columns = list(full_vector_column_names) if
has_dedicated_vector else []
self.normal_column_names = [
col for col in all_column_names if col not in dedicated_set
@@ -159,12 +160,13 @@ class DedicatedFormatWriter(DataWriter):
logger.info(
"Initialized DedicatedFormatWriter with blob columns: %s, blob
file columns: %s, "
- "vector columns: %s, descriptor stored columns: %s, external
storage fields: %s",
+ "vector columns: %s, descriptor stored columns: %s, external
storage fields: %s, view stored columns: %s",
self.blob_column_names,
self.blob_file_column_names,
self.vector_write_columns,
sorted(self.blob_descriptor_fields),
sorted(external_storage_fields) if external_storage_fields else [],
+ sorted(self.blob_view_fields)
)
def _get_blob_columns_from_schema(self) -> List[str]:
@@ -200,7 +202,7 @@ class DedicatedFormatWriter(DataWriter):
# Split data into normal, blob, and vector parts
normal_data, blob_data_map, vector_data = self._split_data(data)
- self._validate_descriptor_stored_fields_input(data)
+ self._validate_inline_stored_fields_input(data)
# Process and accumulate normal data (may be None for partial
writes)
processed_normal = self._process_normal_data(normal_data)
@@ -278,11 +280,11 @@ class DedicatedFormatWriter(DataWriter):
)
return normal_data, blob_data_map, vector_data
- def _validate_descriptor_stored_fields_input(self, data: pa.RecordBatch):
- if not self.blob_descriptor_fields:
+ def _validate_inline_stored_fields_input(self, data: pa.RecordBatch):
+ if not self.blob_inline_fields:
return
- from pypaimon.table.row.blob import BlobDescriptor
+ from pypaimon.table.row.blob import BlobDescriptor, BlobViewStruct
for field_name in self.blob_descriptor_fields:
if field_name not in data.schema.names:
@@ -311,6 +313,33 @@ class DedicatedFormatWriter(DataWriter):
"BlobDescriptor."
) from e
+ for field_name in self.blob_view_fields:
+ if field_name not in data.schema.names:
+ continue
+ values =
data.column(data.schema.get_field_index(field_name)).to_pylist()
+ for value in values:
+ if value is None:
+ continue
+ if hasattr(value, 'as_py'):
+ value = value.as_py()
+ if isinstance(value, str):
+ value = value.encode('utf-8')
+ if not isinstance(value, (bytes, bytearray)):
+ raise ValueError(
+ "blob-view-field requires blob field value to be a
serialized "
+ "BlobViewStruct."
+ )
+ try:
+ view_bytes = bytes(value)
+ view_struct = BlobViewStruct.deserialize(view_bytes)
+ if view_struct.serialize() != view_bytes:
+ raise ValueError("BlobViewStruct payload contains
trailing bytes.")
+ except Exception as e:
+ raise ValueError(
+ "blob-view-field requires blob field value to be a
serialized "
+ "BlobViewStruct."
+ ) from e
+
@staticmethod
def _process_normal_data(data: pa.RecordBatch) -> Optional[pa.Table]:
"""Process normal data (similar to base DataWriter)."""