yihua commented on code in PR #13223:
URL: https://github.com/apache/hudi/pull/13223#discussion_r2065586829


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -128,9 +131,18 @@ object HoodieInternalRowUtils {
 
   private[sql] def genUnsafeRowWriter(prevSchema: StructType,
                                       newSchema: StructType,
-                                      renamedColumnsMap: JMap[String, 
String]): UnsafeRowWriter = {
-    val writer = newWriterRenaming(prevSchema, newSchema, renamedColumnsMap, 
new JArrayDeque[String]())
+                                      renamedColumnsMap: JMap[String, String],
+                                      updatedValuesMap: JMap[Integer, 
Object]): UnsafeRowWriter = {
     val unsafeProjection = generateUnsafeProjection(newSchema, newSchema)
+    if (prevSchema.equals(newSchema) && renamedColumnsMap.isEmpty && 
updatedValuesMap.isEmpty) {
+      oldRow => unsafeProjection(oldRow)

Review Comment:
   In Scala, should this use an explicit `return` statement so that the writer 
is returned?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -165,10 +172,10 @@ class 
SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
 
       //If we need to do position based merging with log files we will leave 
the row index column at the end
       val dataProjection = if (getHasLogFiles && 
getShouldMergeUseRecordPosition) {
-        getIdentityProjection
+        getBootstrapProjection(dataRequiredSchema, dataRequiredSchema, 
partitionFields, partitionValues)
       } else {
-        projectRecord(dataRequiredSchema,
-          HoodieAvroUtils.removeFields(dataRequiredSchema, rowIndexColumn))
+        getBootstrapProjection(dataRequiredSchema,
+          HoodieAvroUtils.removeFields(dataRequiredSchema, rowIndexColumn), 
partitionFields, partitionValues)
       }

Review Comment:
   Once the partition field values are properly read at the file group reader 
layer, we should revisit and possibly remove the equivalent partition field 
value projection in the Spark file format class 
(`HoodieFileGroupReaderBasedParquetFileFormat`):
   ```
                 val reader = new 
HoodieFileGroupReader[InternalRow](readerContext, new 
                   HoodieHadoopStorage(metaClient.getBasePath, storageConf), 
tablePath, queryTimestamp,
                   fileSlice, dataAvroSchema, requestedAvroSchema, 
internalSchemaOpt, metaClient, props, file.start, baseFileLength, 
shouldUseRecordPosition, false)
                 reader.initRecordIterators()
                 // Append partition values to rows and project to output schema
                 appendPartitionAndProject(
                   reader.getClosableIterator,
                   requestedSchema,
                   remainingPartitionSchema,
                   outputSchema,
                   fileSliceMapping.getPartitionValues,
                   fixedPartitionIndexes)
   ```



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -249,7 +257,7 @@ class 
SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
               sparkAdapter.makeColumnarBatch(vecs, s.numRows())

Review Comment:
   Should a `TODO` be left here to support columnar batch merging with 
partition field projection in the future?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to