discivigour commented on code in PR #8021:
URL: https://github.com/apache/paimon/pull/8021#discussion_r3346426938
##########
paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py:
##########
@@ -15,68 +15,185 @@
# specific language governing permissions and limitations
# under the License.
-from typing import Optional
+from typing import Callable, Optional, Set
+import pyarrow
from pyarrow import RecordBatch
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.table.row.blob import Blob, BlobViewStruct
-class BlobDescriptorConvertReader(RecordBatchReader):
- def __init__(self, inner: RecordBatchReader, table):
+class BlobInlineConvertReader(RecordBatchReader):
+ """Resolves BlobView and BlobDescriptor fields in record batches.
+
+ Processing is split into two clear stages:
+ Stage 1 (BlobView resolution): If view fields exist, use a lightweight
+ prescan reader (only projecting view columns) to collect
+ BlobViewStructs, bulk-preload their descriptors, then read
+ full data from the main reader and replace view field values
+ with the corresponding BlobDescriptor serialized bytes.
+ Stage 2 (BlobData resolution): Controlled by blob-as-descriptor option.
+ If false, resolve all BlobDescriptor bytes (from both descriptor
+ fields and view fields) into real blob data bytes.
+ If true, return as-is.
+ """
+
+ def __init__(self, inner: RecordBatchReader, table,
+ prescan_reader_factory: Optional[Callable[[Set[str]],
RecordBatchReader]] = None):
+ """
+ Args:
+ inner: The main data reader (reads all columns).
+ table: The table instance.
+ prescan_reader_factory: Optional factory that creates a lightweight
+ reader projecting only the specified field names. Used for
+ prescan to collect BlobViewStructs without reading all columns.
+ Signature: (field_names: Set[str]) -> RecordBatchReader
+ """
self._inner = inner
self._table = table
- self._descriptor_fields =
CoreOptions.blob_descriptor_fields(table.options)
+ self._prescan_reader_factory = prescan_reader_factory
self.file_io = inner.file_io
self.blob_field_indices = inner.blob_field_indices
+ # Preserve original BlobViewStruct bytes when resolve disabled: skip
both
+ # view resolution (Stage 1) and descriptor-to-data resolution (Stage
2).
+ resolve_enabled = CoreOptions.blob_view_resolve_enabled(table.options)
+ self._view_fields = CoreOptions.blob_view_fields(table.options) if
resolve_enabled else set()
+ self._descriptor_fields =
CoreOptions.blob_descriptor_fields(table.options)
+ self._blob_as_descriptor =
CoreOptions.blob_as_descriptor(table.options)
+ self._prescan_done = False
+ self._blob_view_lookup = None
def read_arrow_batch(self) -> Optional[RecordBatch]:
- import pyarrow
+ # Ensure prescan is done before reading (only needed for view fields)
+ if self._view_fields and not self._prescan_done:
+ self._prescan_view_structs()
+
batch = self._inner.read_arrow_batch()
if batch is None:
return None
- return self._convert_batch(batch, pyarrow)
+ # Resolve view fields using the preloaded lookup
+ if self._view_fields and self._blob_view_lookup is not None:
+ batch = self._resolve_view_fields(batch, self._blob_view_lookup)
+ # Resolve BlobDescriptor -> real bytes (if blob-as-descriptor=false)
+ return self._resolve_descriptor_fields(batch)
+
+ # ------------------------------------------------------------------
+ # Stage 1: BlobView prescan (lightweight, only reads view columns)
+ # ------------------------------------------------------------------
+
+ def _prescan_view_structs(self):
+ """Use a lightweight prescan reader (projecting only view columns) to
+ collect all BlobViewStructs and bulk-preload their descriptors."""
+ from pypaimon.table.row.blob import BlobViewStruct
+ from pypaimon.utils.blob_view_lookup import BlobViewLookup
- def _convert_batch(self, batch, pyarrow):
- from pypaimon.table.row.blob import Blob, BlobDescriptor
+ all_view_structs = []
- result = batch
- for field_name in self._descriptor_fields:
- if field_name not in result.schema.names:
+ prescan_reader = self._prescan_reader_factory(self._view_fields)
+ try:
+ while True:
+ batch = prescan_reader.read_arrow_batch()
Review Comment:
Currently, pypaimon does not support limit pushdown in the read stage for
append tables. I will add support for it in the next PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]