nsivabalan commented on code in PR #5430:
URL: https://github.com/apache/hudi/pull/5430#discussion_r924355021


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala:
##########
@@ -123,6 +147,61 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
       HoodieMergeOnReadFileSplit(partitionedBaseFile, logFiles)
     }.toList
   }
+
+  protected def createMergeOnReadBaseFileReaders(partitionSchema: StructType,
+                                                 dataSchema: HoodieTableSchema,
+                                                 requiredDataSchema: 
HoodieTableSchema,
+                                                 requestedColumns: 
Array[String],
+                                                 filters: Array[Filter]): 
(BaseFileReader, BaseFileReader) = {
+    val requiredSchemaFileReaderMerging = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      partitionSchema = partitionSchema,
+      dataSchema = dataSchema,
+      requiredSchema = requiredDataSchema,
+      filters = filters,
+      options = optParams,
+      // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
+      //       to configure Parquet reader appropriately
+      hadoopConf = embedInternalSchema(new Configuration(conf), 
requiredDataSchema.internalSchema)
+    )
+
+    // Check whether fields required for merging were also requested to be 
fetched
+    // by the query:
+    //    - In case they were, there's no optimization we could apply here (we 
will have
+    //    to fetch such fields)
+    //    - In case they were not, we will provide 2 separate file-readers
+    //        a) One which would be applied to file-groups w/ delta-logs 
(merging)
+    //        b) One which would be applied to file-groups w/ no delta-logs or
+    //           in case query-mode is skipping merging
+    val requiredColumns = 
mandatoryFieldsForMerging.map(HoodieAvroUtils.getRootLevelFieldName)
+    if (requiredColumns.forall(requestedColumns.contains)) {
+      (requiredSchemaFileReaderMerging, requiredSchemaFileReaderMerging)
+    } else {
+      val prunedRequiredSchema = {
+        val superfluousColumnNames = 
requiredColumns.filterNot(requestedColumns.contains)
+        val prunedStructSchema =
+          StructType(requiredDataSchema.structTypeSchema.fields

Review Comment:
   can you verify if these changes works well for diff interplays between root 
level and nested fields as well. i.e.
   a. required col refers to a nested field. and required fields refers to root 
level 
   b. required col refers to root level. required field refers to root level 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to