wombatu-kun commented on code in PR #18403:
URL: https://github.com/apache/hudi/pull/18403#discussion_r3205832056


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala:
##########
@@ -137,69 +149,62 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna
         // the option regardless.
         val blobMode = resolveBlobReadMode(storageConf)
         val readOpts = FileReadOptions.builder().blobReadMode(blobMode).build()
-        val arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE, readOpts)
+        arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE, readOpts)
 
-        // Compose the DESCRIPTOR-aware blob transform only when the user opted into that mode
-        // AND the request actually has BLOB columns (otherwise the rewrite has nothing to do).
+        // BLOB columns in the request — needed both for the DESCRIPTOR-mode row-path transform
+        // and for the path-selection gate below.
         val blobFieldNames: java.util.Set[String] =
           iteratorSchema.fields.collect { case f if isBlobField(f) => f.name }.toSet.asJava
-        val blobTransform = if (blobMode == BlobReadMode.DESCRIPTOR && !blobFieldNames.isEmpty) {
-          new BlobDescriptorTransform(blobFieldNames, filePath)
+
+        // Decide between batch mode and row mode.
+        // Fall back to row mode if:
+        //   - type casting is needed (batch-level type casting deferred to follow-up), OR
+        //   - the partition schema contains a type the batch-mode partition-vector populator
+        //     does not handle (Struct/Array/Map/Char/Varchar/interval, etc.). The row path
+        //     preserves these via JoinedRow, so falling back avoids silently nulling them out, OR
+        //   - DESCRIPTOR blob mode is requested with BLOB columns present. The descriptor
+        //     rewrite (data→null + synthesized/passthrough reference) lives in BlobDescriptorTransform
+        //     which only the row path applies; the batch path would surface Lance's raw
+        //     {position, size} struct in `data`, breaking the DESCRIPTOR-mode contract.
+        val hasTypeChanges = !implicitTypeChangeInfo.isEmpty
+        val partitionTypesBatchSupported =
+          partitionSchema.forall(f => isPartitionTypeSupportedForBatch(f.dataType))
+        val descriptorBlobReadNeedsRowPath =
+          blobMode == BlobReadMode.DESCRIPTOR && !blobFieldNames.isEmpty
+        if (enableVectorizedReader && !hasTypeChanges && partitionTypesBatchSupported

Review Comment:
   This is gated at plan time, not at runtime. 
`HoodieFileGroupReaderBasedFileFormat.supportBatch` disables batch mode for 
Lance whenever `internalSchemaOpt.isPresent` 
(`HoodieFileGroupReaderBasedFileFormat.scala:188-189`), and 
`implicitTypeChangeInfo` is itself derived from the same `internalSchemaOpt`. 
When type changes exist, the reader is therefore invoked with 
`enableVectorizedReader=false` and the planner already expects `InternalRow`; 
the row branch here is taken regardless of the `!hasTypeChanges` check.
   
   The `!hasTypeChanges` check at line 174 is a defense-in-depth guard, not a 
reachable fallback under that contract. The block comment at 
`HoodieFileGroupReaderBasedFileFormat.scala:179-187` documents the 
plan-time/runtime split explicitly.
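
To illustrate the contract described above, here is a minimal, hypothetical model of the two gates. It is a sketch, not the Hudi implementation: `BatchGateSketch` and all of its methods are invented for illustration and only mirror the identifiers in the diff, and the last conjunct of `useBatchPath` is an assumption because the quoted hunk is cut off before the condition ends.

```scala
// Hypothetical, simplified model of the plan-time/runtime split.
// None of these names are Hudi APIs; they only mirror identifiers from the diff.
object BatchGateSketch {

  // Plan time: stands in for supportBatch. As described in the comment,
  // batch mode is disabled for Lance whenever an internal schema is present.
  def planTimeSupportBatch(isLance: Boolean, internalSchemaPresent: Boolean): Boolean =
    !(isLance && internalSchemaPresent)

  // Runtime: type changes can only exist when an internal schema is present
  // (assumption: they are derived from nothing else).
  def hasTypeChanges(internalSchemaPresent: Boolean, changedColumns: Set[String]): Boolean =
    internalSchemaPresent && changedColumns.nonEmpty

  // Runtime gate shaped like the condition in the diff. The final conjunct is
  // assumed, since the quoted condition is truncated.
  def useBatchPath(
      enableVectorizedReader: Boolean,
      typeChanges: Boolean,
      partitionTypesBatchSupported: Boolean,
      descriptorBlobReadNeedsRowPath: Boolean): Boolean =
    enableVectorizedReader && !typeChanges &&
      partitionTypesBatchSupported && !descriptorBlobReadNeedsRowPath

  // Enumerate Lance reads that honour the plan-time contract and report whether
  // dropping the !typeChanges guard would ever change the chosen path.
  def guardIsReachable: Boolean = {
    val outcomes = for {
      schemaPresent <- Seq(true, false)
      changed <- Seq(Set.empty[String], Set("col_a"))
    } yield {
      val vectorized = planTimeSupportBatch(isLance = true, internalSchemaPresent = schemaPresent)
      val typeChanges = hasTypeChanges(schemaPresent, changed)
      val withGuard = useBatchPath(
        enableVectorizedReader = vectorized,
        typeChanges = typeChanges,
        partitionTypesBatchSupported = true,
        descriptorBlobReadNeedsRowPath = false)
      val withoutGuard = useBatchPath(
        enableVectorizedReader = vectorized,
        typeChanges = false,
        partitionTypesBatchSupported = true,
        descriptorBlobReadNeedsRowPath = false)
      withGuard != withoutGuard
    }
    outcomes.contains(true)
  }
}
```

Under these assumptions `guardIsReachable` is `false`: whenever type changes exist, the plan-time gate has already turned the vectorized reader off. The guard only changes the outcome if a caller breaks the contract and passes `enableVectorizedReader = true` alongside type changes, which is the defense-in-depth case.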


