JingsongLi commented on code in PR #8021:
URL: https://github.com/apache/paimon/pull/8021#discussion_r3346081957


##########
paimon-python/pypaimon/read/split_read.py:
##########
@@ -792,6 +785,20 @@ def _push_down_predicate(self) -> Optional[Predicate]:
         return None
 
     def create_reader(self) -> RecordReader:
+        reader = self._create_raw_reader()
+
+        if ((CoreOptions.blob_view_fields(self.table.options) and 
CoreOptions.blob_view_resolve_enabled(

Review Comment:
   [P2] Java only enters BlobView resolution in `DataEvolutionTableRead` when 
`catalogContext != null`, and it gives a clear error if `readFactory` is 
missing. This path wraps the reader based only on `blob-view-field`, so tables 
created with `CatalogEnvironment.empty()` such as `FileStoreTable.from_path()` 
still reach `BlobViewLookup._load_table()` and fail with an `AttributeError` on 
`catalog_loader.load()`. Please check `catalog_environment.catalog_loader` 
before enabling resolution: either skip resolving as Java does when no resolver 
context exists, or raise a clear error, and add coverage for the 
no-catalog-loader case.



##########
paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py:
##########
@@ -15,68 +15,185 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Optional
+from typing import Callable, Optional, Set
 
+import pyarrow
 from pyarrow import RecordBatch
 
 from pypaimon.common.options.core_options import CoreOptions
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.table.row.blob import Blob, BlobViewStruct
 
 
-class BlobDescriptorConvertReader(RecordBatchReader):
-    def __init__(self, inner: RecordBatchReader, table):
+class BlobInlineConvertReader(RecordBatchReader):
+    """Resolves BlobView and BlobDescriptor fields in record batches.
+
+    Processing is split into two clear stages:
+      Stage 1 (BlobView resolution): If view fields exist, use a lightweight
+               prescan reader (only projecting view columns) to collect
+               BlobViewStructs, bulk-preload their descriptors, then read
+               full data from the main reader and replace view field values
+               with the corresponding BlobDescriptor serialized bytes.
+      Stage 2 (BlobData resolution): Controlled by blob-as-descriptor option.
+               If false, resolve all BlobDescriptor bytes (from both descriptor
+               fields and view fields) into real blob data bytes.
+               If true, return as-is.
+    """
+
+    def __init__(self, inner: RecordBatchReader, table,
+                 prescan_reader_factory: Optional[Callable[[Set[str]], 
RecordBatchReader]] = None):
+        """
+        Args:
+            inner: The main data reader (reads all columns).
+            table: The table instance.
+            prescan_reader_factory: Optional factory that creates a lightweight
+                reader projecting only the specified field names. Used for
+                prescan to collect BlobViewStructs without reading all columns.
+                Signature: (field_names: Set[str]) -> RecordBatchReader
+        """
         self._inner = inner
         self._table = table
-        self._descriptor_fields = 
CoreOptions.blob_descriptor_fields(table.options)
+        self._prescan_reader_factory = prescan_reader_factory
         self.file_io = inner.file_io
         self.blob_field_indices = inner.blob_field_indices
+        # Preserve original BlobViewStruct bytes when resolve disabled: skip 
both
+        # view resolution (Stage 1) and descriptor-to-data resolution (Stage 
2).
+        resolve_enabled = CoreOptions.blob_view_resolve_enabled(table.options)
+        self._view_fields = CoreOptions.blob_view_fields(table.options) if 
resolve_enabled else set()
+        self._descriptor_fields = 
CoreOptions.blob_descriptor_fields(table.options)
+        self._blob_as_descriptor = 
CoreOptions.blob_as_descriptor(table.options)
+        self._prescan_done = False
+        self._blob_view_lookup = None
 
     def read_arrow_batch(self) -> Optional[RecordBatch]:
-        import pyarrow
+        # Ensure prescan is done before reading (only needed for view fields)
+        if self._view_fields and not self._prescan_done:
+            self._prescan_view_structs()
+
         batch = self._inner.read_arrow_batch()
         if batch is None:
             return None
-        return self._convert_batch(batch, pyarrow)
+        # Resolve view fields using the preloaded lookup
+        if self._view_fields and self._blob_view_lookup is not None:
+            batch = self._resolve_view_fields(batch, self._blob_view_lookup)
+        # Resolve BlobDescriptor -> real bytes (if blob-as-descriptor=false)
+        return self._resolve_descriptor_fields(batch)
+
+    # ------------------------------------------------------------------
+    # Stage 1: BlobView prescan (lightweight, only reads view columns)
+    # ------------------------------------------------------------------
+
+    def _prescan_view_structs(self):
+        """Use a lightweight prescan reader (projecting only view columns) to
+        collect all BlobViewStructs and bulk-preload their descriptors."""
+        from pypaimon.table.row.blob import BlobViewStruct
+        from pypaimon.utils.blob_view_lookup import BlobViewLookup
 
-    def _convert_batch(self, batch, pyarrow):
-        from pypaimon.table.row.blob import Blob, BlobDescriptor
+        all_view_structs = []
 
-        result = batch
-        for field_name in self._descriptor_fields:
-            if field_name not in result.schema.names:
+        prescan_reader = self._prescan_reader_factory(self._view_fields)
+        try:
+            while True:
+                batch = prescan_reader.read_arrow_batch()

Review Comment:
   [P2] This prescan exhausts the prescan reader before the first main batch is 
returned. In Python, `limit` is enforced by outer `TableRead` after a batch is 
returned or sliced, and `DataEvolutionSplitRead` itself never receives that 
limit. Java wires `topN/limit` into the prescan reader in 
`DataEvolutionTableRead.configureBlobViewPrescanRead`. As a result, Python 
`with_limit(1)` can still prescan and validate every later `BlobViewStruct` in 
the same split, and can even fail on a bad reference that the user-visible 
result would never return. Please pass limit/topN into the prescan scope, or at 
least add a blob-view + limit test to lock down the intended behavior.



##########
paimon-python/pypaimon/schema/schema.py:
##########
@@ -133,3 +102,61 @@ def from_pyarrow_schema(pa_schema: pa.Schema, 
partition_keys: Optional[List[str]
                 )
 
         return Schema(fields, partition_keys, primary_keys, options, comment)
+
+    @staticmethod
+    def _validate_blob_fields(fields, options, primary_keys):
+        """Validate blob field configurations in the schema."""
+        blob_names = [
+            field.name for field in fields
+            if 'blob' in str(field.type).lower()
+        ]
+
+        if not blob_names:

Review Comment:
   [P2] This early return skips validation for explicitly configured 
`blob-field` / `blob-descriptor-field` / `blob-view-field` when the schema 
currently has no BLOB fields. Java `SchemaValidation` validates these options 
unconditionally against BLOB fields. Today a table can be created with, for 
example, `blob-view-field=picture` while `picture` remains BYTES/STRING, and 
the later write/read paths either ignore or misuse that configuration. Please 
validate the configured fields first, then apply the BLOB-specific table 
constraints only when BLOB fields exist; `blob-field` should be included in 
that validation as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to