JingsongLi commented on code in PR #8021:
URL: https://github.com/apache/paimon/pull/8021#discussion_r3346081957
##########
paimon-python/pypaimon/read/split_read.py:
##########
@@ -792,6 +785,20 @@ def _push_down_predicate(self) -> Optional[Predicate]:
return None
def create_reader(self) -> RecordReader:
+ reader = self._create_raw_reader()
+
+ if ((CoreOptions.blob_view_fields(self.table.options) and
CoreOptions.blob_view_resolve_enabled(
Review Comment:
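[P2] Java's `DataEvolutionTableRead` enters BlobView resolution only when
`catalogContext != null`, and it raises an explicit error when `readFactory` is null.
Here the reader is wrapped based on `blob-view-field` alone. A table built with
`CatalogEnvironment.empty()`, such as one from `FileStoreTable.from_path()`, therefore
also reaches `BlobViewLookup._load_table()` and fails with an `AttributeError` at
`catalog_loader.load()`. Suggest checking `catalog_environment.catalog_loader` first:
when no resolver is available, either skip resolution as Java does or raise a clear
exception. Please also add a test for the no-catalog-loader case.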
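A minimal sketch of the suggested guard, not the PR's actual code. `CatalogEnvironment` here is a hypothetical stand-in with only a `catalog_loader` attribute, and `should_resolve_blob_view` and its `strict` flag are illustrative names; the real check would live in `create_reader()`.

```python
from typing import Optional, Set


class CatalogEnvironment:
    """Hypothetical stand-in: only models the catalog_loader attribute."""

    def __init__(self, catalog_loader: Optional[object] = None):
        self.catalog_loader = catalog_loader


def should_resolve_blob_view(view_fields: Set[str],
                             resolve_enabled: bool,
                             catalog_environment: CatalogEnvironment,
                             strict: bool = False) -> bool:
    """Decide whether the raw reader should be wrapped for BlobView resolution."""
    if not view_fields or not resolve_enabled:
        return False
    loader = getattr(catalog_environment, "catalog_loader", None)
    if loader is None:
        if strict:
            # Option B: fail early with a clear message instead of a late AttributeError.
            raise ValueError(
                "blob-view-field is configured but the table has no catalog loader; "
                "BlobView references cannot be resolved")
        # Option A: without a resolver, skip resolution.
        return False
    return True
```

A no-catalog-loader test can then assert either outcome directly, depending on which option is chosen.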
##########
paimon-python/pypaimon/schema/schema.py:
##########
@@ -133,3 +102,61 @@ def from_pyarrow_schema(pa_schema: pa.Schema,
partition_keys: Optional[List[str]
)
return Schema(fields, partition_keys, primary_keys, options, comment)
+
+ @staticmethod
+ def _validate_blob_fields(fields, options, primary_keys):
+ """Validate blob field configurations in the schema."""
+ blob_names = [
+ field.name for field in fields
+ if 'blob' in str(field.type).lower()
+ ]
+
+ if not blob_names:
Review Comment:
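[P2] This early return means explicitly configured `blob-field` /
`blob-descriptor-field` / `blob-view-field` options are not validated at all when the
schema has no BLOB field; Java `SchemaValidation` unconditionally checks that these
options point to BLOB fields. As it stands, a table with `blob-view-field=picture`
where `picture` is still BYTES/STRING can be created, and the later write/read paths
will ignore or misuse the option. Suggest validating the configured fields first, then
deciding whether any BLOB-specific constraints apply; also include `blob-field` in the
validation.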
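A rough sketch of validating the configured fields before any early return. It assumes fields are given as `(name, type_name)` pairs and that each option holds a comma-separated list of field names; both are simplifications for illustration, and `validate_blob_option_fields` is a hypothetical helper, not part of pypaimon. The BLOB type test mirrors the substring check used in the diff.

```python
from typing import Dict, List, Tuple

# The three options named in this comment.
BLOB_OPTION_KEYS = ("blob-field", "blob-descriptor-field", "blob-view-field")


def validate_blob_option_fields(fields: List[Tuple[str, str]],
                                options: Dict[str, str]) -> None:
    """Check that every field named by a blob option exists and is a BLOB field.

    Runs unconditionally, even when the schema contains no BLOB field at all.
    """
    types = {name: type_name for name, type_name in fields}
    for key in BLOB_OPTION_KEYS:
        raw = options.get(key, "")
        for name in (part.strip() for part in raw.split(",")):
            if not name:
                continue
            if name not in types:
                raise ValueError(f"'{key}' refers to unknown field '{name}'")
            if "blob" not in types[name].lower():
                raise ValueError(
                    f"'{key}' refers to field '{name}' of type "
                    f"{types[name]}, which is not a BLOB field")
```

With this ordering, `blob-view-field=picture` on a BYTES column is rejected at schema creation instead of being silently ignored later.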
##########
paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py:
##########
@@ -15,68 +15,185 @@
# specific language governing permissions and limitations
# under the License.
-from typing import Optional
+from typing import Callable, Optional, Set
+import pyarrow
from pyarrow import RecordBatch
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.table.row.blob import Blob, BlobViewStruct
-class BlobDescriptorConvertReader(RecordBatchReader):
- def __init__(self, inner: RecordBatchReader, table):
+class BlobInlineConvertReader(RecordBatchReader):
+ """Resolves BlobView and BlobDescriptor fields in record batches.
+
+ Processing is split into two clear stages:
+ Stage 1 (BlobView resolution): If view fields exist, use a lightweight
+ prescan reader (only projecting view columns) to collect
+ BlobViewStructs, bulk-preload their descriptors, then read
+ full data from the main reader and replace view field values
+ with the corresponding BlobDescriptor serialized bytes.
+ Stage 2 (BlobData resolution): Controlled by blob-as-descriptor option.
+ If false, resolve all BlobDescriptor bytes (from both descriptor
+ fields and view fields) into real blob data bytes.
+ If true, return as-is.
+ """
+
+ def __init__(self, inner: RecordBatchReader, table,
+ prescan_reader_factory: Optional[Callable[[Set[str]], RecordBatchReader]] = None):
+ """
+ Args:
+ inner: The main data reader (reads all columns).
+ table: The table instance.
+ prescan_reader_factory: Optional factory that creates a lightweight
+ reader projecting only the specified field names. Used for
+ prescan to collect BlobViewStructs without reading all columns.
+ Signature: (field_names: Set[str]) -> RecordBatchReader
+ """
self._inner = inner
self._table = table
- self._descriptor_fields = CoreOptions.blob_descriptor_fields(table.options)
+ self._prescan_reader_factory = prescan_reader_factory
self.file_io = inner.file_io
self.blob_field_indices = inner.blob_field_indices
+ # Preserve original BlobViewStruct bytes when resolve disabled: skip both
+ # view resolution (Stage 1) and descriptor-to-data resolution (Stage 2).
+ resolve_enabled = CoreOptions.blob_view_resolve_enabled(table.options)
+ self._view_fields = CoreOptions.blob_view_fields(table.options) if resolve_enabled else set()
+ self._descriptor_fields = CoreOptions.blob_descriptor_fields(table.options)
+ self._blob_as_descriptor = CoreOptions.blob_as_descriptor(table.options)
+ self._prescan_done = False
+ self._blob_view_lookup = None
def read_arrow_batch(self) -> Optional[RecordBatch]:
- import pyarrow
+ # Ensure prescan is done before reading (only needed for view fields)
+ if self._view_fields and not self._prescan_done:
+ self._prescan_view_structs()
+
batch = self._inner.read_arrow_batch()
if batch is None:
return None
- return self._convert_batch(batch, pyarrow)
+ # Resolve view fields using the preloaded lookup
+ if self._view_fields and self._blob_view_lookup is not None:
+ batch = self._resolve_view_fields(batch, self._blob_view_lookup)
+ # Resolve BlobDescriptor -> real bytes (if blob-as-descriptor=false)
+ return self._resolve_descriptor_fields(batch)
+
+ # ------------------------------------------------------------------
+ # Stage 1: BlobView prescan (lightweight, only reads view columns)
+ # ------------------------------------------------------------------
+
+ def _prescan_view_structs(self):
+ """Use a lightweight prescan reader (projecting only view columns) to
+ collect all BlobViewStructs and bulk-preload their descriptors."""
+ from pypaimon.table.row.blob import BlobViewStruct
+ from pypaimon.utils.blob_view_lookup import BlobViewLookup
- def _convert_batch(self, batch, pyarrow):
- from pypaimon.table.row.blob import Blob, BlobDescriptor
+ all_view_structs = []
- result = batch
- for field_name in self._descriptor_fields:
- if field_name not in result.schema.names:
+ prescan_reader = self._prescan_reader_factory(self._view_fields)
+ try:
+ while True:
+ batch = prescan_reader.read_arrow_batch()
Review Comment:
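[P2] The prescan reader is read to exhaustion before the first `read_arrow_batch()`
returns. In Python, `limit` is applied by the outer `TableRead`, which slices/stops
after batches are returned; `DataEvolutionSplitRead` itself never receives the limit.
Java passes `topN/limit` to the prescan reader in
`DataEvolutionTableRead.configureBlobViewPrescanRead`. As a result, Python
`with_limit(1)` may still prescan/validate every later `BlobViewStruct` in the same
split, and can even fail on a bad reference that would never be returned. Suggest
passing limit/topN through to the prescan scope, or at least adding a blob-view + limit
test to lock in the behavior.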
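One possible shape for a limit-aware prescan loop, as a sketch only. `prescan_with_limit` is a hypothetical helper; it models each batch as a plain list of serialized view structs and treats the limit as a row count, which glosses over nulls and multiple view columns.

```python
from typing import Callable, List, Optional


def prescan_with_limit(read_batch: Callable[[], Optional[List[bytes]]],
                       limit: Optional[int]) -> List[bytes]:
    """Collect view structs from a prescan reader, stopping once `limit` rows are seen.

    `read_batch` returns the next batch, or None when the reader is exhausted.
    """
    collected: List[bytes] = []
    while limit is None or len(collected) < limit:
        batch = read_batch()
        if batch is None:
            break
        if limit is None:
            collected.extend(batch)
        else:
            # Take only as many rows as the limit still allows.
            collected.extend(batch[:limit - len(collected)])
    return collected
```

With `limit=1`, the loop stops after the first batch, so a bad reference in a later batch is never read or validated. A blob-view + limit test could assert exactly that.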
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.