wombatu-kun commented on code in PR #18403:
URL: https://github.com/apache/hudi/pull/18403#discussion_r3205832056
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala:
##########
@@ -137,69 +149,62 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna
// the option regardless.
val blobMode = resolveBlobReadMode(storageConf)
val readOpts = FileReadOptions.builder().blobReadMode(blobMode).build()
- val arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE, readOpts)
+ arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE, readOpts)
- // Compose the DESCRIPTOR-aware blob transform only when the user opted into that mode
- // AND the request actually has BLOB columns (otherwise the rewrite has nothing to do).
+ // BLOB columns in the request — needed both for the DESCRIPTOR-mode row-path transform
+ // and for the path-selection gate below.
val blobFieldNames: java.util.Set[String] =
iteratorSchema.fields.collect { case f if isBlobField(f) => f.name }.toSet.asJava
- val blobTransform = if (blobMode == BlobReadMode.DESCRIPTOR && !blobFieldNames.isEmpty) {
- new BlobDescriptorTransform(blobFieldNames, filePath)
+
+ // Decide between batch mode and row mode.
+ // Fall back to row mode if:
+ // - type casting is needed (batch-level type casting deferred to follow-up), OR
+ // - the partition schema contains a type the batch-mode partition-vector populator
+ // does not handle (Struct/Array/Map/Char/Varchar/interval, etc.). The row path
+ // preserves these via JoinedRow, so falling back avoids silently nulling them out, OR
+ // - DESCRIPTOR blob mode is requested with BLOB columns present. The descriptor
+ // rewrite (data→null + synthesized/passthrough reference) lives in BlobDescriptorTransform
+ // which only the row path applies; the batch path would surface Lance's raw
+ // {position, size} struct in `data`, breaking the DESCRIPTOR-mode contract.
+ val hasTypeChanges = !implicitTypeChangeInfo.isEmpty
+ val partitionTypesBatchSupported =
+ partitionSchema.forall(f => isPartitionTypeSupportedForBatch(f.dataType))
+ val descriptorBlobReadNeedsRowPath = blobMode == BlobReadMode.DESCRIPTOR && !blobFieldNames.isEmpty
+ if (enableVectorizedReader && !hasTypeChanges && partitionTypesBatchSupported
Review Comment:
This is gated at plan time, not at runtime.
`HoodieFileGroupReaderBasedFileFormat.supportBatch` disables batch mode for
Lance whenever `internalSchemaOpt.isPresent`
(`HoodieFileGroupReaderBasedFileFormat.scala:188-189`), and
`implicitTypeChangeInfo` is itself derived from the same `internalSchemaOpt`.
So when type changes exist, the reader is invoked with
`enableVectorizedReader=false` and the planner already expects `InternalRow` —
the row branch here is taken unconditionally.
The `!hasTypeChanges` check at line 174 is a defense-in-depth guard, not a
reachable fallback under that contract. The block comment at
`HoodieFileGroupReaderBasedFileFormat.scala:179-187` documents the
plan-time/runtime split explicitly.
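The plan-time/runtime contract described above can be sketched as follows. This is an illustrative stand-in, not Hudi's actual code: the object name, method bodies, and the `Option[AnyRef]` stand-in for the internal schema are assumptions; only the names `supportBatch`, `internalSchemaOpt`, and the gate expression come from the comment.

```scala
// Hypothetical sketch of the plan-time/runtime split, not Hudi's implementation.
object PlanTimeGateSketch {
  // Plan time: per the comment, supportBatch answers "no" for Lance
  // whenever an internal schema (pending type changes) is present.
  def supportBatch(internalSchemaOpt: Option[AnyRef]): Boolean =
    internalSchemaOpt.isEmpty

  // Runtime: implicitTypeChangeInfo is derived from the same option, so
  // enableVectorizedReader == true already implies hasTypeChanges == false.
  def takesBatchPath(internalSchemaOpt: Option[AnyRef]): Boolean = {
    val enableVectorizedReader = supportBatch(internalSchemaOpt)
    val hasTypeChanges = internalSchemaOpt.isDefined
    // The !hasTypeChanges guard can never flip the plan-time decision:
    // it is defense in depth, not a reachable fallback.
    enableVectorizedReader && !hasTypeChanges
  }

  def main(args: Array[String]): Unit = {
    assert(takesBatchPath(None))         // no internal schema: batch path
    assert(!takesBatchPath(Some("isc"))) // schema changes: row path, fixed at plan time
    println("ok")
  }
}
```

Under this contract the row branch is selected before the reader ever runs, which is why the runtime check is redundant but harmless.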
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]