yihua commented on code in PR #13223:
URL: https://github.com/apache/hudi/pull/13223#discussion_r2065586829
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -128,9 +131,18 @@ object HoodieInternalRowUtils {
   private[sql] def genUnsafeRowWriter(prevSchema: StructType,
                                       newSchema: StructType,
-                                      renamedColumnsMap: JMap[String, String]): UnsafeRowWriter = {
-    val writer = newWriterRenaming(prevSchema, newSchema, renamedColumnsMap, new JArrayDeque[String]())
+                                      renamedColumnsMap: JMap[String, String],
+                                      updatedValuesMap: JMap[Integer, Object]): UnsafeRowWriter = {
     val unsafeProjection = generateUnsafeProjection(newSchema, newSchema)
+    if (prevSchema.equals(newSchema) && renamedColumnsMap.isEmpty && updatedValuesMap.isEmpty) {
+      oldRow => unsafeProjection(oldRow)
Review Comment:
In Scala, should this use an explicit `return` statement so that the fast-path writer is actually returned, instead of the method falling through? See the sketch below.
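For reference, a minimal self-contained sketch of the pitfall (the `RowWriter` type and `genWriter` method here are hypothetical stand-ins, not the actual Hudi code): in Scala, an expression that is not the last statement of a method body is evaluated and discarded, so without an explicit `return` the fast-path lambda would be thrown away and the method would fall through to the general writer.
```
object EarlyReturnSketch {
  type RowWriter = String => String

  def genWriter(fastPath: Boolean): RowWriter = {
    if (fastPath) {
      // Without `return`, this lambda is a discarded statement and the
      // method falls through to the general writer below.
      return (row: String) => row
    }
    (row: String) => row.toUpperCase // general writer
  }

  def main(args: Array[String]): Unit = {
    println(genWriter(fastPath = true)("abc"))  // prints "abc"
    println(genWriter(fastPath = false)("abc")) // prints "ABC"
  }
}
```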
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -165,10 +172,10 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
     //If we need to do position based merging with log files we will leave the row index column at the end
     val dataProjection = if (getHasLogFiles && getShouldMergeUseRecordPosition) {
-      getIdentityProjection
+      getBootstrapProjection(dataRequiredSchema, dataRequiredSchema, partitionFields, partitionValues)
     } else {
-      projectRecord(dataRequiredSchema,
-        HoodieAvroUtils.removeFields(dataRequiredSchema, rowIndexColumn))
+      getBootstrapProjection(dataRequiredSchema,
+        HoodieAvroUtils.removeFields(dataRequiredSchema, rowIndexColumn), partitionFields, partitionValues)
     }
Review Comment:
Once the partition field values are properly read at the file group reader layer, we should revisit this and possibly remove the duplicate partition field value projection in the Spark file format class (`HoodieFileGroupReaderBasedParquetFileFormat`):
```
val reader = new HoodieFileGroupReader[InternalRow](readerContext,
  new HoodieHadoopStorage(metaClient.getBasePath, storageConf),
  tablePath, queryTimestamp, fileSlice, dataAvroSchema, requestedAvroSchema,
  internalSchemaOpt, metaClient, props, file.start, baseFileLength,
  shouldUseRecordPosition, false)
reader.initRecordIterators()
// Append partition values to rows and project to output schema
appendPartitionAndProject(
  reader.getClosableIterator,
  requestedSchema,
  remainingPartitionSchema,
  outputSchema,
  fileSliceMapping.getPartitionValues,
  fixedPartitionIndexes)
```
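For context, a hedged sketch of what such an append-and-project step can boil down to, assuming the output schema is simply data fields followed by partition fields (the helper name `appendPartitionAndProjectSketch` is hypothetical, not the actual implementation): partition values are joined onto each data row with Catalyst's `JoinedRow` and flattened by a generated `UnsafeProjection`.
```
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.types.StructType

// Hypothetical helper: appends constant partition values to each data row and
// flattens the result into an UnsafeRow.
def appendPartitionAndProjectSketch(
    dataRows: Iterator[InternalRow],
    dataSchema: StructType,
    partitionSchema: StructType,
    partitionValues: InternalRow): Iterator[InternalRow] = {
  val outputFields = dataSchema.fields ++ partitionSchema.fields
  val projection = GenerateUnsafeProjection.generate(
    outputFields.zipWithIndex.map { case (f, i) =>
      BoundReference(i, f.dataType, f.nullable)
    }.toSeq)
  // One reusable JoinedRow; the projection's output buffer is also reused,
  // matching Spark's per-row iterator contract (callers copy if they retain rows).
  val joined = new JoinedRow()
  dataRows.map(row => projection(joined(row, partitionValues)))
}
```
If the file format layer already does essentially this, removing the partition field value projection from `SparkFileFormatInternalRowReaderContext` would be a pure deduplication.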
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -249,7 +257,7 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
     sparkAdapter.makeColumnarBatch(vecs, s.numRows())
Review Comment:
Should a `TODO` be left here to support columnar batch merging with partition field projection in the future? One possible shape is sketched below.
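If the `TODO` is added, one direction the eventual implementation could take is the approach Spark's vectorized Parquet path uses for partition columns: populate constant column vectors from the partition row and append them to the batch. The sketch below assumes Spark 3.3+ (`ConstantColumnVector` and the matching `ColumnVectorUtils.populate` overload), and the helper name is hypothetical, not a planned API.
```
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.vectorized.{ColumnVectorUtils, ConstantColumnVector}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

// Hypothetical helper: appends constant partition-value vectors to a batch,
// mirroring how Spark's vectorized Parquet reader handles partition columns.
def appendPartitionColumnsSketch(
    batch: ColumnarBatch,
    partitionSchema: StructType,
    partitionValues: InternalRow): ColumnarBatch = {
  val numRows = batch.numRows()
  val partitionVectors: Array[ColumnVector] =
    partitionSchema.fields.zipWithIndex.map { case (field, i) =>
      val vector = new ConstantColumnVector(numRows, field.dataType)
      // Fills the vector with the single constant partition value.
      ColumnVectorUtils.populate(vector, partitionValues, i)
      vector: ColumnVector
    }
  val dataVectors = Array.tabulate[ColumnVector](batch.numCols())(batch.column)
  new ColumnarBatch(dataVectors ++ partitionVectors, numRows)
}
```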