This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d498d7b7e2 [HUDI-7047] Fix issues in new file format for incremental queries (#10009)
0d498d7b7e2 is described below

commit 0d498d7b7e2bbc2dfcad97b22a853936eb5038c3
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Nov 8 05:48:46 2023 -0500

    [HUDI-7047] Fix issues in new file format for incremental queries (#10009)
    
    - In HoodieIncrementalFileIndex, Seq.empty was evaluated but not returned
       for an empty listing because the rest of the method still ran; the
       remainder of the method is now in an else block.
    - Disable vectorized reader for incremental
    - testPrunePartitionForTimestampBasedKeyGenerator:
       if the date is already in the output format then we don't need to
       try to convert it
       add partition pruning to the query
    - TestIncrementalReadWithFullTableScan: schema issues in the
       implementation were fixed by correctly setting the mandatory fields
       and projecting to the output schema.
---
 .../scala/org/apache/hudi/HoodieCDCFileIndex.scala |  5 +-
 .../scala/org/apache/hudi/HoodieFileIndex.scala    | 18 ++++-
 .../hudi/HoodieHadoopFsRelationFactory.scala       | 16 +++-
 .../apache/hudi/HoodieIncrementalFileIndex.scala   | 87 +++++++++++-----------
 .../hudi/MergeOnReadIncrementalRelation.scala      | 14 +++-
 .../apache/hudi/SparkHoodieTableFileIndex.scala    |  2 +-
 .../datasources/HoodieMultipleBaseFileFormat.scala |  2 +-
 ...odieFileGroupReaderBasedParquetFileFormat.scala |  2 +-
 .../parquet/NewHoodieParquetFileFormat.scala       | 21 ++++--
 .../apache/hudi/functional/TestMORDataSource.scala |  4 +-
 10 files changed, 102 insertions(+), 69 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
index cb46b19b88b..0fe3dcd2b51 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
@@ -37,9 +37,10 @@ class HoodieCDCFileIndex (override val spark: SparkSession,
                           override val schemaSpec: Option[StructType],
                           override val options: Map[String, String],
                           @transient override val fileStatusCache: 
FileStatusCache = NoopCache,
-                          override val includeLogFiles: Boolean)
+                          override val includeLogFiles: Boolean,
+                          override val shouldEmbedFileSlices: Boolean)
   extends HoodieIncrementalFileIndex(
-    spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles
+    spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles, 
shouldEmbedFileSlices
   ) with FileIndex {
   val cdcRelation: CDCRelation = CDCRelation.getCDCRelation(spark.sqlContext, 
metaClient, options)
   val cdcExtractor: HoodieCDCExtractor = cdcRelation.cdcExtractor
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index aaefd715fa3..2767b6982cd 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -157,13 +157,13 @@ case class HoodieFileIndex(spark: SparkSession,
           val baseFileStatusesAndLogFileOnly: Seq[FileStatus] = 
fileSlices.map(slice => {
             if (slice.getBaseFile.isPresent) {
               slice.getBaseFile.get().getFileStatus
-            } else if (slice.getLogFiles.findAny().isPresent) {
+            } else if (includeLogFiles && 
slice.getLogFiles.findAny().isPresent) {
               slice.getLogFiles.findAny().get().getFileStatus
             } else {
               null
             }
           }).filter(slice => slice != null)
-          val c = fileSlices.filter(f => f.getLogFiles.findAny().isPresent
+          val c = fileSlices.filter(f => (includeLogFiles && 
f.getLogFiles.findAny().isPresent)
             || (f.getBaseFile.isPresent && 
f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
             foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> 
f) }
           if (c.nonEmpty) {
@@ -497,8 +497,18 @@ object HoodieFileIndex extends Logging {
           partitionFilters.toArray.map {
             _.transformDown {
               case Literal(value, dataType) if 
dataType.isInstanceOf[StringType] =>
-                val converted = 
outDateFormat.format(inDateFormat.parse(value.toString))
-                Literal(UTF8String.fromString(converted), StringType)
+                try {
+                  val converted = 
outDateFormat.format(inDateFormat.parse(value.toString))
+                  Literal(UTF8String.fromString(converted), StringType)
+                } catch {
+                  case _: java.text.ParseException =>
+                    try {
+                      outDateFormat.parse(value.toString)
+                    } catch {
+                      case e: Exception => throw new 
HoodieException("Partition filter for TimestampKeyGenerator cannot be converted 
to format " + outDateFormat.toString, e)
+                    }
+                    Literal(UTF8String.fromString(value.toString), StringType)
+                }
             }
           }
         } catch {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index 7a3ea7483fd..50249d87d97 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -281,8 +281,10 @@ class 
HoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val sqlContex
                                                           isBootstrap: Boolean)
   extends HoodieMergeOnReadSnapshotHadoopFsRelationFactory(sqlContext, 
metaClient, options, schemaSpec, isBootstrap) {
 
+  override val mandatoryFields: Seq[String] = 
Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++ mandatoryFieldsForMerging
+
   override val fileIndex = new HoodieIncrementalFileIndex(
-    sparkSession, metaClient, schemaSpec, options, 
FileStatusCache.getOrCreate(sparkSession), true)
+    sparkSession, metaClient, schemaSpec, options, 
FileStatusCache.getOrCreate(sparkSession), true, true)
 
   override val fileGroupReaderBasedFileFormat = new 
HoodieFileGroupReaderBasedParquetFileFormat(
     tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, 
internalSchemaOpt),
@@ -306,6 +308,9 @@ class 
HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(override val sqlContext:
                                                         override val 
schemaSpec: Option[StructType],
                                                         isBootstrap: Boolean)
   extends HoodieMergeOnReadSnapshotHadoopFsRelationFactory(sqlContext, 
metaClient, options, schemaSpec, isBootstrap) {
+
+  override val mandatoryFields: Seq[String] = Seq.empty
+
   override val fileIndex: HoodieFileIndex = HoodieFileIndex(
     sparkSession,
     metaClient,
@@ -335,8 +340,11 @@ class 
HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContex
                                                           isBootstrap: Boolean)
   extends HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(sqlContext, 
metaClient, options, schemaSpec, isBootstrap) {
 
+  override val mandatoryFields: Seq[String] = 
Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
+    preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
+
   override val fileIndex = new HoodieIncrementalFileIndex(
-    sparkSession, metaClient, schemaSpec, options, 
FileStatusCache.getOrCreate(sparkSession), false)
+    sparkSession, metaClient, schemaSpec, options, 
FileStatusCache.getOrCreate(sparkSession), false, isBootstrap)
 
   override val fileGroupReaderBasedFileFormat = new 
HoodieFileGroupReaderBasedParquetFileFormat(
     tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, 
internalSchemaOpt),
@@ -361,7 +369,7 @@ class HoodieMergeOnReadCDCHadoopFsRelationFactory(override 
val sqlContext: SQLCo
                                                    isBootstrap: Boolean)
   extends HoodieMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext, 
metaClient, options, schemaSpec, isBootstrap) {
   override val fileIndex = new HoodieCDCFileIndex(
-    sparkSession, metaClient, schemaSpec, options, 
FileStatusCache.getOrCreate(sparkSession), true)
+    sparkSession, metaClient, schemaSpec, options, 
FileStatusCache.getOrCreate(sparkSession), true, true)
 }
 
 class HoodieCopyOnWriteCDCHadoopFsRelationFactory(override val sqlContext: 
SQLContext,
@@ -371,7 +379,7 @@ class HoodieCopyOnWriteCDCHadoopFsRelationFactory(override 
val sqlContext: SQLCo
                                                   isBootstrap: Boolean)
   extends HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext, 
metaClient, options, schemaSpec, isBootstrap) {
   override val fileIndex = new HoodieCDCFileIndex(
-    sparkSession, metaClient, schemaSpec, options, 
FileStatusCache.getOrCreate(sparkSession), false)
+    sparkSession, metaClient, schemaSpec, options, 
FileStatusCache.getOrCreate(sparkSession), false, isBootstrap)
 }
 
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
index b977c51ab67..982808fdc42 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
@@ -36,9 +36,10 @@ class HoodieIncrementalFileIndex(override val spark: 
SparkSession,
                                  override val schemaSpec: Option[StructType],
                                  override val options: Map[String, String],
                                  @transient override val fileStatusCache: 
FileStatusCache = NoopCache,
-                                 override val includeLogFiles: Boolean)
+                                 override val includeLogFiles: Boolean,
+                                 override val shouldEmbedFileSlices: Boolean)
   extends HoodieFileIndex(
-    spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles
+    spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles, 
shouldEmbedFileSlices
   ) with FileIndex {
   val mergeOnReadIncrementalRelation: MergeOnReadIncrementalRelation = 
MergeOnReadIncrementalRelation(
     spark.sqlContext, options, metaClient, schemaSpec, schemaSpec)
@@ -47,52 +48,52 @@ class HoodieIncrementalFileIndex(override val spark: 
SparkSession,
     val fileSlices = 
mergeOnReadIncrementalRelation.listFileSplits(partitionFilters, dataFilters)
     if (fileSlices.isEmpty) {
       Seq.empty
-    }
-
-    val prunedPartitionsAndFilteredFileSlices = fileSlices.map {
-      case (partitionValues, fileSlices) =>
-        if (shouldEmbedFileSlices) {
-          val baseFileStatusesAndLogFileOnly: Seq[FileStatus] = 
fileSlices.map(slice => {
-            if (slice.getBaseFile.isPresent) {
-              slice.getBaseFile.get().getFileStatus
-            } else if (slice.getLogFiles.findAny().isPresent) {
-              slice.getLogFiles.findAny().get().getFileStatus
+    } else {
+      val prunedPartitionsAndFilteredFileSlices = fileSlices.map {
+        case (partitionValues, fileSlices) =>
+          if (shouldEmbedFileSlices) {
+            val baseFileStatusesAndLogFileOnly: Seq[FileStatus] = 
fileSlices.map(slice => {
+              if (slice.getBaseFile.isPresent) {
+                slice.getBaseFile.get().getFileStatus
+              } else if (slice.getLogFiles.findAny().isPresent) {
+                slice.getLogFiles.findAny().get().getFileStatus
+              } else {
+                null
+              }
+            }).filter(slice => slice != null)
+            val c = fileSlices.filter(f => f.getLogFiles.findAny().isPresent
+              || (f.getBaseFile.isPresent && 
f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
+              foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId 
-> f) }
+            if (c.nonEmpty) {
+              PartitionDirectory(new 
HoodiePartitionFileSliceMapping(partitionValues, c), 
baseFileStatusesAndLogFileOnly)
             } else {
-              null
+              PartitionDirectory(partitionValues, 
baseFileStatusesAndLogFileOnly)
             }
-          }).filter(slice => slice != null)
-          val c = fileSlices.filter(f => f.getLogFiles.findAny().isPresent
-              || (f.getBaseFile.isPresent && 
f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
-            foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> 
f) }
-          if (c.nonEmpty) {
-            PartitionDirectory(new 
HoodiePartitionFileSliceMapping(partitionValues, c), 
baseFileStatusesAndLogFileOnly)
           } else {
-            PartitionDirectory(partitionValues, baseFileStatusesAndLogFileOnly)
+            val allCandidateFiles: Seq[FileStatus] = fileSlices.flatMap(fs => {
+              val baseFileStatusOpt = 
getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null)))
+              val logFilesStatus = if (includeLogFiles) {
+                
fs.getLogFiles.map[FileStatus](JFunction.toJavaFunction[HoodieLogFile, 
FileStatus](lf => lf.getFileStatus))
+              } else {
+                java.util.stream.Stream.empty()
+              }
+              val files = 
logFilesStatus.collect(Collectors.toList[FileStatus]).asScala
+              baseFileStatusOpt.foreach(f => files.append(f))
+              files
+            })
+            PartitionDirectory(partitionValues, allCandidateFiles)
           }
-        } else {
-          val allCandidateFiles: Seq[FileStatus] = fileSlices.flatMap(fs => {
-            val baseFileStatusOpt = 
getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null)))
-            val logFilesStatus = if (includeLogFiles) {
-              
fs.getLogFiles.map[FileStatus](JFunction.toJavaFunction[HoodieLogFile, 
FileStatus](lf => lf.getFileStatus))
-            } else {
-              java.util.stream.Stream.empty()
-            }
-            val files = 
logFilesStatus.collect(Collectors.toList[FileStatus]).asScala
-            baseFileStatusOpt.foreach(f => files.append(f))
-            files
-          })
-          PartitionDirectory(partitionValues, allCandidateFiles)
-        }
-    }.toSeq
+      }.toSeq
 
-    hasPushedDownPartitionPredicates = true
-    if (shouldReadAsPartitionedTable()) {
-      prunedPartitionsAndFilteredFileSlices
-    } else if (shouldEmbedFileSlices) {
-      assert(partitionSchema.isEmpty)
-      prunedPartitionsAndFilteredFileSlices
-    } else {
-      Seq(PartitionDirectory(InternalRow.empty, 
prunedPartitionsAndFilteredFileSlices.flatMap(_.files)))
+      hasPushedDownPartitionPredicates = true
+      if (shouldReadAsPartitionedTable()) {
+        prunedPartitionsAndFilteredFileSlices
+      } else if (shouldEmbedFileSlices) {
+        assert(partitionSchema.isEmpty)
+        prunedPartitionsAndFilteredFileSlices
+      } else {
+        Seq(PartitionDirectory(InternalRow.empty, 
prunedPartitionsAndFilteredFileSlices.flatMap(_.files)))
+      }
     }
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 44d937f22ad..e37a1737d27 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -131,9 +131,11 @@ case class MergeOnReadIncrementalRelation(override val 
sqlContext: SQLContext,
         val fsView = new HoodieTableFileSystemView(metaClient, timeline, 
affectedFilesInCommits)
         val modifiedPartitions = getWritePartitionPaths(commitsMetadata)
 
-        modifiedPartitions.asScala.flatMap { relativePartitionPath =>
-          fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, 
latestCommit).iterator().asScala
-        }.toSeq
+        
fileIndex.listMatchingPartitionPaths(HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient,
 partitionFilters))
+          .map(p => p.path).filter(p => modifiedPartitions.contains(p))
+          .flatMap { relativePartitionPath =>
+            fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, 
latestCommit).iterator().asScala
+          }
       }
       filterFileSlices(fileSlices, globPattern)
     }
@@ -148,7 +150,11 @@ case class MergeOnReadIncrementalRelation(override val 
sqlContext: SQLContext,
   }
 
   def getRequiredFilters: Seq[Filter] = {
-    incrementalSpanRecordFilters
+    if (includedCommits.isEmpty) {
+      Seq.empty
+    } else {
+      incrementalSpanRecordFilters
+    }
   }
 
   private def filterFileSlices(fileSlices: Seq[FileSlice], pathGlobPattern: 
String): Seq[FileSlice] = {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index c9a69a5210e..828c8df41de 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -201,7 +201,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
    * @param predicates The filter condition.
    * @return The pruned partition paths.
    */
-  protected def listMatchingPartitionPaths(predicates: Seq[Expression]): 
Seq[PartitionPath] = {
+  def listMatchingPartitionPaths(predicates: Seq[Expression]): 
Seq[PartitionPath] = {
     val resolve = spark.sessionState.analyzer.resolver
     val partitionColumnNames = getPartitionColumns
     val partitionPruningPredicates = predicates.filter {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
index 0e657600e0d..a8a3fa94c13 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
@@ -86,7 +86,7 @@ class HoodieMultipleBaseFileFormat(tableState: 
Broadcast[HoodieTableState],
     if (!supportBatchCalled) {
       supportBatchCalled = true
       supportBatchResult =
-        !isMOR && parquetFormat.supportBatch(sparkSession, schema) && 
orcFormat.supportBatch(sparkSession, schema)
+        !isMOR && !isIncremental && parquetFormat.supportBatch(sparkSession, 
schema) && orcFormat.supportBatch(sparkSession, schema)
     }
     supportBatchResult
   }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 3fb80509b61..495569b2ce8 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -76,7 +76,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: 
HoodieTableState,
   override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {
     if (!supportBatchCalled) {
       supportBatchCalled = true
-      supportBatchResult = !isMOR && super.supportBatch(sparkSession, schema)
+      supportBatchResult = !isMOR && !isIncremental && 
super.supportBatch(sparkSession, schema)
     }
     supportBatchResult
   }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
index af3cdf715e8..4fe921bc440 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
@@ -74,7 +74,7 @@ class NewHoodieParquetFileFormat(tableState: 
Broadcast[HoodieTableState],
   override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {
     if (!supportBatchCalled) {
       supportBatchCalled = true
-      supportBatchResult = !isMOR && super.supportBatch(sparkSession, schema)
+      supportBatchResult = !isIncremental && !isMOR && 
super.supportBatch(sparkSession, schema)
     }
     supportBatchResult
   }
@@ -89,9 +89,7 @@ class NewHoodieParquetFileFormat(tableState: 
Broadcast[HoodieTableState],
 
     val outputSchema = StructType(requiredSchema.fields ++ 
partitionSchema.fields)
 
-    val requiredSchemaWithMandatory = if (isIncremental) {
-      StructType(dataSchema.toArray ++ partitionSchema.fields)
-    } else if (!isMOR || 
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState.value)) {
+    val requiredSchemaWithMandatory = if (!isMOR || 
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState.value)) {
       //add mandatory fields to required schema
       val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
       for (field <- mandatoryFields) {
@@ -142,7 +140,7 @@ class NewHoodieParquetFileFormat(tableState: 
Broadcast[HoodieTableState],
                 val hoodieBaseFile = fileSlice.getBaseFile.get()
                 val bootstrapFileOpt = hoodieBaseFile.getBootstrapBaseFile
                 val partitionValues = fileSliceMapping.getPartitionValues
-                val logFiles = getLogFilesFromSlice(fileSlice)
+                val logFiles = if (isMOR) getLogFilesFromSlice(fileSlice) else 
List.empty
                 if (requiredSchemaWithMandatory.isEmpty) {
                   val baseFile = createPartitionedFile(partitionValues, 
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
                   baseFileReader(baseFile)
@@ -169,9 +167,13 @@ class NewHoodieParquetFileFormat(tableState: 
Broadcast[HoodieTableState],
                     //baseFileReader(baseFile)
                   }
                 }
+              case _ if isIncremental =>
+                projectSchema(baseFileReader(file), 
StructType(requiredSchemaWithMandatory.fields ++ partitionSchema.fields), 
outputSchema)
               case _ => baseFileReader(file)
             }
           }
+        case _ if isIncremental =>
+          projectSchema(baseFileReader(file), 
StructType(requiredSchemaWithMandatory.fields ++ partitionSchema.fields), 
outputSchema)
         case _ => baseFileReader(file)
       }
     }
@@ -190,8 +192,13 @@ class NewHoodieParquetFileFormat(tableState: 
Broadcast[HoodieTableState],
     PartitionedFile => Iterator[InternalRow]) = {
 
     //file reader when you just read a hudi parquet file and don't do any 
merging
-    val baseFileReader = super.buildReaderWithPartitionValues(sparkSession, 
dataSchema, partitionSchema, requiredSchema,
-      filters ++ requiredFilters, options, new Configuration(hadoopConf))
+    val baseFileReader = if (isIncremental) {
+      super.buildReaderWithPartitionValues(sparkSession, dataSchema, 
partitionSchema, requiredSchemaWithMandatory,
+        filters ++ requiredFilters, options, new Configuration(hadoopConf))
+    } else {
+      super.buildReaderWithPartitionValues(sparkSession, dataSchema, 
partitionSchema, requiredSchema,
+        filters ++ requiredFilters, options, new Configuration(hadoopConf))
+    }
 
     //file reader for reading a hudi base file that needs to be merged with 
log files
     val preMergeBaseFileReader = if (isMOR) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 4625c9c7bb6..1cfd0e7fbba 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -1207,8 +1207,8 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commit2Time)
       .option(DataSourceReadOptions.END_INSTANTTIME.key, commit3Time)
       .load(basePath)
-    assertEquals(incrementalQueryRes.where("partition = '2022-01-01'").count, 
0)
-    assertEquals(incrementalQueryRes.where("partition = '2022-01-02'").count, 
20)
+    assertEquals(0, incrementalQueryRes.where("partition = 
'2022-01-01'").count)
+    assertEquals(20, incrementalQueryRes.where("partition = 
'2022-01-02'").count)
   }
 
   /**

Reply via email to