hudi-agent commented on code in PR #18403:
URL: https://github.com/apache/hudi/pull/18403#discussion_r3189906874
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala:
##########
@@ -137,69 +149,62 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna
     // the option regardless.
     val blobMode = resolveBlobReadMode(storageConf)
     val readOpts = FileReadOptions.builder().blobReadMode(blobMode).build()
-    val arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE, readOpts)
+    arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE, readOpts)
-    // Compose the DESCRIPTOR-aware blob transform only when the user opted into that mode
-    // AND the request actually has BLOB columns (otherwise the rewrite has nothing to do).
+    // BLOB columns in the request: needed both for the DESCRIPTOR-mode row-path transform
+    // and for the path-selection gate below.
     val blobFieldNames: java.util.Set[String] =
       iteratorSchema.fields.collect { case f if isBlobField(f) => f.name }.toSet.asJava
-    val blobTransform = if (blobMode == BlobReadMode.DESCRIPTOR && !blobFieldNames.isEmpty) {
-      new BlobDescriptorTransform(blobFieldNames, filePath)
+
+    // Decide between batch mode and row mode.
+    // Fall back to row mode if:
+    //   - type casting is needed (batch-level type casting deferred to follow-up), OR
+    //   - the partition schema contains a type the batch-mode partition-vector populator
+    //     does not handle (Struct/Array/Map/Char/Varchar/interval, etc.). The row path
+    //     preserves these via JoinedRow, so falling back avoids silently nulling them out, OR
+    //   - DESCRIPTOR blob mode is requested with BLOB columns present. The descriptor
+    //     rewrite (data → null + synthesized/passthrough reference) lives in BlobDescriptorTransform
+    //     which only the row path applies; the batch path would surface Lance's raw
+    //     {position, size} struct in `data`, breaking the DESCRIPTOR-mode contract.
+    val hasTypeChanges = !implicitTypeChangeInfo.isEmpty
+    val partitionTypesBatchSupported =
+      partitionSchema.forall(f => isPartitionTypeSupportedForBatch(f.dataType))
+    val descriptorBlobReadNeedsRowPath =
+      blobMode == BlobReadMode.DESCRIPTOR && !blobFieldNames.isEmpty
+    if (enableVectorizedReader && !hasTypeChanges && partitionTypesBatchSupported
Review Comment:
🤔 Could this fall back to row mode after Spark has already committed to columnar output? `supportBatch` only checks `!internalSchemaOpt.isPresent`, but `hasTypeChanges` can still be true at runtime when a file's schema differs in type from the required schema (e.g., older base files written before a widening reconciliation, or `spark.read.schema(wider)` on a table with narrower file types). In that case the planner expects `ColumnarBatch`, but `readRows` returns `InternalRow` → ClassCastException downstream. Worth confirming whether this can be reached via Hudi-managed Lance writes, and if so, gating batch mode more conservatively at plan time (or adding type-cast support to `readBatch`).
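
For concreteness, here's a minimal sketch of what a more conservative plan-time gate could look like. `internalSchemaOpt` mirrors the existing check; `mayHaveTypeChanges` is a hypothetical helper (not an existing Hudi or Spark API) standing in for whatever plan-time signal can prove no per-file type cast will be needed:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

// Sketch only: names mirror this discussion, not Hudi's actual API surface.
trait ConservativeBatchGate {
  // Mirrors the existing plan-time check; empty means no schema-evolution metadata.
  def internalSchemaOpt: java.util.Optional[AnyRef]

  // Hypothetical helper: "could any file's physical types differ from the
  // required schema?" (pre-widening base files, wider read schema, ...).
  def mayHaveTypeChanges(requiredSchema: StructType): Boolean

  // Claim batch only when the runtime row-path fallback is provably unreachable,
  // so the planner never expects ColumnarBatch from a reader emitting InternalRow.
  def supportBatch(spark: SparkSession, requiredSchema: StructType): Boolean =
    !internalSchemaOpt.isPresent && !mayHaveTypeChanges(requiredSchema)
}
```

The invariant to preserve: every condition that forces the runtime row path (`hasTypeChanges`, unsupported partition types, DESCRIPTOR blob mode with BLOB columns) should either be decidable at plan time or cause batch support to be withheld.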
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]