This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new dcb54fc7fdd [HUDI-6790] Support incremental/CDC queries using 
HadoopFsRelation (#9888)
dcb54fc7fdd is described below

commit dcb54fc7fdd446051718273d4a2ff0a70f59054c
Author: Lin Liu <[email protected]>
AuthorDate: Tue Nov 7 11:42:10 2023 -0800

    [HUDI-6790] Support incremental/CDC queries using HadoopFsRelation (#9888)
---
 .../main/scala/org/apache/hudi/DefaultSource.scala |  73 ++-
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |   3 +
 .../scala/org/apache/hudi/HoodieCDCFileIndex.scala |  75 +++
 .../scala/org/apache/hudi/HoodieFileIndex.scala    |  11 +-
 .../hudi/HoodieHadoopFsRelationFactory.scala       | 377 ++++++++++++++
 .../apache/hudi/HoodieIncrementalFileIndex.scala   | 120 +++++
 .../hudi/HoodiePartitionCDCFileGroupMapping.scala  |  39 ++
 .../hudi/HoodiePartitionFileSliceMapping.scala     |  37 ++
 .../org/apache/hudi/HoodiePartitionValues.scala    | 107 ++++
 .../apache/hudi/HoodieSparkFileFormatUtils.scala   |  13 +-
 .../hudi/MergeOnReadIncrementalRelation.scala      |  34 +-
 .../apache/hudi/PartitionFileSliceMapping.scala    |  76 ---
 .../org/apache/hudi/cdc/CDCFileGroupIterator.scala | 558 +++++++++++++++++++++
 .../datasources/HoodieMultipleBaseFileFormat.scala |  25 +-
 ...odieFileGroupReaderBasedParquetFileFormat.scala |  80 ++-
 .../parquet/NewHoodieParquetFileFormat.scala       |  25 +-
 16 files changed, 1505 insertions(+), 148 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 10b77d4b91c..7bdaa9609ef 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -235,13 +235,20 @@ object DefaultSource {
       Option(schema)
     }
 
+    val useNewPaquetFileFormat = parameters.getOrElse(
+      USE_NEW_HUDI_PARQUET_FILE_FORMAT.key, 
USE_NEW_HUDI_PARQUET_FILE_FORMAT.defaultValue).toBoolean
     if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() 
== 0) {
       new EmptyRelation(sqlContext, resolveSchema(metaClient, parameters, 
Some(schema)))
     } else if (isCdcQuery) {
-      CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
+      if (useNewPaquetFileFormat) {
+          new HoodieMergeOnReadCDCHadoopFsRelationFactory(
+            sqlContext, metaClient, parameters, userSchema, isBootstrap = 
false).build()
+      } else {
+        CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
+      }
     } else {
       lazy val fileFormatUtils = if ((isMultipleBaseFileFormatsEnabled && 
!isBootstrappedTable)
-        || (parameters.getOrElse(USE_NEW_HUDI_PARQUET_FILE_FORMAT.key, 
USE_NEW_HUDI_PARQUET_FILE_FORMAT.defaultValue).toBoolean
+        || (useNewPaquetFileFormat
         && (globPaths == null || globPaths.isEmpty)
         && parameters.getOrElse(REALTIME_MERGE.key(), 
REALTIME_MERGE.defaultValue())
         .equalsIgnoreCase(REALTIME_PAYLOAD_COMBINE_OPT_VAL))) {
@@ -262,33 +269,67 @@ object DefaultSource {
         case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
              (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
              (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
-          resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, 
metaClient, parameters)
+          if (fileFormatUtils.isDefined) {
+            new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
+              sqlContext, metaClient, parameters, userSchema, isBootstrap = 
false).build()
+          } else {
+            resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, 
metaClient, parameters)
+          }
 
-        case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
-          new IncrementalRelation(sqlContext, parameters, userSchema, 
metaClient)
+        case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, false) =>
+          if (fileFormatUtils.isDefined) {
+            new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
+              sqlContext, metaClient, parameters, userSchema, isBootstrap = 
false).build()
+          } else {
+            new IncrementalRelation(sqlContext, parameters, userSchema, 
metaClient)
+          }
 
-        case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
-          if (fileFormatUtils.isEmpty) {
-            new MergeOnReadSnapshotRelation(sqlContext, parameters, 
metaClient, globPaths, userSchema)
+        case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, true) =>
+          if (fileFormatUtils.isDefined) {
+            new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
+              sqlContext, metaClient, parameters, userSchema, isBootstrap = 
true).build()
           } else {
-            fileFormatUtils.get.getHadoopFsRelation(isMOR = true, isBootstrap 
= false)
+            new IncrementalRelation(sqlContext, parameters, userSchema, 
metaClient)
           }
 
-        case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
-          new MergeOnReadIncrementalRelation(sqlContext, parameters, 
metaClient, userSchema)
+        case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
+          if (fileFormatUtils.isDefined) {
+            new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
+              sqlContext, metaClient, parameters, userSchema, isBootstrap = 
false).build()
+          } else {
+            new MergeOnReadSnapshotRelation(sqlContext, parameters, 
metaClient, globPaths, userSchema)
+          }
 
         case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
-          if (fileFormatUtils.isEmpty) {
+          if (fileFormatUtils.isDefined) {
+            new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
+              sqlContext, metaClient, parameters, userSchema, isBootstrap = 
false).build()
+          } else {
             new HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, 
metaClient, parameters)
+          }
+
+        case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, true) =>
+          if (fileFormatUtils.isDefined) {
+            new HoodieMergeOnReadIncrementalHadoopFsRelationFactory(
+              sqlContext, metaClient, parameters, userSchema, isBootstrap = 
true).build()
           } else {
-            fileFormatUtils.get.getHadoopFsRelation(isMOR = true, isBootstrap 
= true)
+            MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, 
userSchema)
+          }
+
+        case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, false) =>
+          if (fileFormatUtils.isDefined) {
+            new HoodieMergeOnReadIncrementalHadoopFsRelationFactory(
+              sqlContext, metaClient, parameters, userSchema, isBootstrap = 
false).build()
+          } else {
+            MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, 
userSchema)
           }
 
         case (_, _, true) =>
-          if (fileFormatUtils.isEmpty) {
-            resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema, 
metaClient, parameters)
+          if (fileFormatUtils.isDefined) {
+            new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
+              sqlContext, metaClient, parameters, userSchema, isBootstrap = 
true).build()
           } else {
-            fileFormatUtils.get.getHadoopFsRelation(isMOR = false, isBootstrap 
= true)
+            resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema, 
metaClient, parameters)
           }
 
         case (_, _, _) =>
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index c791e8417ca..78c5cc4ca47 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -480,6 +480,9 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
   protected def getPartitionColumnsAsInternalRow(file: FileStatus): 
InternalRow =
     getPartitionColumnsAsInternalRowInternal(file, metaClient.getBasePathV2, 
shouldExtractPartitionValuesFromPartitionPath)
 
+  protected def getPartitionColumnValuesAsInternalRow(file: FileStatus): 
InternalRow =
+    getPartitionColumnsAsInternalRowInternal(file, metaClient.getBasePathV2, 
extractPartitionValuesFromPartitionPath = true)
+
   protected def getPartitionColumnsAsInternalRowInternal(file: FileStatus, 
basePath: Path,
                                                          
extractPartitionValuesFromPartitionPath: Boolean): InternalRow = {
     if (extractPartitionValuesFromPartitionPath) {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
new file mode 100644
index 00000000000..cb46b19b88b
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.cdc.CDCRelation
+import org.apache.hudi.common.model.HoodieFileGroupId
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.cdc.HoodieCDCExtractor
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, 
GenericInternalRow}
+import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, 
NoopCache, PartitionDirectory}
+import org.apache.spark.sql.types.StructType
+
+import scala.jdk.CollectionConverters.{asScalaBufferConverter, 
mapAsScalaMapConverter}
+
+class HoodieCDCFileIndex (override val spark: SparkSession,
+                          override val metaClient: HoodieTableMetaClient,
+                          override val schemaSpec: Option[StructType],
+                          override val options: Map[String, String],
+                          @transient override val fileStatusCache: 
FileStatusCache = NoopCache,
+                          override val includeLogFiles: Boolean)
+  extends HoodieIncrementalFileIndex(
+    spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles
+  ) with FileIndex {
+  val cdcRelation: CDCRelation = CDCRelation.getCDCRelation(spark.sqlContext, 
metaClient, options)
+  val cdcExtractor: HoodieCDCExtractor = cdcRelation.cdcExtractor
+
+  override def listFiles(partitionFilters: Seq[Expression], dataFilters: 
Seq[Expression]): Seq[PartitionDirectory] = {
+    val partitionToFileGroups = 
cdcExtractor.extractCDCFileSplits().asScala.groupBy(_._1.getPartitionPath).toSeq
+    partitionToFileGroups.map {
+      case (partitionPath, fileGroups) =>
+        val fileGroupIds: List[FileStatus] = fileGroups.map { fileGroup => {
+          // We create a fake FileStatus to wrap the information of 
HoodieFileGroupId, which are used
+          // later to retrieve the corresponding CDC file group splits.
+          val fileGroupId: HoodieFileGroupId = fileGroup._1
+          new FileStatus(0, true, 0, 0, 0,
+            0, null, "", "", null,
+            new Path(fileGroupId.getPartitionPath, fileGroupId.getFileId))
+        }}.toList
+        val partitionValues: InternalRow = new 
GenericInternalRow(doParsePartitionColumnValues(
+          metaClient.getTableConfig.getPartitionFields.get(), 
partitionPath).asInstanceOf[Array[Any]])
+        PartitionDirectory(
+          new HoodiePartitionCDCFileGroupMapping(
+            partitionValues, fileGroups.map(kv => kv._1 -> 
kv._2.asScala.toList).toMap),
+          fileGroupIds
+        )
+    }
+  }
+
+  override def inputFiles: Array[String] = {
+    cdcExtractor.extractCDCFileSplits().asScala.map { fileGroupSplit =>
+      val fileGroupId = fileGroupSplit._1
+      new Path(fileGroupId.getPartitionPath, fileGroupId.getFileId).toString
+    }.toArray
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 28e1b54473e..aaefd715fa3 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -76,7 +76,8 @@ case class HoodieFileIndex(spark: SparkSession,
                            schemaSpec: Option[StructType],
                            options: Map[String, String],
                            @transient fileStatusCache: FileStatusCache = 
NoopCache,
-                           includeLogFiles: Boolean = false)
+                           includeLogFiles: Boolean = false,
+                           shouldEmbedFileSlices: Boolean = false)
   extends SparkHoodieTableFileIndex(
     spark = spark,
     metaClient = metaClient,
@@ -88,7 +89,7 @@ case class HoodieFileIndex(spark: SparkSession,
   )
     with FileIndex {
 
-  @transient private var hasPushedDownPartitionPredicates: Boolean = false
+  @transient protected var hasPushedDownPartitionPredicates: Boolean = false
 
   /**
    * NOTE: [[ColumnStatsIndexSupport]] is a transient state, since it's only 
relevant while logical plan
@@ -110,8 +111,6 @@ case class HoodieFileIndex(spark: SparkSession,
 
   override def rootPaths: Seq[Path] = getQueryPaths.asScala
 
-  var shouldEmbedFileSlices: Boolean = false
-
   /**
    * Returns the FileStatus for all the base files (excluding log files). This 
should be used only for
    * cases where Spark directly fetches the list of files via HoodieFileIndex 
or for read optimized query logic
@@ -168,7 +167,7 @@ case class HoodieFileIndex(spark: SparkSession,
             || (f.getBaseFile.isPresent && 
f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
             foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> 
f) }
           if (c.nonEmpty) {
-            PartitionDirectory(new 
PartitionFileSliceMapping(InternalRow.fromSeq(partitionOpt.get.values), c), 
baseFileStatusesAndLogFileOnly)
+            PartitionDirectory(new 
HoodiePartitionFileSliceMapping(InternalRow.fromSeq(partitionOpt.get.values), 
c), baseFileStatusesAndLogFileOnly)
           } else {
             PartitionDirectory(InternalRow.fromSeq(partitionOpt.get.values), 
baseFileStatusesAndLogFileOnly)
           }
@@ -293,7 +292,7 @@ case class HoodieFileIndex(spark: SparkSession,
    * In the fast bootstrap read code path, it gets the file status for the 
bootstrap base file instead of
    * skeleton file. Returns file status for the base file if available.
    */
-  private def getBaseFileStatus(baseFileOpt: Option[HoodieBaseFile]): 
Option[FileStatus] = {
+  protected def getBaseFileStatus(baseFileOpt: Option[HoodieBaseFile]): 
Option[FileStatus] = {
     baseFileOpt.map(baseFile => {
       if (shouldFastBootstrap) {
         if (baseFile.getBootstrapBaseFile.isPresent) {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
new file mode 100644
index 00000000000..7a3ea7483fd
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, 
isSchemaEvolutionEnabledOnRead}
+import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.HoodieFileIndex.getConfigProperties
+import 
org.apache.hudi.common.config.HoodieMetadataConfig.{DEFAULT_METADATA_ENABLE_FOR_READERS,
 ENABLE}
+import org.apache.hudi.common.config.{ConfigProperty, HoodieMetadataConfig, 
HoodieReaderConfig, TypedProperties}
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
+import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
+import org.apache.hudi.metadata.HoodieTableMetadataUtil
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import 
org.apache.spark.sql.execution.datasources.parquet.{HoodieFileGroupReaderBasedParquetFileFormat,
 NewHoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, 
FileStatusCache, HadoopFsRelation, HoodieMultipleBaseFileFormat}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
+trait HoodieHadoopFsRelationFactory {
+  def build(): HadoopFsRelation
+  def buildFileIndex(): FileIndex
+  def buildFileFormat(): FileFormat
+  def buildPartitionSchema(): StructType
+  def buildDataSchema(): StructType
+  def buildBucketSpec(): Option[BucketSpec]
+  def buildOptions(): Map[String, String]
+}
+
+abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext,
+                                                 val metaClient: 
HoodieTableMetaClient,
+                                                 val options: Map[String, 
String],
+                                                 val schemaSpec: 
Option[StructType]
+                                                ) extends SparkAdapterSupport 
with HoodieHadoopFsRelationFactory {
+  protected lazy val sparkSession: SparkSession = sqlContext.sparkSession
+  protected lazy val optParams: Map[String, String] = options.filter(kv => 
!kv._1.equals(DATA_QUERIES_ONLY.key()))
+  protected lazy val hadoopConfig: Configuration = new 
Configuration(sqlContext.sparkContext.hadoopConfiguration)
+  protected lazy val jobConf = new JobConf(hadoopConfig)
+
+  protected lazy val resolver: Resolver = 
sparkSession.sessionState.analyzer.resolver
+  protected lazy val metaFieldNames: Set[String] = 
HoodieRecord.HOODIE_META_COLUMNS.asScala.toSet
+
+  protected lazy val tableName: String = metaClient.getTableConfig.getTableName
+  protected lazy val tableConfig: HoodieTableConfig = metaClient.getTableConfig
+  protected lazy val basePath: Path = metaClient.getBasePathV2
+  protected lazy val partitionColumns: Array[String] = 
tableConfig.getPartitionFields.orElse(Array.empty)
+
+  protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: 
Option[InternalSchema]) = {
+    val schemaResolver = new TableSchemaResolver(metaClient)
+    val internalSchemaOpt = if (!isSchemaEvolutionEnabledOnRead(optParams, 
sparkSession)) {
+      None
+    } else {
+      Try {
+        
specifiedQueryTimestamp.map(schemaResolver.getTableInternalSchemaFromCommitMetadata)
+          .getOrElse(schemaResolver.getTableInternalSchemaFromCommitMetadata)
+      } match {
+        case Success(internalSchemaOpt) => toScalaOption(internalSchemaOpt)
+        case Failure(e) =>
+          None
+      }
+    }
+
+    val (name, namespace) = 
AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
+    val avroSchema = internalSchemaOpt.map { is =>
+      AvroInternalSchemaConverter.convert(is, namespace + "." + name)
+    } orElse {
+      specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
+    } orElse {
+      schemaSpec.map(s => convertToAvroSchema(s, tableName))
+    } getOrElse {
+      Try(schemaResolver.getTableAvroSchema) match {
+        case Success(schema) => schema
+        case Failure(e) =>
+          throw new HoodieSchemaException("Failed to fetch schema from the 
table")
+      }
+    }
+
+    (avroSchema, internalSchemaOpt)
+  }
+
+  protected lazy val tableStructSchema: StructType = {
+    val converted = 
AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
+    val metaFieldMetadata = sparkAdapter.createCatalystMetadataForMetaField
+
+    // NOTE: Here we annotate meta-fields with corresponding metadata such 
that Spark (>= 3.2)
+    //       is able to recognize such fields as meta-fields
+    StructType(converted.map { field =>
+      if (metaFieldNames.exists(metaFieldName => resolver(metaFieldName, 
field.name))) {
+        field.copy(metadata = metaFieldMetadata)
+      } else {
+        field
+      }
+    })
+  }
+
+  protected lazy val preCombineFieldOpt: Option[String] =
+    Option(tableConfig.getPreCombineField)
+      .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) 
match {
+      // NOTE: This is required to compensate for cases when empty string is 
used to stub
+      //       property value to avoid it being set with the default value
+      // TODO(HUDI-3456) cleanup
+      case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
+      case _ => None
+    }
+
+  protected lazy val recordKeyField: String =
+    if (tableConfig.populateMetaFields()) {
+      HoodieRecord.RECORD_KEY_METADATA_FIELD
+    } else {
+      val keyFields = tableConfig.getRecordKeyFields.get()
+      checkState(keyFields.length == 1)
+      keyFields.head
+    }
+
+  protected lazy val specifiedQueryTimestamp: Option[String] =
+    optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
+      .map(HoodieSqlCommonUtils.formatQueryInstant)
+
+  protected val mergeType: String = 
optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
+    DataSourceReadOptions.REALTIME_MERGE.defaultValue)
+  protected val recordMergerImpls = 
ConfigUtils.split2List(getConfigValue(HoodieWriteConfig.RECORD_MERGER_IMPLS)).asScala.toList
+  protected val recordMergerStrategy = 
getConfigValue(HoodieWriteConfig.RECORD_MERGER_STRATEGY,
+    Option(metaClient.getTableConfig.getRecordMergerStrategy))
+
+  protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
+    // Controls whether partition columns (which are the source for the 
partition path values) should
+    // be omitted from persistence in the data files. On the read path it 
affects whether partition values (values
+    // of partition columns) will be read from the data file or extracted from 
partition path
+    val shouldOmitPartitionColumns = 
metaClient.getTableConfig.shouldDropPartitionColumns && 
partitionColumns.nonEmpty
+    val shouldExtractPartitionValueFromPath =
+      
optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
+        
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
+    val shouldUseBootstrapFastRead = 
optParams.getOrElse(DATA_QUERIES_ONLY.key(), "false").toBoolean
+
+    shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath || 
shouldUseBootstrapFastRead
+  }
+
+  protected lazy val mandatoryFieldsForMerging: Seq[String] =
+    Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
+
+  protected lazy val fileGroupReaderEnabled: Boolean = 
checkIfAConfigurationEnabled(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
+
+  protected lazy val shouldUseRecordPosition: Boolean = 
checkIfAConfigurationEnabled(HoodieWriteConfig.WRITE_RECORD_POSITIONS)
+
+  protected def queryTimestamp: Option[String] =
+    
specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp))
+
+  protected def hasSchemaOnRead: Boolean = internalSchemaOpt.isDefined
+
+  protected def timeline: HoodieTimeline =
+  // NOTE: We're including compaction here since it's not considering a 
"commit" operation
+    metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants
+
+  protected def getConfigValue(config: ConfigProperty[String],
+                             defaultValueOption: Option[String] = 
Option.empty): String = {
+    optParams.getOrElse(config.key(),
+      sqlContext.getConf(config.key(), 
defaultValueOption.getOrElse(config.defaultValue())))
+  }
+
+  protected def checkIfAConfigurationEnabled(config: 
ConfigProperty[java.lang.Boolean],
+                                           defaultValueOption: Option[String] 
= Option.empty): Boolean = {
+    optParams.getOrElse(config.key(),
+      sqlContext.getConf(config.key(), 
defaultValueOption.getOrElse(String.valueOf(config.defaultValue())))).toBoolean
+  }
+}
+
+class HoodieMergeOnReadSnapshotHadoopFsRelationFactory(override val 
sqlContext: SQLContext,
+                                                       override val 
metaClient: HoodieTableMetaClient,
+                                                       override val options: 
Map[String, String],
+                                                       override val 
schemaSpec: Option[StructType],
+                                                       isBootstrap: Boolean)
+  extends HoodieBaseHadoopFsRelationFactory(sqlContext, metaClient, options, 
schemaSpec) {
+
+  val fileIndex: HoodieFileIndex = new HoodieFileIndex(
+    sparkSession,
+    metaClient,
+    Some(tableStructSchema),
+    optParams,
+    FileStatusCache.getOrCreate(sparkSession),
+    includeLogFiles = true,
+    shouldEmbedFileSlices = true)
+
+  val configProperties: TypedProperties = getConfigProperties(sparkSession, 
options)
+  val metadataConfig: HoodieMetadataConfig = HoodieMetadataConfig.newBuilder
+    .fromProperties(configProperties)
+    .enable(configProperties.getBoolean(ENABLE.key, 
DEFAULT_METADATA_ENABLE_FOR_READERS)
+      && HoodieTableMetadataUtil.isFilesPartitionAvailable(metaClient)).build
+
+  val tableState: HoodieTableState = // Subset of the state of table's 
configuration as of at the time of the query
+    HoodieTableState(
+      tablePath = basePath.toString,
+      latestCommitTimestamp = queryTimestamp,
+      recordKeyField = recordKeyField,
+      preCombineFieldOpt = preCombineFieldOpt,
+      usesVirtualKeys = !tableConfig.populateMetaFields(),
+      recordPayloadClassName = tableConfig.getPayloadClass,
+      metadataConfig = metadataConfig,
+      recordMergerImpls = recordMergerImpls,
+      recordMergerStrategy = recordMergerStrategy
+    )
+
+  val mandatoryFields: Seq[String] = mandatoryFieldsForMerging
+  val fileGroupReaderBasedFileFormat = new 
HoodieFileGroupReaderBasedParquetFileFormat(
+    tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, 
internalSchemaOpt),
+    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+    true, isBootstrap, false, shouldUseRecordPosition, Seq.empty)
+
+  val newHoodieParquetFileFormat = new 
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
+    sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
+    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, true, 
isBootstrap, false, Seq.empty)
+
+  val multipleBaseFileFormat = new 
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
+    sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
+    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, true, 
false, Seq.empty)
+
+  override def buildFileIndex(): FileIndex = fileIndex
+
+  override def buildFileFormat(): FileFormat = {
+    if (fileGroupReaderEnabled) {
+      fileGroupReaderBasedFileFormat
+    } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && 
!isBootstrap) {
+      multipleBaseFileFormat
+    } else {
+      newHoodieParquetFileFormat
+    }
+  }
+
+  override def buildDataSchema(): StructType = fileIndex.dataSchema
+
+  override def buildPartitionSchema(): StructType = fileIndex.partitionSchema
+
+  override def buildBucketSpec(): Option[BucketSpec] = None
+
+  override def buildOptions(): Map[String, String] = optParams
+  override def build(): HadoopFsRelation = {
+    HadoopFsRelation(
+      location = buildFileIndex(),
+      partitionSchema = buildPartitionSchema(),
+      dataSchema = buildDataSchema(),
+      bucketSpec = buildBucketSpec(),
+      fileFormat = buildFileFormat(),
+      options = buildOptions())(sparkSession)
+  }
+}
+
/**
 * Factory producing a [[HadoopFsRelation]] for incremental queries on MERGE_ON_READ tables.
 * Reuses the snapshot factory but swaps in an incremental file index and file formats that
 * carry the incremental-span filters.
 *
 * NOTE(review): the file-format vals below read `fileIndex.getRequiredFilters`, and `fileIndex`
 * is itself an overridable val (re-overridden by the CDC subclass). Scala initializes parent
 * vals before subclass val overrides, so this pattern risks an init-order NPE — confirm the
 * superclass declares these members in a way that tolerates it (e.g. lazy), or verify by test.
 */
class HoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
                                                          override val metaClient: HoodieTableMetaClient,
                                                          override val options: Map[String, String],
                                                          override val schemaSpec: Option[StructType],
                                                          isBootstrap: Boolean)
  extends HoodieMergeOnReadSnapshotHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {

  // Incremental index: lists only slices touched by the queried commit range; last arg includes log files.
  override val fileIndex = new HoodieIncrementalFileIndex(
    sparkSession, metaClient, schemaSpec, options, FileStatusCache.getOrCreate(sparkSession), true)

  // Trailing flags are positional: isMOR = true, isBootstrap, then (presumably) isIncremental = true — confirm against the constructor.
  override val fileGroupReaderBasedFileFormat = new HoodieFileGroupReaderBasedParquetFileFormat(
    tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt),
    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
    true, isBootstrap, true, shouldUseRecordPosition, fileIndex.getRequiredFilters)

  override val newHoodieParquetFileFormat = new NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
    sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
    true, isBootstrap, true, fileIndex.getRequiredFilters)

  override val multipleBaseFileFormat = new HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
    sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
    true, true, fileIndex.getRequiredFilters)
}
+
/**
 * Factory producing a [[HadoopFsRelation]] for snapshot queries on COPY_ON_WRITE tables.
 * Inherits the MOR snapshot wiring but uses the plain [[HoodieFileIndex]] (no log files)
 * and constructs the file formats with the MOR flag off and no extra filters.
 */
class HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(override val sqlContext: SQLContext,
                                                       override val metaClient: HoodieTableMetaClient,
                                                       override val options: Map[String, String],
                                                       override val schemaSpec: Option[StructType],
                                                       isBootstrap: Boolean)
  extends HoodieMergeOnReadSnapshotHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {

  override val fileIndex: HoodieFileIndex = HoodieFileIndex(
    sparkSession,
    metaClient,
    Some(tableStructSchema),
    optParams,
    FileStatusCache.getOrCreate(sparkSession),
    shouldEmbedFileSlices = true)

  // Trailing flags are positional: isMOR = false, isBootstrap, then (presumably) isIncremental = false — confirm against the constructor.
  override val fileGroupReaderBasedFileFormat = new HoodieFileGroupReaderBasedParquetFileFormat(
    tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt),
    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
    false, isBootstrap, false, shouldUseRecordPosition, Seq.empty)

  override val newHoodieParquetFileFormat = new NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
    sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, false, isBootstrap, false, Seq.empty)

  override val multipleBaseFileFormat = new HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
    sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, false, false, Seq.empty)
}
+
/**
 * Factory producing a [[HadoopFsRelation]] for incremental queries on COPY_ON_WRITE tables.
 * Same shape as the COW snapshot factory, but with an incremental file index (no log files)
 * and file formats carrying the incremental-span filters.
 *
 * NOTE(review): as with the MOR incremental factory, the file-format vals read
 * `fileIndex.getRequiredFilters` while `fileIndex` is an overridable val (re-overridden by the
 * CDC subclass) — verify the val initialization order is safe.
 */
class HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
                                                          override val metaClient: HoodieTableMetaClient,
                                                          override val options: Map[String, String],
                                                          override val schemaSpec: Option[StructType],
                                                          isBootstrap: Boolean)
  extends HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {

  // Last arg (includeLogFiles) is false: COW incremental reads base files only.
  override val fileIndex = new HoodieIncrementalFileIndex(
    sparkSession, metaClient, schemaSpec, options, FileStatusCache.getOrCreate(sparkSession), false)

  // Trailing flags are positional: isMOR = false, isBootstrap, then (presumably) isIncremental = true — confirm against the constructor.
  override val fileGroupReaderBasedFileFormat = new HoodieFileGroupReaderBasedParquetFileFormat(
    tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt),
    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
    false, isBootstrap, true, shouldUseRecordPosition, fileIndex.getRequiredFilters)

  override val newHoodieParquetFileFormat = new NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
    sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
    false, isBootstrap, true, fileIndex.getRequiredFilters)

  override val multipleBaseFileFormat = new HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
    sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
    metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
    false, true, fileIndex.getRequiredFilters)
}
+
/**
 * Factory producing a [[HadoopFsRelation]] for CDC queries on MERGE_ON_READ tables.
 * Only the file index differs from the MOR incremental factory: a [[HoodieCDCFileIndex]]
 * with log files included (last arg true).
 */
class HoodieMergeOnReadCDCHadoopFsRelationFactory(override val sqlContext: SQLContext,
                                                  override val metaClient: HoodieTableMetaClient,
                                                  override val options: Map[String, String],
                                                  override val schemaSpec: Option[StructType],
                                                  isBootstrap: Boolean)
  extends HoodieMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
  // NOTE(review): this val override is read via `fileIndex.getRequiredFilters` during superclass
  // val initialization — verify no init-order NPE at construction time.
  override val fileIndex = new HoodieCDCFileIndex(
    sparkSession, metaClient, schemaSpec, options, FileStatusCache.getOrCreate(sparkSession), true)
}
+
/**
 * Factory producing a [[HadoopFsRelation]] for CDC queries on COPY_ON_WRITE tables.
 * Only the file index differs from the COW incremental factory: a [[HoodieCDCFileIndex]]
 * without log files (last arg false).
 */
class HoodieCopyOnWriteCDCHadoopFsRelationFactory(override val sqlContext: SQLContext,
                                                  override val metaClient: HoodieTableMetaClient,
                                                  override val options: Map[String, String],
                                                  override val schemaSpec: Option[StructType],
                                                  isBootstrap: Boolean)
  extends HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
  // NOTE(review): this val override is read via `fileIndex.getRequiredFilters` during superclass
  // val initialization — verify no init-order NPE at construction time.
  override val fileIndex = new HoodieCDCFileIndex(
    sparkSession, metaClient, schemaSpec, options, FileStatusCache.getOrCreate(sparkSession), false)
}
+
+
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
new file mode 100644
index 00000000000..b977c51ab67
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.FileStatus
+import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.util.JFunction
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, 
NoopCache, PartitionDirectory}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+import java.util.stream.Collectors
+import scala.jdk.CollectionConverters.asScalaBufferConverter
+
/**
 * [[FileIndex]] used by incremental queries (and, via subclassing, CDC queries): lists only
 * the file slices touched by the commits inside the queried time range, as resolved by an
 * embedded [[MergeOnReadIncrementalRelation]].
 */
class HoodieIncrementalFileIndex(override val spark: SparkSession,
                                 override val metaClient: HoodieTableMetaClient,
                                 override val schemaSpec: Option[StructType],
                                 override val options: Map[String, String],
                                 @transient override val fileStatusCache: FileStatusCache = NoopCache,
                                 override val includeLogFiles: Boolean)
  extends HoodieFileIndex(
    spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles
  ) with FileIndex {

  // Used purely to resolve which file slices fall into the incremental commit range.
  val mergeOnReadIncrementalRelation: MergeOnReadIncrementalRelation = MergeOnReadIncrementalRelation(
    spark.sqlContext, options, metaClient, schemaSpec, schemaSpec)

  override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
    val fileSlices = mergeOnReadIncrementalRelation.listFileSplits(partitionFilters, dataFilters)
    hasPushedDownPartitionPredicates = true
    // FIX: the original evaluated `Seq.empty` here without returning it (no implicit early
    // return in Scala), so the empty case fell through and could emit a spurious
    // PartitionDirectory with no files in the non-partitioned branch below.
    if (fileSlices.isEmpty) {
      Seq.empty
    } else {
      // FIX: pattern variable renamed from `fileSlices` to `slices` to stop shadowing the outer val.
      val prunedPartitionsAndFilteredFileSlices = fileSlices.map {
        case (partitionValues, slices) =>
          if (shouldEmbedFileSlices) {
            // One representative FileStatus per slice: the base file if present, otherwise any log file.
            val baseFileStatusesAndLogFileOnly: Seq[FileStatus] = slices.flatMap(slice => {
              if (slice.getBaseFile.isPresent) {
                Some(slice.getBaseFile.get().getFileStatus)
              } else if (slice.getLogFiles.findAny().isPresent) {
                Some(slice.getLogFiles.findAny().get().getFileStatus)
              } else {
                None
              }
            })
            // Slices that must travel with the partition row: those with log files or a bootstrap base file.
            val embedded = slices.filter(f => f.getLogFiles.findAny().isPresent
                || (f.getBaseFile.isPresent && f.getBaseFile.get().getBootstrapBaseFile.isPresent))
              .map(f => f.getFileId -> f).toMap
            if (embedded.nonEmpty) {
              PartitionDirectory(new HoodiePartitionFileSliceMapping(partitionValues, embedded), baseFileStatusesAndLogFileOnly)
            } else {
              PartitionDirectory(partitionValues, baseFileStatusesAndLogFileOnly)
            }
          } else {
            PartitionDirectory(partitionValues, slices.flatMap(sliceFileStatuses))
          }
      }.toSeq

      if (shouldReadAsPartitionedTable()) {
        prunedPartitionsAndFilteredFileSlices
      } else if (shouldEmbedFileSlices) {
        assert(partitionSchema.isEmpty)
        prunedPartitionsAndFilteredFileSlices
      } else {
        Seq(PartitionDirectory(InternalRow.empty, prunedPartitionsAndFilteredFileSlices.flatMap(_.files)))
      }
    }
  }

  override def inputFiles: Array[String] = {
    // NOTE: the original guarded on `isEmpty` without returning; that guard was dead code since
    // flat-mapping an empty map already yields an empty array, so it is dropped here.
    mergeOnReadIncrementalRelation.listFileSplits(Seq.empty, Seq.empty)
      .values.flatten
      .flatMap(sliceFileStatuses)
      .map(_.getPath.toString)
      .toArray
  }

  /** Filters the reader must additionally apply to restrict records to the incremental commit span. */
  def getRequiredFilters: Seq[Filter] = {
    mergeOnReadIncrementalRelation.getRequiredFilters
  }

  /**
   * All [[FileStatus]]es backing one file slice: its log files (only when `includeLogFiles`
   * is set) followed by its base file, when present. Extracted from the duplicated logic in
   * the original `listFiles`/`inputFiles`.
   */
  private def sliceFileStatuses(fs: FileSlice): Seq[FileStatus] = {
    val logFileStatuses = if (includeLogFiles) {
      fs.getLogFiles.map[FileStatus](JFunction.toJavaFunction[HoodieLogFile, FileStatus](lf => lf.getFileStatus))
    } else {
      java.util.stream.Stream.empty[FileStatus]()
    }
    val files = logFileStatuses.collect(Collectors.toList[FileStatus]).asScala
    getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null))).foreach(f => files.append(f))
    files.toSeq
  }
}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodiePartitionCDCFileGroupMapping.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodiePartitionCDCFileGroupMapping.scala
new file mode 100644
index 00000000000..bd052c086ff
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodiePartitionCDCFileGroupMapping.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.model.HoodieFileGroupId
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{DataType, Decimal}
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
+import java.util
+
/**
 * Partition-values row that additionally carries, per file group, the CDC file splits to be
 * read for that partition.
 *
 * @param partitionValues the partition-column values this row represents
 * @param fileGroups      CDC file splits keyed by file group id
 */
class HoodiePartitionCDCFileGroupMapping(partitionValues: InternalRow,
                                         fileGroups: Map[HoodieFileGroupId, List[HoodieCDCFileSplit]])
  extends HoodiePartitionValues(partitionValues) {

  /** Returns the CDC file splits captured for the given file group, if any. */
  def getFileSplitsFor(fileGroupId: HoodieFileGroupId): Option[List[HoodieCDCFileSplit]] = {
    fileGroups.get(fileGroupId)
  }

  // FIX: without this override, the copy() inherited from HoodiePartitionValues returned a
  // plain HoodiePartitionValues, silently dropping the file-group mapping on row copy.
  override def copy(): InternalRow = new HoodiePartitionCDCFileGroupMapping(partitionValues.copy(), fileGroups)
}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodiePartitionFileSliceMapping.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodiePartitionFileSliceMapping.scala
new file mode 100644
index 00000000000..4121a41b8d8
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodiePartitionFileSliceMapping.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.model.FileSlice
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{DataType, Decimal}
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
/**
 * Partition-values row that additionally carries the file slices embedded for this partition,
 * keyed by file id, so the file format can recover the full slice from the partition row.
 *
 * @param values partition-column values this row represents
 * @param slices embedded file slices keyed by file id
 */
class HoodiePartitionFileSliceMapping(values: InternalRow,
                                      slices: Map[String, FileSlice])
  extends HoodiePartitionValues(values) {

  /** Returns the embedded file slice for the given file id, if one was mapped. */
  def getSlice(fileId: String): Option[FileSlice] = {
    slices.get(fileId)
  }

  /** The raw partition-values row this mapping decorates. */
  def getPartitionValues: InternalRow = values

  // FIX: the removed PartitionFileSliceMapping preserved the slice map on copy(); relying on
  // the copy() inherited from HoodiePartitionValues dropped `slices` and downgraded the type.
  override def copy(): InternalRow = new HoodiePartitionFileSliceMapping(values.copy(), slices)
}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodiePartitionValues.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodiePartitionValues.scala
new file mode 100644
index 00000000000..3587cc6bd26
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodiePartitionValues.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{DataType, Decimal}
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
/**
 * [[InternalRow]] wrapper that forwards every accessor and mutator to the wrapped `values`
 * row. Serves as the common base for partition rows carrying extra Hudi metadata (file-slice
 * and CDC file-group mappings).
 *
 * NOTE(review): subclassing a case class is generally discouraged in Scala; consider a plain
 * or abstract base class if pattern matching on this type is not required elsewhere.
 */
case class HoodiePartitionValues(values: InternalRow) extends InternalRow {

  override def numFields: Int = values.numFields

  override def setNullAt(i: Int): Unit = values.setNullAt(i)

  override def update(i: Int, value: Any): Unit = values.update(i, value)

  override def copy(): InternalRow = HoodiePartitionValues(values.copy())

  override def isNullAt(ordinal: Int): Boolean = values.isNullAt(ordinal)

  override def getBoolean(ordinal: Int): Boolean = values.getBoolean(ordinal)

  override def getByte(ordinal: Int): Byte = values.getByte(ordinal)

  override def getShort(ordinal: Int): Short = values.getShort(ordinal)

  override def getInt(ordinal: Int): Int = values.getInt(ordinal)

  override def getLong(ordinal: Int): Long = values.getLong(ordinal)

  override def getFloat(ordinal: Int): Float = values.getFloat(ordinal)

  override def getDouble(ordinal: Int): Double = values.getDouble(ordinal)

  override def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal =
    values.getDecimal(ordinal, precision, scale)

  override def getUTF8String(ordinal: Int): UTF8String = values.getUTF8String(ordinal)

  override def getBinary(ordinal: Int): Array[Byte] = values.getBinary(ordinal)

  override def getInterval(ordinal: Int): CalendarInterval = values.getInterval(ordinal)

  override def getStruct(ordinal: Int, numFields: Int): InternalRow = values.getStruct(ordinal, numFields)

  override def getArray(ordinal: Int): ArrayData = values.getArray(ordinal)

  override def getMap(ordinal: Int): MapData = values.getMap(ordinal)

  override def get(ordinal: Int, dataType: DataType): AnyRef = values.get(ordinal, dataType)
}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala
index e66b248e0ab..fe1645d3b60 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala
@@ -178,8 +178,8 @@ class HoodieSparkFileFormatUtils(val sqlContext: SQLContext,
   def hasSchemaOnRead: Boolean = internalSchemaOpt.isDefined
 
   def getHadoopFsRelation(isMOR: Boolean, isBootstrap: Boolean): BaseRelation 
= {
-
-    val fileIndex = HoodieFileIndex(sparkSession, metaClient, 
Some(tableStructSchema), optParams, FileStatusCache.getOrCreate(sparkSession), 
isMOR)
+    val fileIndex = HoodieFileIndex(
+      sparkSession, metaClient, Some(tableStructSchema), optParams, 
FileStatusCache.getOrCreate(sparkSession), isMOR, shouldEmbedFileSlices = true)
     val recordMergerImpls = 
ConfigUtils.split2List(getConfigValue(HoodieWriteConfig.RECORD_MERGER_IMPLS)).asScala.toList
     val recordMergerStrategy = 
getConfigValue(HoodieWriteConfig.RECORD_MERGER_STRATEGY,
       Option(metaClient.getTableConfig.getRecordMergerStrategy))
@@ -205,20 +205,21 @@ class HoodieSparkFileFormatUtils(val sqlContext: 
SQLContext,
     } else {
       Seq.empty
     }
-    fileIndex.shouldEmbedFileSlices = true
 
     val fileFormat = if (fileGroupReaderEnabled) {
       new HoodieFileGroupReaderBasedParquetFileFormat(
         tableState, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
-        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
isMOR, isBootstrap, shouldUseRecordPosition)
+        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+        isMOR, isBootstrap, false, shouldUseRecordPosition, Seq.empty)
     } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && 
!isBootstrap) {
       new 
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
         
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
-        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
isMOR)
+        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
isMOR, false, Seq.empty)
     } else {
       new 
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
         
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
-        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
isMOR, isBootstrap)
+        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+        isMOR, isBootstrap, false, Seq.empty)
     }
 
     HadoopFsRelation(
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 4dda08c2e28..44d937f22ad 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -23,7 +23,7 @@ import 
org.apache.hudi.HoodieSparkConfUtils.getHollowCommitHandling
 import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import 
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME
-import org.apache.hudi.common.table.timeline.TimelineUtils.{concatTimeline, 
HollowCommitHandling, getCommitMetadata, handleHollowCommitIfNeeded}
+import 
org.apache.hudi.common.table.timeline.TimelineUtils.{HollowCommitHandling, 
concatTimeline, getCommitMetadata, handleHollowCommitIfNeeded}
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.util.StringUtils
@@ -33,6 +33,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.datasources.PartitionDirectory
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 
@@ -119,6 +120,37 @@ case class MergeOnReadIncrementalRelation(override val 
sqlContext: SQLContext,
     }
   }
 
  /**
   * Groups the file slices touched by the incremental query window by their partition values.
   *
   * @param partitionFilters partition predicates; honored only on the full-table-scan path
   * @param dataFilters      data predicates; honored only on the full-table-scan path
   * @return file slices keyed by the partition-column values row they belong to
   */
  def listFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[InternalRow, Seq[FileSlice]] = {
    val slices = if (includedCommits.isEmpty) {
      List()
    } else {
      val fileSlices = if (fullTableScan) {
        listLatestFileSlices(Seq(), partitionFilters, dataFilters)
      } else {
        // Merge the latest file slices, as of the last in-range commit, for every partition
        // touched by those commits.
        val latestCommit = includedCommits.last.getTimestamp
        val fsView = new HoodieTableFileSystemView(metaClient, timeline, affectedFilesInCommits)
        val modifiedPartitions = getWritePartitionPaths(commitsMetadata)

        modifiedPartitions.asScala.flatMap { relativePartitionPath =>
          fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
        }.toSeq
      }
      filterFileSlices(fileSlices, globPattern)
    }

    slices.groupBy(fs => {
      // NOTE(review): assumes every slice has a base file or at least one log file —
      // `findAny.get` would throw otherwise; confirm that invariant holds upstream.
      if (fs.getBaseFile.isPresent) {
        getPartitionColumnValuesAsInternalRow(fs.getBaseFile.get.getFileStatus)
      } else {
        getPartitionColumnValuesAsInternalRow(fs.getLogFiles.findAny.get.getFileStatus)
      }
    })
  }
+
  /** Filters that must be applied on read to restrict records to the incremental commit span. */
  def getRequiredFilters: Seq[Filter] = {
    incrementalSpanRecordFilters
  }
+
   private def filterFileSlices(fileSlices: Seq[FileSlice], pathGlobPattern: 
String): Seq[FileSlice] = {
     val filteredFileSlices = if (!StringUtils.isNullOrEmpty(pathGlobPattern)) {
       val globMatcher = new GlobPattern("*" + pathGlobPattern)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
deleted file mode 100644
index 1e639f0daab..00000000000
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.common.model.FileSlice
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
-import org.apache.spark.sql.types.{DataType, Decimal}
-import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
-
-class PartitionFileSliceMapping(internalRow: InternalRow,
-                                slices: Map[String, FileSlice]) extends 
InternalRow {
-
-  def getSlice(fileId: String): Option[FileSlice] = {
-    slices.get(fileId)
-  }
-
-  def getInternalRow: InternalRow = internalRow
-
-  override def numFields: Int = internalRow.numFields
-
-  override def setNullAt(i: Int): Unit = internalRow.setNullAt(i)
-
-  override def update(i: Int, value: Any): Unit = internalRow.update(i, value)
-
-  override def copy(): InternalRow = new 
PartitionFileSliceMapping(internalRow.copy(), slices)
-
-  override def isNullAt(ordinal: Int): Boolean = internalRow.isNullAt(ordinal)
-
-  override def getBoolean(ordinal: Int): Boolean = 
internalRow.getBoolean(ordinal)
-
-  override def getByte(ordinal: Int): Byte = internalRow.getByte(ordinal)
-
-  override def getShort(ordinal: Int): Short = internalRow.getShort(ordinal)
-
-  override def getInt(ordinal: Int): Int = internalRow.getInt(ordinal)
-
-  override def getLong(ordinal: Int): Long = internalRow.getLong(ordinal)
-
-  override def getFloat(ordinal: Int): Float = internalRow.getFloat(ordinal)
-
-  override def getDouble(ordinal: Int): Double = internalRow.getDouble(ordinal)
-
-  override def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal = 
internalRow.getDecimal(ordinal, precision, scale)
-
-  override def getUTF8String(ordinal: Int): UTF8String = 
internalRow.getUTF8String(ordinal)
-
-  override def getBinary(ordinal: Int): Array[Byte] = 
internalRow.getBinary(ordinal)
-
-  override def getInterval(ordinal: Int): CalendarInterval = 
internalRow.getInterval(ordinal)
-
-  override def getStruct(ordinal: Int, numFields: Int): InternalRow = 
internalRow.getStruct(ordinal, numFields)
-
-  override def getArray(ordinal: Int): ArrayData = 
internalRow.getArray(ordinal)
-
-  override def getMap(ordinal: Int): MapData = internalRow.getMap(ordinal)
-
-  override def get(ordinal: Int, dataType: DataType): AnyRef = 
internalRow.get(ordinal, dataType)
-}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
new file mode 100644
index 00000000000..a08014a4008
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.cdc
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.HoodieBaseRelation.BaseFileReader
+import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.{AvroConversionUtils, AvroProjection, HoodieFileIndex, 
HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, 
LogFileIterator, RecordMergingFileIterator, SparkAdapterSupport}
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.model.{FileSlice, HoodieAvroRecordMerger, 
HoodieLogFile, HoodieRecord, HoodieRecordMerger, HoodieRecordPayload}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCUtils}
+import org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase._
+import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator
+import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._
+import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.config.HoodiePayloadConfig
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
+import org.apache.spark.sql.avro.HoodieAvroDeserializer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Projection
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+import java.io.Closeable
+import java.util.Properties
+import java.util.stream.Collectors
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.jdk.CollectionConverters.asScalaBufferConverter
+
+class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
+                           metaClient: HoodieTableMetaClient,
+                           conf: Configuration,
+                           parquetReader: PartitionedFile => 
Iterator[InternalRow],
+                           originTableSchema: HoodieTableSchema,
+                           cdcSchema: StructType,
+                           requiredCdcSchema: StructType,
+                           props: TypedProperties)
+  extends Iterator[InternalRow]
+  with SparkAdapterSupport with AvroDeserializerSupport with Closeable {
+
+  protected val payloadProps: Properties = 
Option(metaClient.getTableConfig.getPreCombineField)
+    .map { preCombineField =>
+      HoodiePayloadConfig.newBuilder
+        .withPayloadOrderingField(preCombineField)
+        .build
+        .getProps
+    }.getOrElse(new Properties())
+
+  private lazy val fs = metaClient.getFs.getFileSystem
+
+  private lazy val basePath = metaClient.getBasePathV2
+
+  private lazy val tableConfig = metaClient.getTableConfig
+
+  private lazy val populateMetaFields = tableConfig.populateMetaFields()
+
+  private lazy val keyGenerator = {
+    HoodieSparkKeyGeneratorFactory.createKeyGenerator(tableConfig.getProps())
+  }
+
+  private lazy val recordKeyField: String = if (populateMetaFields) {
+    HoodieRecord.RECORD_KEY_METADATA_FIELD
+  } else {
+    val keyFields = metaClient.getTableConfig.getRecordKeyFields.get()
+    checkState(keyFields.length == 1)
+    keyFields.head
+  }
+
+  private lazy val preCombineFieldOpt: Option[String] = 
Option(metaClient.getTableConfig.getPreCombineField)
+
+  private lazy val tableState = {
+    val metadataConfig = HoodieMetadataConfig.newBuilder()
+      .fromProperties(props)
+      .build()
+    HoodieTableState(
+      pathToString(basePath),
+      Some(split.changes.last.getInstant),
+      recordKeyField,
+      preCombineFieldOpt,
+      usesVirtualKeys = !populateMetaFields,
+      metaClient.getTableConfig.getPayloadClass,
+      metadataConfig,
+      // TODO support CDC with spark record
+      recordMergerImpls = List(classOf[HoodieAvroRecordMerger].getName),
+      recordMergerStrategy = HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID
+    )
+  }
+
+  protected override val avroSchema: Schema = new 
Schema.Parser().parse(originTableSchema.avroSchemaStr)
+
+  protected override val structTypeSchema: StructType = 
originTableSchema.structTypeSchema
+
+  private val cdcSupplementalLoggingMode = 
metaClient.getTableConfig.cdcSupplementalLoggingMode
+
+  private lazy val serializer = 
sparkAdapter.createAvroSerializer(originTableSchema.structTypeSchema,
+    avroSchema, nullable = false)
+
+  private lazy val avroProjection = AvroProjection.create(avroSchema)
+
+  private lazy val cdcAvroSchema: Schema = 
HoodieCDCUtils.schemaBySupplementalLoggingMode(
+    cdcSupplementalLoggingMode,
+    HoodieAvroUtils.removeMetadataFields(avroSchema)
+  )
+
+  private lazy val cdcSparkSchema: StructType = 
AvroConversionUtils.convertAvroSchemaToStructType(cdcAvroSchema)
+
+  private lazy val sparkPartitionedFileUtils = 
sparkAdapter.getSparkPartitionedFileUtils
+
+  /**
+   * The deserializer used to convert the CDC GenericRecord to Spark 
InternalRow.
+   */
+  private lazy val cdcRecordDeserializer: HoodieAvroDeserializer = {
+    sparkAdapter.createAvroDeserializer(cdcAvroSchema, cdcSparkSchema)
+  }
+
+  private lazy val projection: Projection = 
generateUnsafeProjection(cdcSchema, requiredCdcSchema)
+
+  // Iterator on cdc file
+  private val cdcFileIter = split.changes.iterator
+
+  // The instant that is currently being processed
+  private var currentInstant: String = _
+
+  // The change file that is currently being processed
+  private var currentCDCFileSplit: HoodieCDCFileSplit = _
+
+  /**
+   * Two cases will use this to iterate over the records:
+   * 1) extract the change data from the base file directly, including 
'BASE_FILE_INSERT' and 'BASE_FILE_DELETE'.
+   * 2) when the type of cdc file is 'REPLACE_COMMIT',
+   * use this to trace the records that are converted from [[beforeImageRecords]].
+   */
+  private var recordIter: Iterator[InternalRow] = Iterator.empty
+
+  /**
+   * Only used in one case: extracting the change data from log files for a MOR table.
+   * At that time, 'logRecordIter' works together with [[beforeImageRecords]], which keeps all the records of the previous file slice.
+   */
+  private var logRecordIter: Iterator[(String, HoodieRecord[_])] = 
Iterator.empty
+
+  /**
+   * Only used in one case: extracting the change data from CDC log files.
+   */
+  private var cdcLogRecordIterator: HoodieCDCLogRecordIterator = _
+
+  /**
+   * The next record to be returned when next() is called.
+   */
+  protected var recordToLoad: InternalRow = _
+
+  /**
+   * The list of files to which 'beforeImageRecords' belong.
+   * Use it to determine whether 'beforeImageRecords' contains all the data required to extract
+   * the change data from the current cdc file.
+   */
+  private val beforeImageFiles: mutable.ArrayBuffer[String] = 
mutable.ArrayBuffer.empty
+
+  /**
+   * Keep the before-image data. These cases will use this:
+   * 1) the cdc infer case is [[LOG_FILE]];
+   * 2) the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is 
'op_key'.
+   */
+  private var beforeImageRecords: mutable.Map[String, GenericRecord] = 
mutable.Map.empty
+
+  /**
+   * Keep the after-image data. Only one case will use this:
+   * the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is 
[[OP_KEY_ONLY]] or [[DATA_BEFORE]].
+   */
+  private var afterImageRecords: mutable.Map[String, InternalRow] = 
mutable.Map.empty
+
+  private var internalRowToJsonStringConverter = new 
InternalRowToJsonStringConverter(originTableSchema)
+
+  private def needLoadNextFile: Boolean = {
+    !recordIter.hasNext &&
+      !logRecordIter.hasNext &&
+      (cdcLogRecordIterator == null || !cdcLogRecordIterator.hasNext)
+  }
+
+  @tailrec final def hasNextInternal: Boolean = {
+    if (needLoadNextFile) {
+      loadCdcFile()
+    }
+    if (currentCDCFileSplit == null) {
+      false
+    } else {
+      currentCDCFileSplit.getCdcInferCase match {
+        case BASE_FILE_INSERT | BASE_FILE_DELETE | REPLACE_COMMIT =>
+          if (recordIter.hasNext && loadNext()) {
+            true
+          } else {
+            hasNextInternal
+          }
+        case LOG_FILE =>
+          if (logRecordIter.hasNext && loadNext()) {
+            true
+          } else {
+            hasNextInternal
+          }
+        case AS_IS =>
+          if (cdcLogRecordIterator.hasNext && loadNext()) {
+            true
+          } else {
+            hasNextInternal
+          }
+      }
+    }
+  }
+
+  override def hasNext: Boolean = hasNextInternal
+
+  override final def next(): InternalRow = {
+    projection(recordToLoad)
+  }
+
+  def loadNext(): Boolean = {
+    var loaded = false
+    currentCDCFileSplit.getCdcInferCase match {
+      case BASE_FILE_INSERT =>
+        val originRecord = recordIter.next()
+        recordToLoad.update(3, convertRowToJsonString(originRecord))
+        loaded = true
+      case BASE_FILE_DELETE =>
+        val originRecord = recordIter.next()
+        recordToLoad.update(2, convertRowToJsonString(originRecord))
+        loaded = true
+      case LOG_FILE =>
+        loaded = loadNextLogRecord()
+      case AS_IS =>
+        val record = cdcLogRecordIterator.next().asInstanceOf[GenericRecord]
+        cdcSupplementalLoggingMode match {
+          case `DATA_BEFORE_AFTER` =>
+            recordToLoad.update(0, 
convertToUTF8String(String.valueOf(record.get(0))))
+            val before = record.get(2).asInstanceOf[GenericRecord]
+            recordToLoad.update(2, recordToJsonAsUTF8String(before))
+            val after = record.get(3).asInstanceOf[GenericRecord]
+            recordToLoad.update(3, recordToJsonAsUTF8String(after))
+          case `DATA_BEFORE` =>
+            val row = 
cdcRecordDeserializer.deserialize(record).get.asInstanceOf[InternalRow]
+            val op = row.getString(0)
+            val recordKey = row.getString(1)
+            recordToLoad.update(0, convertToUTF8String(op))
+            val before = record.get(2).asInstanceOf[GenericRecord]
+            recordToLoad.update(2, recordToJsonAsUTF8String(before))
+            parse(op) match {
+              case INSERT =>
+                recordToLoad.update(3, 
convertRowToJsonString(afterImageRecords(recordKey)))
+              case UPDATE =>
+                recordToLoad.update(3, 
convertRowToJsonString(afterImageRecords(recordKey)))
+              case _ =>
+                recordToLoad.update(3, null)
+            }
+          case _ =>
+            val row = 
cdcRecordDeserializer.deserialize(record).get.asInstanceOf[InternalRow]
+            val op = row.getString(0)
+            val recordKey = row.getString(1)
+            recordToLoad.update(0, convertToUTF8String(op))
+            parse(op) match {
+              case INSERT =>
+                recordToLoad.update(2, null)
+                recordToLoad.update(3, 
convertRowToJsonString(afterImageRecords(recordKey)))
+              case UPDATE =>
+                recordToLoad.update(2, 
recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
+                recordToLoad.update(3, 
convertRowToJsonString(afterImageRecords(recordKey)))
+              case _ =>
+                recordToLoad.update(2, 
recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
+                recordToLoad.update(3, null)
+            }
+        }
+        loaded = true
+      case REPLACE_COMMIT =>
+        val originRecord = recordIter.next()
+        recordToLoad.update(2, convertRowToJsonString(originRecord))
+        loaded = true
+    }
+    loaded
+  }
+
+  /**
+   * Load the next log record, and determine how to convert it to the cdc format.
+   */
+  private def loadNextLogRecord(): Boolean = {
+    var loaded = false
+    val (key, logRecord) = logRecordIter.next()
+    val indexedRecord = getInsertValue(logRecord)
+    if (indexedRecord.isEmpty) {
+      // it's a deleted record.
+      val existingRecordOpt = beforeImageRecords.remove(key)
+      if (existingRecordOpt.isEmpty) {
+        // no real record is deleted, just ignore.
+      } else {
+        // there is a real record deleted.
+        recordToLoad.update(0, CDCRelation.CDC_OPERATION_DELETE)
+        recordToLoad.update(2, recordToJsonAsUTF8String(existingRecordOpt.get))
+        recordToLoad.update(3, null)
+        loaded = true
+      }
+    } else {
+      val existingRecordOpt = beforeImageRecords.get(key)
+      if (existingRecordOpt.isEmpty) {
+        // a new record is inserted.
+        val insertedRecord = 
avroProjection(indexedRecord.get.asInstanceOf[GenericRecord])
+        recordToLoad.update(0, CDCRelation.CDC_OPERATION_INSERT)
+        recordToLoad.update(2, null)
+        recordToLoad.update(3, recordToJsonAsUTF8String(insertedRecord))
+        // insert into beforeImageRecords
+        beforeImageRecords(key) = insertedRecord
+        loaded = true
+      } else {
+        // an existing record is updated.
+        val existingRecord = existingRecordOpt.get
+        val merged = merge(existingRecord, logRecord)
+        val mergeRecord = avroProjection(merged.asInstanceOf[GenericRecord])
+        if (existingRecord != mergeRecord) {
+          recordToLoad.update(0, CDCRelation.CDC_OPERATION_UPDATE)
+          recordToLoad.update(2, recordToJsonAsUTF8String(existingRecord))
+          recordToLoad.update(3, recordToJsonAsUTF8String(mergeRecord))
+          // update into beforeImageRecords
+          beforeImageRecords(key) = mergeRecord
+          loaded = true
+        }
+      }
+    }
+    loaded
+  }
+
+  private def loadCdcFile(): Unit = {
+    // reset all the iterator first.
+    recordIter = Iterator.empty
+    logRecordIter = Iterator.empty
+    beforeImageRecords.clear()
+    afterImageRecords.clear()
+    if (cdcLogRecordIterator != null) {
+      cdcLogRecordIterator.close()
+      cdcLogRecordIterator = null
+    }
+
+    if (cdcFileIter.hasNext) {
+      val split = cdcFileIter.next()
+      currentInstant = split.getInstant
+      currentCDCFileSplit = split
+      currentCDCFileSplit.getCdcInferCase match {
+        case BASE_FILE_INSERT =>
+          assert(currentCDCFileSplit.getCdcFiles != null && 
currentCDCFileSplit.getCdcFiles.size() == 1)
+          val absCDCPath = new Path(basePath, 
currentCDCFileSplit.getCdcFiles.get(0))
+          val fileStatus = fs.getFileStatus(absCDCPath)
+
+          val pf = sparkPartitionedFileUtils.createPartitionedFile(
+            InternalRow.empty, absCDCPath, 0, fileStatus.getLen)
+          recordIter = parquetReader(pf)
+        case BASE_FILE_DELETE =>
+          assert(currentCDCFileSplit.getBeforeFileSlice.isPresent)
+          recordIter = 
loadFileSlice(currentCDCFileSplit.getBeforeFileSlice.get)
+        case LOG_FILE =>
+          assert(currentCDCFileSplit.getCdcFiles != null && 
currentCDCFileSplit.getCdcFiles.size() == 1
+            && currentCDCFileSplit.getBeforeFileSlice.isPresent)
+          
loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
+          val absLogPath = new Path(basePath, 
currentCDCFileSplit.getCdcFiles.get(0))
+          val morSplit = HoodieMergeOnReadFileSplit(None, List(new 
HoodieLogFile(fs.getFileStatus(absLogPath))))
+          val logFileIterator = new LogFileIterator(morSplit, 
originTableSchema, originTableSchema, tableState, conf)
+          logRecordIter = logFileIterator.logRecordsPairIterator
+        case AS_IS =>
+          assert(currentCDCFileSplit.getCdcFiles != null && 
!currentCDCFileSplit.getCdcFiles.isEmpty)
+          // load beforeFileSlice to beforeImageRecords
+          if (currentCDCFileSplit.getBeforeFileSlice.isPresent) {
+            
loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
+          }
+          // load afterFileSlice to afterImageRecords
+          if (currentCDCFileSplit.getAfterFileSlice.isPresent) {
+            val iter = 
loadFileSlice(currentCDCFileSplit.getAfterFileSlice.get())
+            afterImageRecords = mutable.Map.empty
+            iter.foreach { row =>
+              val key = getRecordKey(row)
+              afterImageRecords.put(key, row.copy())
+            }
+          }
+
+          val cdcLogFiles = currentCDCFileSplit.getCdcFiles.asScala.map { 
cdcFile =>
+            new HoodieLogFile(fs.getFileStatus(new Path(basePath, cdcFile)))
+          }.toArray
+          cdcLogRecordIterator = new HoodieCDCLogRecordIterator(fs, 
cdcLogFiles, cdcAvroSchema)
+        case REPLACE_COMMIT =>
+          if (currentCDCFileSplit.getBeforeFileSlice.isPresent) {
+            
loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
+          }
+          recordIter = beforeImageRecords.values.map { record =>
+            deserialize(record)
+          }.iterator
+          beforeImageRecords.clear()
+      }
+      resetRecordFormat()
+    } else {
+      currentInstant = null
+      currentCDCFileSplit = null
+    }
+  }
+
+  /**
+   * Initialize the fixed fields of the record to be returned in advance, to speed up processing.
+   */
+  private def resetRecordFormat(): Unit = {
+    recordToLoad = currentCDCFileSplit.getCdcInferCase match {
+      case BASE_FILE_INSERT =>
+        InternalRow.fromSeq(Array(
+          CDCRelation.CDC_OPERATION_INSERT, 
convertToUTF8String(currentInstant),
+          null, null))
+      case BASE_FILE_DELETE =>
+        InternalRow.fromSeq(Array(
+          CDCRelation.CDC_OPERATION_DELETE, 
convertToUTF8String(currentInstant),
+          null, null))
+      case LOG_FILE =>
+        InternalRow.fromSeq(Array(
+          null, convertToUTF8String(currentInstant),
+          null, null))
+      case AS_IS =>
+        InternalRow.fromSeq(Array(
+          null, convertToUTF8String(currentInstant),
+          null, null))
+      case REPLACE_COMMIT =>
+        InternalRow.fromSeq(Array(
+          CDCRelation.CDC_OPERATION_DELETE, 
convertToUTF8String(currentInstant),
+          null, null))
+    }
+  }
+
+  /**
+   * If [[beforeImageFiles]] is exactly the list of files that we want to load, use it directly.
+   * Otherwise, re-load what we need.
+   */
+  private def loadBeforeFileSliceIfNeeded(fileSlice: FileSlice): Unit = {
+    val files = List(fileSlice.getBaseFile.get().getPath) ++
+      fileSlice.getLogFiles.collect(Collectors.toList[HoodieLogFile]).asScala
+        .map(f => pathToString(f.getPath)).toList
+    val same = files.sorted == beforeImageFiles.sorted.toList
+    if (!same) {
+      // clear up the beforeImageRecords
+      beforeImageRecords.clear()
+      val iter = loadFileSlice(fileSlice)
+      iter.foreach { row =>
+        val key = getRecordKey(row)
+        // Due to the reuse buffer mechanism of Spark serialization,
+        // we have to copy the serialized result if we need to retain its 
reference
+        beforeImageRecords.put(key, serialize(row, copy = true))
+      }
+      // reset beforeImageFiles
+      beforeImageFiles.clear()
+      beforeImageFiles.append(files: _*)
+    }
+  }
+
+  private def loadFileSlice(fileSlice: FileSlice): Iterator[InternalRow] = {
+    val baseFileStatus = fs.getFileStatus(new 
Path(fileSlice.getBaseFile.get().getPath))
+    val basePartitionedFile = sparkPartitionedFileUtils.createPartitionedFile(
+      InternalRow.empty,
+      baseFileStatus.getPath,
+      0,
+      baseFileStatus.getLen
+    )
+    val logFiles = fileSlice.getLogFiles
+      .sorted(HoodieLogFile.getLogFileComparator)
+      .collect(Collectors.toList[HoodieLogFile])
+      .asScala.toList
+      .filterNot(_.getFileName.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
+
+    if (logFiles.isEmpty) {
+      // no log files, just load the base parquet file
+      parquetReader(basePartitionedFile)
+    } else {
+      // use [[RecordMergingFileIterator]] to load both the base file and log 
files
+      val morSplit = HoodieMergeOnReadFileSplit(Some(basePartitionedFile), 
logFiles)
+      new RecordMergingFileIterator(
+        morSplit,
+        BaseFileReader(parquetReader, originTableSchema.structTypeSchema),
+        originTableSchema,
+        originTableSchema,
+        tableState,
+        conf)
+    }
+  }
+
+  /**
+   * Convert InternalRow to json string.
+   */
+  private def convertRowToJsonString(record: InternalRow): UTF8String = {
+    internalRowToJsonStringConverter.convert(record)
+  }
+
+  /**
+   * The data of string type is stored in InternalRow using UTF8String type.
+   */
+  private def convertToUTF8String(str: String): UTF8String = {
+    UTF8String.fromString(str)
+  }
+
+  private def pathToString(p: Path): String = {
+    p.toUri.toString
+  }
+
+  private def serialize(curRowRecord: InternalRow, copy: Boolean = false): 
GenericRecord = {
+    val record = serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
+    if (copy) {
+      GenericData.get().deepCopy(record.getSchema, record)
+    } else {
+      record
+    }
+  }
+
+  private def recordToJsonAsUTF8String(record: GenericRecord): UTF8String = {
+    convertToUTF8String(HoodieCDCUtils.recordToJson(record))
+  }
+
+  private def getRecordKey(row: InternalRow): String = {
+    if (populateMetaFields) {
+      
row.getString(structTypeSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD))
+    } else {
+      this.keyGenerator.getKey(serialize(row)).getRecordKey
+    }
+  }
+
+  private def getInsertValue(
+                              record: HoodieRecord[_])
+  : Option[IndexedRecord] = {
+    toScalaOption(record.toIndexedRecord(avroSchema, 
payloadProps)).map(_.getData)
+  }
+
+  private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_]): 
IndexedRecord = {
+    
newRecord.getData.asInstanceOf[HoodieRecordPayload[_]].combineAndGetUpdateValue(
+      curAvroRecord, avroSchema, payloadProps).get()
+  }
+
+  override def close(): Unit = {}
+}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
index c250a875f2b..0e657600e0d 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
@@ -26,7 +26,7 @@ import 
org.apache.hudi.DataSourceReadOptions.{REALTIME_PAYLOAD_COMBINE_OPT_VAL,
 import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
-import org.apache.hudi.{HoodieBaseRelation, HoodieTableSchema, 
HoodieTableState, LogFileIterator, MergeOnReadSnapshotRelation, 
PartitionFileSliceMapping, RecordMergingFileIterator, SparkAdapterSupport}
+import org.apache.hudi.{HoodieBaseRelation, HoodieTableSchema, 
HoodieTableState, LogFileIterator, MergeOnReadSnapshotRelation, 
HoodiePartitionFileSliceMapping, RecordMergingFileIterator, SparkAdapterSupport}
 import org.apache.spark.broadcast.Broadcast
 import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
 import org.apache.spark.sql.SparkSession
@@ -49,7 +49,9 @@ class HoodieMultipleBaseFileFormat(tableState: 
Broadcast[HoodieTableState],
                                    tableName: String,
                                    mergeType: String,
                                    mandatoryFields: Seq[String],
-                                   isMOR: Boolean) extends FileFormat with 
SparkAdapterSupport {
+                                   isMOR: Boolean,
+                                   isIncremental: Boolean,
+                                   requiredFilters: Seq[Filter]) extends 
FileFormat with SparkAdapterSupport {
   private val parquetFormat = new ParquetFileFormat()
   private val orcFormat = new OrcFileFormat()
 
@@ -104,7 +106,9 @@ class HoodieMultipleBaseFileFormat(tableState: 
Broadcast[HoodieTableState],
                                               options: Map[String, String],
                                               hadoopConf: Configuration): 
PartitionedFile => Iterator[InternalRow] = {
     val outputSchema = StructType(requiredSchema.fields ++ 
partitionSchema.fields)
-    val requiredSchemaWithMandatory = if (!isMOR || 
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState.value)) {
+    val requiredSchemaWithMandatory = if (isIncremental) {
+      StructType(dataSchema.toArray ++ partitionSchema.fields)
+    } else if (!isMOR || 
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState.value)) {
       // add mandatory fields to required schema
       val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
       for (field <- mandatoryFields) {
@@ -120,14 +124,15 @@ class HoodieMultipleBaseFileFormat(tableState: 
Broadcast[HoodieTableState],
     }
 
     val (parquetBaseFileReader, orcBaseFileReader, 
preMergeParquetBaseFileReader, preMergeOrcBaseFileReader) = buildFileReaders(
-      sparkSession, dataSchema, partitionSchema, requiredSchema, filters, 
options, hadoopConf, requiredSchemaWithMandatory)
+      sparkSession, dataSchema, partitionSchema, if (isIncremental) 
requiredSchemaWithMandatory else requiredSchema,
+      filters, options, hadoopConf, requiredSchemaWithMandatory)
 
     val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
     (file: PartitionedFile) => {
       val filePath = 
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
       val fileFormat = detectFileFormat(filePath.toString)
       file.partitionValues match {
-        case fileSliceMapping: PartitionFileSliceMapping =>
+        case fileSliceMapping: HoodiePartitionFileSliceMapping =>
           if (FSUtils.isLogFile(filePath)) {
             // no base file
             val fileSlice = 
fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get
@@ -141,7 +146,7 @@ class HoodieMultipleBaseFileFormat(tableState: 
Broadcast[HoodieTableState],
               case Some(fileSlice) =>
                 val hoodieBaseFile = fileSlice.getBaseFile.get()
                 val baseFileFormat = 
detectFileFormat(hoodieBaseFile.getFileName)
-                val partitionValues = fileSliceMapping.getInternalRow
+                val partitionValues = fileSliceMapping.getPartitionValues
                 val logFiles = getLogFilesFromSlice(fileSlice)
                 if (requiredSchemaWithMandatory.isEmpty) {
                   val baseFile = createPartitionedFile(partitionValues, 
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
@@ -198,20 +203,20 @@ class HoodieMultipleBaseFileFormat(tableState: 
Broadcast[HoodieTableState],
     PartitionedFile => Iterator[InternalRow],
     PartitionedFile => Iterator[InternalRow]) = {
     val parquetBaseFileReader = 
parquetFormat.buildReaderWithPartitionValues(sparkSession, dataSchema, 
partitionSchema, requiredSchema,
-      filters, options, new Configuration(hadoopConf))
+      filters ++ requiredFilters, options, new Configuration(hadoopConf))
     val orcBaseFileReader = 
orcFormat.buildReaderWithPartitionValues(sparkSession, dataSchema, 
partitionSchema, requiredSchema,
-      filters, options, new Configuration(hadoopConf))
+      filters ++ requiredFilters, options, new Configuration(hadoopConf))
 
     val preMergeParquetBaseFileReader = if (isMOR) {
       parquetFormat.buildReaderWithPartitionValues(sparkSession, dataSchema, 
StructType(Seq.empty),
-        requiredSchemaWithMandatory, Seq.empty, options, new 
Configuration(hadoopConf))
+        requiredSchemaWithMandatory, requiredFilters, options, new 
Configuration(hadoopConf))
     } else {
       _: PartitionedFile => Iterator.empty
     }
 
     val preMergeOrcBaseFileReader = if (isMOR) {
       orcFormat.buildReaderWithPartitionValues(sparkSession, dataSchema, 
StructType(Seq.empty),
-        requiredSchemaWithMandatory, Seq.empty, options, new 
Configuration(hadoopConf))
+        requiredSchemaWithMandatory, requiredFilters, options, new 
Configuration(hadoopConf))
     } else {
       _: PartitionedFile => Iterator.empty
     }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 189bcb39e61..d8278ea8218 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -26,8 +26,13 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieLogFile, 
HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.read.HoodieFileGroupReader
+import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation, 
HoodieCDCFileGroupSplit}
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieFileGroupId
 import org.apache.hudi.common.util.{Option => HOption}
-import org.apache.hudi.{HoodieBaseRelation, HoodieSparkUtils, 
HoodieTableSchema, HoodieTableState, MergeOnReadSnapshotRelation, 
PartitionFileSliceMapping, SparkAdapterSupport, 
SparkFileFormatInternalRowReaderContext}
+import org.apache.hudi.{HoodieBaseRelation, HoodieFileIndex, 
HoodiePartitionCDCFileGroupMapping,
+  HoodieSparkUtils, HoodieTableSchema, HoodieTableState, 
MergeOnReadSnapshotRelation, HoodiePartitionFileSliceMapping,
+  SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.PartitionedFile
@@ -35,6 +40,7 @@ import 
org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
+import org.apache.hudi.common.model.HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -51,7 +57,9 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: 
HoodieTableState,
                                                   mandatoryFields: Seq[String],
                                                   isMOR: Boolean,
                                                   isBootstrap: Boolean,
-                                                  shouldUseRecordPosition: 
Boolean
+                                                  isIncremental: Boolean,
+                                                  shouldUseRecordPosition: 
Boolean,
+                                                  requiredFilters: Seq[Filter]
                                            ) extends ParquetFileFormat with 
SparkAdapterSupport {
   var isProjected = false
 
@@ -70,33 +78,32 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
     supportBatchResult
   }
 
-  override def isSplitable(
-                            sparkSession: SparkSession,
-                            options: Map[String, String],
-                            path: Path): Boolean = {
-    false
-  }
+  override def isSplitable(sparkSession: SparkSession,
+                           options: Map[String, String],
+                           path: Path): Boolean = false
 
-  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+  override def buildReaderWithPartitionValues(spark: SparkSession,
                                               dataSchema: StructType,
                                               partitionSchema: StructType,
                                               requiredSchema: StructType,
                                               filters: Seq[Filter],
                                               options: Map[String, String],
                                               hadoopConf: Configuration): 
PartitionedFile => Iterator[InternalRow] = {
-    val requiredSchemaWithMandatory = 
generateRequiredSchemaWithMandatory(requiredSchema, dataSchema)
+    val requiredSchemaWithMandatory = 
generateRequiredSchemaWithMandatory(requiredSchema, dataSchema, partitionSchema)
     val requiredSchemaSplits = requiredSchemaWithMandatory.fields.partition(f 
=> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name))
     val requiredMeta = StructType(requiredSchemaSplits._1)
     val requiredWithoutMeta = StructType(requiredSchemaSplits._2)
     val augmentedHadoopConf = FSUtils.buildInlineConf(hadoopConf)
     val (baseFileReader, preMergeBaseFileReader, _, _) = buildFileReaders(
-      sparkSession, dataSchema, partitionSchema, requiredSchema, filters, 
options, augmentedHadoopConf,
-      requiredSchemaWithMandatory, requiredWithoutMeta, requiredMeta)
-    val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(augmentedHadoopConf))
+      spark, dataSchema, partitionSchema, if (isIncremental) 
requiredSchemaWithMandatory else requiredSchema,
+      filters, options, augmentedHadoopConf, requiredSchemaWithMandatory, 
requiredWithoutMeta, requiredMeta)
+    val broadcastedHadoopConf = spark.sparkContext.broadcast(new 
SerializableConfiguration(augmentedHadoopConf))
+    val props: TypedProperties = HoodieFileIndex.getConfigProperties(spark, 
options)
 
     (file: PartitionedFile) => {
       file.partitionValues match {
-        case fileSliceMapping: PartitionFileSliceMapping =>
+        // Snapshot or incremental queries.
+        case fileSliceMapping: HoodiePartitionFileSliceMapping =>
           val filePath = 
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
           if (FSUtils.isLogFile(filePath)) {
             // TODO: Use FileGroupReader here: HUDI-6942.
@@ -106,7 +113,7 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
               case Some(fileSlice) =>
                 val hoodieBaseFile = fileSlice.getBaseFile.get()
                 val bootstrapFileOpt = hoodieBaseFile.getBootstrapBaseFile
-                val partitionValues = fileSliceMapping.getInternalRow
+                val partitionValues = fileSliceMapping.getPartitionValues
                 val logFiles = getLogFilesFromSlice(fileSlice)
                 if (requiredSchemaWithMandatory.isEmpty) {
                   val baseFile = createPartitionedFile(partitionValues, 
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
@@ -137,12 +144,37 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
               case _ => baseFileReader(file)
             }
           }
+        // CDC queries.
+        case hoodiePartitionCDCFileGroupSliceMapping: 
HoodiePartitionCDCFileGroupMapping =>
+          val filePath: Path = 
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
+          val fileGroupId: HoodieFileGroupId = new 
HoodieFileGroupId(filePath.getParent.toString, filePath.getName)
+          val fileSplits = 
hoodiePartitionCDCFileGroupSliceMapping.getFileSplitsFor(fileGroupId).get.toArray
+          val fileGroupSplit: HoodieCDCFileGroupSplit = 
HoodieCDCFileGroupSplit(fileSplits)
+          buildCDCRecordIterator(fileGroupSplit, preMergeBaseFileReader, 
broadcastedHadoopConf.value.value, requiredSchema, props)
         // TODO: Use FileGroupReader here: HUDI-6942.
         case _ => baseFileReader(file)
       }
     }
   }
 
+  protected def buildCDCRecordIterator(cdcFileGroupSplit: 
HoodieCDCFileGroupSplit,
+                                       preMergeBaseFileReader: PartitionedFile 
=> Iterator[InternalRow],
+                                       hadoopConf: Configuration,
+                                       requiredSchema: StructType,
+                                       props: TypedProperties): 
Iterator[InternalRow] = {
+    val metaClient = 
HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, 
tableState.tablePath, props)
+    val cdcSchema = CDCRelation.FULL_CDC_SPARK_SCHEMA
+    new CDCFileGroupIterator(
+      cdcFileGroupSplit,
+      metaClient,
+      hadoopConf,
+      preMergeBaseFileReader,
+      tableSchema,
+      cdcSchema,
+      requiredSchema,
+      props)
+  }
+
   protected def buildFileGroupIterator(preMergeBaseFileReader: PartitionedFile 
=> Iterator[InternalRow],
                                        partitionValues: InternalRow,
                                        baseFile: HoodieBaseFile,
@@ -172,8 +204,12 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
     
reader.getClosableIterator.asInstanceOf[java.util.Iterator[InternalRow]].asScala
   }
 
-  def generateRequiredSchemaWithMandatory(requiredSchema: StructType, 
dataSchema: StructType): StructType = {
-    if (!isMOR || 
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState)) {
+  def generateRequiredSchemaWithMandatory(requiredSchema: StructType,
+                                          dataSchema: StructType,
+                                          partitionSchema: StructType): 
StructType = {
+    if (isIncremental) {
+      StructType(dataSchema.toArray ++ partitionSchema.fields)
+    } else if (!isMOR || 
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState)) {
       val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
       for (field <- mandatoryFields) {
         if (requiredSchema.getFieldIndex(field).isEmpty) {
@@ -197,15 +233,14 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
     PartitionedFile => Iterator[InternalRow],
     PartitionedFile => Iterator[InternalRow]) = {
 
-    //file reader when you just read a hudi parquet file and don't do any 
merging
     val baseFileReader = super.buildReaderWithPartitionValues(sparkSession, 
dataSchema, partitionSchema, requiredSchema,
-      filters, options, new Configuration(hadoopConf))
+      filters ++ requiredFilters, options, new Configuration(hadoopConf))
 
     //file reader for reading a hudi base file that needs to be merged with 
log files
     val preMergeBaseFileReader = if (isMOR) {
       // Add support for reading files using inline file system.
       super.buildReaderWithPartitionValues(sparkSession, dataSchema, 
partitionSchema,
-        requiredSchemaWithMandatory, Seq.empty, options, new 
Configuration(hadoopConf))
+        requiredSchemaWithMandatory, requiredFilters, options, new 
Configuration(hadoopConf))
     } else {
       _: PartitionedFile => Iterator.empty
     }
@@ -215,7 +250,6 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
     // 2. if we need to merge the bootstrap base and skeleton files then we 
cannot filter
     // 3. if we need to merge the bootstrap base and skeleton files then we 
should never append partitions to the
     //    skeleton reader
-
     val needMetaCols = requiredMeta.nonEmpty
     val needDataCols = requiredWithoutMeta.nonEmpty
 
@@ -228,7 +262,7 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
       } else {
         // filter and append
         super.buildReaderWithPartitionValues(sparkSession, 
HoodieSparkUtils.getMetaSchema, partitionSchema,
-          requiredMeta, filters, options, new Configuration(hadoopConf))
+          requiredMeta, filters ++ requiredFilters, options, new 
Configuration(hadoopConf))
       }
     } else {
       _: PartitionedFile => Iterator.empty
@@ -248,7 +282,7 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
       } else {
         // filter and append
         super.buildReaderWithPartitionValues(sparkSession, 
dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
-          filters, options, new Configuration(hadoopConf))
+          filters ++ requiredFilters, options, new Configuration(hadoopConf))
       }
     } else {
       _: PartitionedFile => Iterator.empty
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
index a8ba96b9b71..af3cdf715e8 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
@@ -26,7 +26,7 @@ import 
org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{BaseFile, FileSlice, HoodieLogFile, 
HoodieRecord}
 import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.{HoodieBaseRelation, HoodieSparkUtils, 
HoodieTableSchema, HoodieTableState, LogFileIterator, 
MergeOnReadSnapshotRelation, PartitionFileSliceMapping, 
RecordMergingFileIterator, SkipMergeIterator, SparkAdapterSupport}
+import org.apache.hudi.{HoodieBaseRelation, HoodieSparkUtils, 
HoodieTableSchema, HoodieTableState, LogFileIterator, 
MergeOnReadSnapshotRelation, HoodiePartitionFileSliceMapping, 
RecordMergingFileIterator, SkipMergeIterator, SparkAdapterSupport}
 import org.apache.spark.broadcast.Broadcast
 import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
 import org.apache.spark.sql.SparkSession
@@ -51,7 +51,10 @@ class NewHoodieParquetFileFormat(tableState: 
Broadcast[HoodieTableState],
                                  mergeType: String,
                                  mandatoryFields: Seq[String],
                                  isMOR: Boolean,
-                                 isBootstrap: Boolean) extends 
ParquetFileFormat with SparkAdapterSupport {
+                                 isBootstrap: Boolean,
+                                 isIncremental: Boolean,
+                                 requiredFilters: Seq[Filter]
+                                ) extends ParquetFileFormat with 
SparkAdapterSupport {
 
   override def isSplitable(sparkSession: SparkSession,
                            options: Map[String, String],
@@ -86,7 +89,9 @@ class NewHoodieParquetFileFormat(tableState: 
Broadcast[HoodieTableState],
 
     val outputSchema = StructType(requiredSchema.fields ++ 
partitionSchema.fields)
 
-    val requiredSchemaWithMandatory = if (!isMOR || 
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState.value)) {
+    val requiredSchemaWithMandatory = if (isIncremental) {
+      StructType(dataSchema.toArray ++ partitionSchema.fields)
+    } else if (!isMOR || 
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState.value)) {
       //add mandatory fields to required schema
       val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
       for (field <- mandatoryFields) {
@@ -114,13 +119,14 @@ class NewHoodieParquetFileFormat(tableState: 
Broadcast[HoodieTableState],
     val bootstrapBaseAppend = needDataCols && isBootstrap && !isMOR && 
partitionSchema.nonEmpty
 
     val (baseFileReader, preMergeBaseFileReader, skeletonReader, 
bootstrapBaseReader) = buildFileReaders(sparkSession,
-      dataSchema, partitionSchema, requiredSchema, filters, options, 
hadoopConf, requiredSchemaWithMandatory,
+      dataSchema, partitionSchema, if (isIncremental) 
requiredSchemaWithMandatory else requiredSchema,
+      filters, options, hadoopConf, requiredSchemaWithMandatory,
       requiredWithoutMeta, requiredMeta)
 
     val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
     (file: PartitionedFile) => {
       file.partitionValues match {
-        case fileSliceMapping: PartitionFileSliceMapping =>
+        case fileSliceMapping: HoodiePartitionFileSliceMapping =>
           val filePath = 
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
           if (FSUtils.isLogFile(filePath)) {
             //no base file
@@ -135,7 +141,7 @@ class NewHoodieParquetFileFormat(tableState: 
Broadcast[HoodieTableState],
               case Some(fileSlice) =>
                 val hoodieBaseFile = fileSlice.getBaseFile.get()
                 val bootstrapFileOpt = hoodieBaseFile.getBootstrapBaseFile
-                val partitionValues = fileSliceMapping.getInternalRow
+                val partitionValues = fileSliceMapping.getPartitionValues
                 val logFiles = getLogFilesFromSlice(fileSlice)
                 if (requiredSchemaWithMandatory.isEmpty) {
                   val baseFile = createPartitionedFile(partitionValues, 
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
@@ -184,14 +190,13 @@ class NewHoodieParquetFileFormat(tableState: 
Broadcast[HoodieTableState],
     PartitionedFile => Iterator[InternalRow]) = {
 
     //file reader when you just read a hudi parquet file and don't do any 
merging
-
     val baseFileReader = super.buildReaderWithPartitionValues(sparkSession, 
dataSchema, partitionSchema, requiredSchema,
-      filters, options, new Configuration(hadoopConf))
+      filters ++ requiredFilters, options, new Configuration(hadoopConf))
 
     //file reader for reading a hudi base file that needs to be merged with 
log files
     val preMergeBaseFileReader = if (isMOR) {
       super.buildReaderWithPartitionValues(sparkSession, dataSchema, 
StructType(Seq.empty),
-        requiredSchemaWithMandatory, Seq.empty, options, new 
Configuration(hadoopConf))
+        requiredSchemaWithMandatory, requiredFilters, options, new 
Configuration(hadoopConf))
     } else {
       _: PartitionedFile => Iterator.empty
     }
@@ -234,7 +239,7 @@ class NewHoodieParquetFileFormat(tableState: 
Broadcast[HoodieTableState],
       } else {
         // filter and append
         super.buildReaderWithPartitionValues(sparkSession, 
dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
-          filters, options, new Configuration(hadoopConf))
+          filters ++ requiredFilters, options, new Configuration(hadoopConf))
       }
     } else {
       _: PartitionedFile => Iterator.empty

Reply via email to