This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c0c402a9a03 refactor: Use HoodieFileGroupReader paths for all Spark Datasource reads (#17457)
0c0c402a9a03 is described below

commit 0c0c402a9a03061155b5c43f1f4cdd37726eacfd
Author: Tim Brown <[email protected]>
AuthorDate: Sun Dec 7 17:05:51 2025 -0500

    refactor: Use HoodieFileGroupReader paths for all Spark Datasource reads (#17457)
---
 .../main/scala/org/apache/hudi/DefaultSource.scala |  34 +-
 .../org/apache/hudi/HoodieBootstrapMORRDD.scala    |  88 ---
 .../apache/hudi/HoodieBootstrapMORRelation.scala   | 101 ----
 .../scala/org/apache/hudi/HoodieBootstrapRDD.scala | 130 -----
 .../org/apache/hudi/HoodieBootstrapRelation.scala  | 266 ---------
 .../hudi/HoodieHadoopFsRelationFactory.scala       |   5 +-
 .../org/apache/hudi/HoodieMergeOnReadRDDV1.scala   | 166 ------
 .../org/apache/hudi/HoodieMergeOnReadRDDV2.scala   |  12 +-
 .../src/main/scala/org/apache/hudi/Iterators.scala | 405 --------------
 .../hudi/MergeOnReadIncrementalRelationV1.scala    |  10 +-
 .../org/apache/hudi/cdc/CDCFileGroupIterator.scala |  26 +-
 .../scala/org/apache/hudi/cdc/CDCRelation.scala    | 217 --------
 .../apache/hudi/{ => cdc}/HoodieCDCFileIndex.scala |  62 ++-
 .../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala   | 602 ---------------------
 .../sql/FileFormatUtilsForFileGroupReader.scala    |   7 +-
 .../HoodieFileGroupReaderBasedFileFormat.scala     |   4 +-
 .../sql/hudi/streaming/HoodieStreamSourceV1.scala  |  27 +-
 .../sql/hudi/streaming/HoodieStreamSourceV2.scala  |  27 +-
 18 files changed, 134 insertions(+), 2055 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 1520cd1bd34d..363aa116f7fa 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -19,23 +19,21 @@ package org.apache.hudi
 
 import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
-import org.apache.hudi.cdc.CDCRelation
+import org.apache.hudi.cdc.HoodieCDCFileIndex
 import org.apache.hudi.common.HoodieSchemaNotFoundException
-import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
+import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
 import org.apache.hudi.common.model.WriteConcurrencyMode
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion, TableSchemaResolver}
-import org.apache.hudi.common.table.log.InstantRange.RangeType
 import org.apache.hudi.common.util.{ConfigUtils, TablePathUtils}
 import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
 import org.apache.hudi.config.HoodieWriteConfig.{WRITE_CONCURRENCY_MODE, WRITE_TABLE_VERSION}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.io.storage.HoodieSparkIOFactory
 import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage
-import org.apache.hudi.util.{SparkConfigUtils}
+import org.apache.hudi.util.SparkConfigUtils
 
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, SQLContext}
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
@@ -304,16 +302,12 @@ object DefaultSource {
       if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) {
         new EmptyRelation(sqlContext, resolveSchema(metaClient, parameters, Some(schema)))
       } else if (isCdcQuery) {
-        if (isNotMetadataTable) {
-          if (tableType == COPY_ON_WRITE) {
-            new HoodieCopyOnWriteCDCHadoopFsRelationFactory(
-              sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build()
-          } else {
-            new HoodieMergeOnReadCDCHadoopFsRelationFactory(
-              sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build()
-          }
+        if (tableType == COPY_ON_WRITE) {
+          new HoodieCopyOnWriteCDCHadoopFsRelationFactory(
+            sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build()
         } else {
-          CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
+          new HoodieMergeOnReadCDCHadoopFsRelationFactory(
+            sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build()
         }
       } else {
 
@@ -345,12 +339,8 @@ object DefaultSource {
             }
 
           case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
-            if (isNotMetadataTable) {
-              new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
-                sqlContext, metaClient, parameters, userSchema, isBootstrap = true).build()
-            } else {
-              HoodieBootstrapMORRelation(sqlContext, userSchema, metaClient, parameters)
-            }
+            new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
+              sqlContext, metaClient, parameters, userSchema, isBootstrap = true).build()
 
           case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
             if (hoodieTableSupportsCompletionTime) {
@@ -394,11 +384,11 @@ object DefaultSource {
   private def resolveSchema(metaClient: HoodieTableMetaClient,
                             parameters: Map[String, String],
                             schema: Option[StructType]): StructType = {
-    val isCdcQuery = CDCRelation.isCDCEnabled(metaClient) &&
+    val isCdcQuery = HoodieCDCFileIndex.isCDCEnabled(metaClient) &&
       parameters.get(QUERY_TYPE.key).contains(QUERY_TYPE_INCREMENTAL_OPT_VAL) &&
       parameters.get(INCREMENTAL_FORMAT.key).contains(INCREMENTAL_FORMAT_CDC_VAL)
     if (isCdcQuery) {
-      CDCRelation.FULL_CDC_SPARK_SCHEMA
+      HoodieCDCFileIndex.FULL_CDC_SPARK_SCHEMA
     } else {
       val schemaResolver = new TableSchemaResolver(metaClient)
       try {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRDD.scala
deleted file mode 100644
index f298ca849107..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRDD.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.HoodieBaseRelation.BaseFileReader
-import org.apache.hudi.HoodieBootstrapMORRDD.{getPartitionPath, 
CONFIG_INSTANTIATION_LOCK}
-import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
-import org.apache.hudi.storage.StoragePath
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-import org.apache.spark.{Partition, SerializableWritable, TaskContext}
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
-
-class HoodieBootstrapMORRDD(@transient spark: SparkSession,
-                            @transient config: Configuration,
-                            bootstrapDataFileReader: BaseFileReader,
-                            bootstrapSkeletonFileReader: BaseFileReader,
-                            regularFileReader: BaseFileReader,
-                            tableSchema: HoodieTableSchema,
-                            requiredSchema: HoodieTableSchema,
-                            tableState: HoodieTableState,
-                            @transient splits: Seq[BaseHoodieBootstrapSplit])
-  extends HoodieBootstrapRDD(spark, bootstrapDataFileReader, 
bootstrapSkeletonFileReader,
-    regularFileReader, requiredSchema, splits) {
-
-  protected val maxCompactionMemoryInBytes: Long = 
getMaxCompactionMemoryInBytes(new JobConf(config))
-
-  private val hadoopConfBroadcast = spark.sparkContext.broadcast(new 
SerializableWritable(config))
-
-  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
-    val bootstrapPartition = split.asInstanceOf[HoodieBootstrapPartition]
-    maybeLog(bootstrapPartition)
-    val bootstrapMORSplit = 
bootstrapPartition.split.asInstanceOf[HoodieBootstrapMORSplit]
-
-    if (bootstrapMORSplit.logFiles.isEmpty) {
-      //no log files, treat like regular bootstrap
-      getIterator(bootstrapPartition)
-    } else {
-      bootstrapMORSplit.skeletonFile match {
-        case Some(skeletonFile) =>
-          val (iterator, schema) = 
getSkeletonIteratorSchema(bootstrapMORSplit.dataFile, skeletonFile)
-          new RecordMergingFileIterator(bootstrapMORSplit.logFiles, 
getPartitionPath(skeletonFile),
-            iterator, schema, tableSchema, requiredSchema, tableState, 
getHadoopConf)
-        case _ =>
-          // NOTE: Regular file-reader is already projected into the required 
schema
-          new RecordMergingFileIterator(bootstrapMORSplit.logFiles,
-            getPartitionPath(bootstrapMORSplit.dataFile),
-            regularFileReader.read(bootstrapMORSplit.dataFile), 
regularFileReader.schema, tableSchema,
-            requiredSchema, tableState, getHadoopConf)
-      }
-    }
-  }
-
-  private def getHadoopConf: Configuration = {
-    val conf = hadoopConfBroadcast.value.value
-    // TODO clean up, this lock is unnecessary see HoodieMergeOnReadRDD
-    CONFIG_INSTANTIATION_LOCK.synchronized {
-      new Configuration(conf)
-    }
-  }
-}
-
-object HoodieBootstrapMORRDD extends SparkAdapterSupport {
-  val CONFIG_INSTANTIATION_LOCK = new Object()
-
-  def getPartitionPath(file: PartitionedFile): StoragePath = {
-    
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file).getParent
-  }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRelation.scala
deleted file mode 100644
index 60ae8b722478..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRelation.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
-import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.storage.StoragePath
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
-
-import scala.collection.JavaConverters._
-
-case class HoodieBootstrapMORSplit(dataFile: PartitionedFile, skeletonFile: 
Option[PartitionedFile],
-                                   logFiles: List[HoodieLogFile]) extends 
BaseHoodieBootstrapSplit
-
-/**
- * This is Spark relation that can be used for querying metadata/fully 
bootstrapped query hoodie tables, as well as
- * non-bootstrapped tables. It implements PrunedFilteredScan interface in 
order to support column pruning and filter
- * push-down. For metadata bootstrapped files, if we query columns from both 
metadata and actual data then it will
- * perform a merge of both to return the result.
- *
- * Caveat: Filter push-down does not work when querying both metadata and 
actual data columns over metadata
- * bootstrapped files, because then the metadata file and data file can return 
different number of rows causing errors
- * merging.
- *
- * @param sqlContext Spark SQL Context
- * @param userSchema User specified schema in the datasource query
- * @param globPaths  The global paths to query. If it not none, read from the 
globPaths,
- *                   else read data from tablePath using HoodieFileIndex.
- * @param metaClient Hoodie table meta client
- * @param optParams  DataSource options passed by the user
- */
-case class HoodieBootstrapMORRelation(override val sqlContext: SQLContext,
-                                      private val userSchema: 
Option[StructType],
-                                      override val metaClient: 
HoodieTableMetaClient,
-                                      override val optParams: Map[String, 
String],
-                                      private val prunedDataSchema: 
Option[StructType] = None)
-  extends BaseHoodieBootstrapRelation(sqlContext, userSchema, metaClient,
-    optParams, prunedDataSchema) {
-
-  override type Relation = HoodieBootstrapMORRelation
-
-  protected lazy val mandatoryFieldsForMerging: Seq[String] =
-    Seq(recordKeyField) ++ orderingFields
-
-  override lazy val mandatoryFields: Seq[String] = mandatoryFieldsForMerging
-
-  protected override def getFileSlices(partitionFilters: Seq[Expression], 
dataFilters: Seq[Expression]): Seq[FileSlice] = {
-    fileIndex.listFileSlices(HoodieFileIndex.
-      convertFilterForTimestampKeyGenerator(metaClient, 
partitionFilters)).values.flatten.toSeq
-  }
-
-  protected override def createFileSplit(fileSlice: FileSlice, dataFile: 
PartitionedFile, skeletonFile: Option[PartitionedFile]): FileSplit = {
-    HoodieBootstrapMORSplit(dataFile, skeletonFile, 
fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList)
-  }
-
-  protected override def composeRDD(fileSplits: Seq[FileSplit],
-                                    tableSchema: HoodieTableSchema,
-                                    requiredSchema: HoodieTableSchema,
-                                    requestedColumns: Array[String],
-                                    filters: Array[Filter]): RDD[InternalRow] 
= {
-
-    val (bootstrapDataFileReader, bootstrapSkeletonFileReader, 
regularFileReader) = getFileReaders(tableSchema,
-      requiredSchema, requestedColumns, filters)
-    new HoodieBootstrapMORRDD(
-      sqlContext.sparkSession,
-      config = jobConf,
-      bootstrapDataFileReader = bootstrapDataFileReader,
-      bootstrapSkeletonFileReader = bootstrapSkeletonFileReader,
-      regularFileReader = regularFileReader,
-      tableSchema = tableSchema,
-      requiredSchema = requiredSchema,
-      tableState = tableState, fileSplits)
-  }
-
-
-  override def updatePrunedDataSchema(prunedSchema: StructType): Relation =
-    this.copy(prunedDataSchema = Some(prunedSchema))
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala
deleted file mode 100644
index fcac20bf9b97..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.HoodieBaseRelation.BaseFileReader
-import org.apache.hudi.common.util.ValidationUtils.checkState
-
-import org.apache.spark.{Partition, TaskContext}
-import org.apache.spark.rdd.RDD
-import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.types.StructType
-
-class HoodieBootstrapRDD(@transient spark: SparkSession,
-                         bootstrapDataFileReader: BaseFileReader,
-                         bootstrapSkeletonFileReader: BaseFileReader,
-                         regularFileReader: BaseFileReader,
-                         requiredSchema: HoodieTableSchema,
-                         @transient splits: Seq[BaseHoodieBootstrapSplit])
-  extends RDD[InternalRow](spark.sparkContext, Nil) {
-
-
-  protected def getSkeletonIteratorSchema(dataFile: PartitionedFile, 
skeletonFile: PartitionedFile): (Iterator[InternalRow], StructType) = {
-    if (bootstrapDataFileReader.schema.isEmpty) {
-      // No data column to fetch, hence fetch only from skeleton file
-      (bootstrapSkeletonFileReader.read(skeletonFile), 
bootstrapSkeletonFileReader.schema)
-    } else if (bootstrapSkeletonFileReader.schema.isEmpty) {
-      // No metadata column to fetch, hence fetch only from data file
-      (bootstrapDataFileReader.read(dataFile), bootstrapDataFileReader.schema)
-    } else {
-      // Fetch from both data and skeleton file, and merge
-      val dataFileIterator = bootstrapDataFileReader.read(dataFile)
-      val skeletonFileIterator = bootstrapSkeletonFileReader.read(skeletonFile)
-      val mergedSchema = StructType(bootstrapSkeletonFileReader.schema.fields 
++ bootstrapDataFileReader.schema.fields)
-
-      (merge(skeletonFileIterator, dataFileIterator), mergedSchema)
-    }
-  }
-
-  /**
-   *  Here we have to project the [[InternalRow]]s fetched into the expected 
target schema.
-   *  These could diverge for ex, when requested schema contains partition 
columns which might not be
-   *  persisted w/in the data file, but instead would be parsed from the 
partition path. In that case
-   *  output of the file-reader will have different ordering of the fields 
than the original required
-   *  schema (for more details please check out [[ParquetFileFormat]] 
implementation).
-   */
-  protected def unsafeProjectIterator(iterator: Iterator[InternalRow], schema: 
StructType): Iterator[InternalRow] = {
-    val unsafeProjection = generateUnsafeProjection(schema, 
requiredSchema.structTypeSchema)
-    iterator.map(unsafeProjection)
-  }
-
-  protected def maybeLog(bootstrapPartition: HoodieBootstrapPartition): Unit = 
{
-    if (log.isDebugEnabled) {
-      var msg = "Got Split => Index: " + bootstrapPartition.index + ", Data 
File: " +
-        bootstrapPartition.split.dataFile.filePath
-      if (bootstrapPartition.split.skeletonFile.isDefined) {
-        msg += ", Skeleton File: " + 
bootstrapPartition.split.skeletonFile.get.filePath
-      }
-      logDebug(msg)
-    }
-  }
-
-  protected def getIterator(bootstrapPartition: HoodieBootstrapPartition): 
Iterator[InternalRow] = {
-    bootstrapPartition.split.skeletonFile match {
-      case Some(skeletonFile) =>
-        // It is a bootstrap split. Check both skeleton and data files.
-        val (iterator, schema) = 
getSkeletonIteratorSchema(bootstrapPartition.split.dataFile, skeletonFile)
-        unsafeProjectIterator(iterator, schema)
-      case _ =>
-        // NOTE: Regular file-reader is already projected into the required 
schema
-        regularFileReader.read(bootstrapPartition.split.dataFile)
-    }
-  }
-
-  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
-    val bootstrapPartition = split.asInstanceOf[HoodieBootstrapPartition]
-    maybeLog(bootstrapPartition)
-    getIterator(bootstrapPartition)
-  }
-
-  def merge(skeletonFileIterator: Iterator[InternalRow], dataFileIterator: 
Iterator[InternalRow]): Iterator[InternalRow] = {
-    new Iterator[InternalRow] {
-      private val combinedRow = new JoinedRow()
-
-      override def hasNext: Boolean = {
-        checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
-          "Bootstrap data-file iterator and skeleton-file iterator have to be 
in-sync!")
-        dataFileIterator.hasNext && skeletonFileIterator.hasNext
-      }
-
-      override def next(): InternalRow = {
-        combinedRow(skeletonFileIterator.next(), dataFileIterator.next())
-      }
-    }
-  }
-
-  override protected def getPartitions: Array[Partition] = {
-    splits.zipWithIndex.map(file => {
-      if (file._1.skeletonFile.isDefined) {
-        logDebug("Forming partition with => Index: " + file._2 + ", Files: " + 
file._1.dataFile.filePath
-          + "," + file._1.skeletonFile.get.filePath)
-        HoodieBootstrapPartition(file._2, file._1)
-      } else {
-        logDebug("Forming partition with => Index: " + file._2 + ", File: " + 
file._1.dataFile.filePath)
-        HoodieBootstrapPartition(file._2, file._1)
-      }
-    }).toArray
-  }
-}
-
-case class HoodieBootstrapPartition(index: Int, split: 
BaseHoodieBootstrapSplit) extends Partition
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
deleted file mode 100644
index 99d43508d7f2..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, projectReader, 
BaseFileReader}
-import org.apache.hudi.HoodieBootstrapRelation.{createPartitionedFile, 
validate}
-import org.apache.hudi.common.model.FileSlice
-import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.storage.StoragePath
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
PartitionedFile}
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
-
-trait BaseHoodieBootstrapSplit extends HoodieFileSplit {
-  val dataFile: PartitionedFile
-  val skeletonFile: Option[PartitionedFile]
-}
-
-case class HoodieBootstrapSplit(dataFile: PartitionedFile,
-                                skeletonFile: Option[PartitionedFile]) extends 
BaseHoodieBootstrapSplit
-
-case class HoodieBootstrapRelation(override val sqlContext: SQLContext,
-                                   private val userSchema: Option[StructType],
-                                   override val metaClient: 
HoodieTableMetaClient,
-                                   override val optParams: Map[String, String],
-                                   private val prunedDataSchema: 
Option[StructType] = None)
-  extends BaseHoodieBootstrapRelation(sqlContext, userSchema, metaClient, 
optParams, prunedDataSchema) {
-
-  override type Relation = HoodieBootstrapRelation
-
-  override protected def createFileSplit(fileSlice: FileSlice, dataFile: 
PartitionedFile, skeletonFile: Option[PartitionedFile]): FileSplit = {
-    HoodieBootstrapSplit(dataFile, skeletonFile)
-  }
-
-  override def updatePrunedDataSchema(prunedSchema: StructType): Relation =
-    this.copy(prunedDataSchema = Some(prunedSchema))
-
-  def toHadoopFsRelation: HadoopFsRelation = {
-    HadoopFsRelation(
-      location = fileIndex,
-      partitionSchema = fileIndex.partitionSchema,
-      dataSchema = fileIndex.dataSchema,
-      bucketSpec = None,
-      fileFormat = fileFormat,
-      optParams)(sparkSession)
-  }
-}
-/**
-  * This is Spark relation that can be used for querying metadata/fully 
bootstrapped query hoodie tables, as well as
- * non-bootstrapped tables. It implements PrunedFilteredScan interface in 
order to support column pruning and filter
- * push-down. For metadata bootstrapped files, if we query columns from both 
metadata and actual data then it will
- * perform a merge of both to return the result.
- *
- * Caveat: Filter push-down does not work when querying both metadata and 
actual data columns over metadata
- * bootstrapped files, because then the metadata file and data file can return 
different number of rows causing errors
- * merging.
- *
- * @param sqlContext Spark SQL Context
- * @param userSchema User specified schema in the datasource query
- * @param globPaths  The global paths to query. If it not none, read from the 
globPaths,
- *                   else read data from tablePath using HoodieFileIndex.
- * @param metaClient Hoodie table meta client
- * @param optParams  DataSource options passed by the user
- */
-abstract class BaseHoodieBootstrapRelation(override val sqlContext: SQLContext,
-                                           private val userSchema: 
Option[StructType],
-                                           override val metaClient: 
HoodieTableMetaClient,
-                                           override val optParams: Map[String, 
String],
-                                           private val prunedDataSchema: 
Option[StructType] = None)
-  extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema, 
prunedDataSchema) {
-
-  override type FileSplit = BaseHoodieBootstrapSplit
-
-  private lazy val skeletonSchema = HoodieSparkUtils.getMetaSchema
-
-  override lazy val mandatoryFields: Seq[String] = Seq.empty
-
-  protected def getFileSlices(partitionFilters: Seq[Expression], dataFilters: 
Seq[Expression]): Seq[FileSlice] = {
-    listLatestFileSlices(partitionFilters, dataFilters)
-  }
-
-  protected def createFileSplit(fileSlice: FileSlice, dataFile: 
PartitionedFile, skeletonFile: Option[PartitionedFile]): FileSplit
-
-  protected override def collectFileSplits(partitionFilters: Seq[Expression], 
dataFilters: Seq[Expression]): Seq[FileSplit] = {
-    val fileSlices = getFileSlices(partitionFilters, dataFilters)
-    val isPartitioned = metaClient.getTableConfig.isTablePartitioned
-    fileSlices.map { fileSlice =>
-      val baseFile = fileSlice.getBaseFile.get()
-      if (baseFile.getBootstrapBaseFile.isPresent) {
-        val partitionValues = 
getPartitionColumnsAsInternalRowInternal(baseFile.getPathInfo,
-          metaClient.getBasePath, extractPartitionValuesFromPartitionPath = 
isPartitioned)
-        val dataFile = createPartitionedFile(
-          partitionValues, 
baseFile.getBootstrapBaseFile.get.getPathInfo.getPath,
-          0, baseFile.getBootstrapBaseFile.get().getFileLen)
-        val skeletonFile = Option(createPartitionedFile(
-          InternalRow.empty, baseFile.getStoragePath, 0, baseFile.getFileLen))
-
-        createFileSplit(fileSlice, dataFile, skeletonFile)
-      } else {
-        val dataFile = createPartitionedFile(
-          getPartitionColumnsAsInternalRow(baseFile.getPathInfo), 
baseFile.getStoragePath, 0, baseFile.getFileLen)
-        createFileSplit(fileSlice, dataFile, Option.empty)
-      }
-    }
-  }
-
-  /**
-   * get all the file readers required for composeRDD
-   */
-  protected def getFileReaders(tableSchema: HoodieTableSchema,
-                               requiredSchema: HoodieTableSchema,
-                               requestedColumns: Array[String],
-                               filters: Array[Filter]): (BaseFileReader, 
BaseFileReader, BaseFileReader) = {
-    val requiredSkeletonFileSchema =
-      StructType(skeletonSchema.filter(f => requestedColumns.exists(col => 
resolver(f.name, col))))
-
-    val (bootstrapDataFileReader, bootstrapSkeletonFileReader) =
-      createBootstrapFileReaders(tableSchema, requiredSchema, 
requiredSkeletonFileSchema, filters)
-
-    val regularFileReader = createRegularFileReader(tableSchema, 
requiredSchema, filters)
-    (bootstrapDataFileReader, bootstrapSkeletonFileReader, regularFileReader)
-  }
-
-  protected override def composeRDD(fileSplits: Seq[FileSplit],
-                                    tableSchema: HoodieTableSchema,
-                                    requiredSchema: HoodieTableSchema,
-                                    requestedColumns: Array[String],
-                                    filters: Array[Filter]): RDD[InternalRow] 
= {
-
-    val (bootstrapDataFileReader, bootstrapSkeletonFileReader, 
regularFileReader) = getFileReaders(tableSchema,
-      requiredSchema, requestedColumns, filters)
-    new HoodieBootstrapRDD(sqlContext.sparkSession, bootstrapDataFileReader, 
bootstrapSkeletonFileReader, regularFileReader,
-      requiredSchema, fileSplits)
-  }
-
-  /**
-   * Creates skeleton and base file reader
-   */
-  private def createBootstrapFileReaders(tableSchema: HoodieTableSchema,
-                                         requiredSchema: HoodieTableSchema,
-                                         requiredSkeletonFileSchema: 
StructType,
-                                         filters: Array[Filter]): 
(BaseFileReader, BaseFileReader) = {
-    // NOTE: "Data" schema in here refers to the whole table's schema that 
doesn't include only partition
-    //       columns, as opposed to data file schema not including any 
meta-fields columns in case of
-    //       Bootstrap relation
-    val (partitionSchema, dataSchema, requiredDataSchema) =
-      tryPrunePartitionColumnsInternal(tableSchema, requiredSchema, 
extractPartitionValuesFromPartitionPath = true)
-
-    val bootstrapDataFileSchema = 
StructType(dataSchema.structTypeSchema.filterNot(sf => isMetaField(sf.name)))
-    val requiredBootstrapDataFileSchema = 
StructType(requiredDataSchema.structTypeSchema.filterNot(sf => 
isMetaField(sf.name)))
-
-    validate(requiredDataSchema, requiredBootstrapDataFileSchema, 
requiredSkeletonFileSchema)
-
-    val bootstrapDataFileReader = createBaseFileReader(
-      spark = sqlContext.sparkSession,
-      dataSchema = new HoodieTableSchema(bootstrapDataFileSchema, 
convertToAvroSchema(bootstrapDataFileSchema, tableName).toString),
-      partitionSchema = partitionSchema,
-      requiredDataSchema = new 
HoodieTableSchema(requiredBootstrapDataFileSchema, 
convertToAvroSchema(requiredBootstrapDataFileSchema, tableName).toString),
-      // NOTE: For bootstrapped files we can't apply any filtering in case 
we'd need to merge it with
-      //       a skeleton-file as we rely on matching ordering of the records 
across bootstrap- and skeleton-files
-      filters = if (requiredSkeletonFileSchema.isEmpty) filters else Seq(),
-      options = optParams,
-      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf(),
-      // NOTE: Bootstrap relation have to always extract partition values from 
the partition-path as this is a
-      //       default Spark behavior: Spark by default strips 
partition-columns from the data schema and does
-      //       NOT persist them in the data files, instead parsing them from 
partition-paths (on the fly) whenever
-      //       table is queried
-      shouldAppendPartitionValuesOverride = Some(true)
-    )
-
-    val boostrapSkeletonFileReader = createBaseFileReader(
-      spark = sqlContext.sparkSession,
-      dataSchema = new HoodieTableSchema(skeletonSchema, 
convertToAvroSchema(skeletonSchema, tableName).toString),
-      // NOTE: Here we specify partition-schema as empty since we don't need 
Spark to inject partition-values
-      //       parsed from the partition-path
-      partitionSchema = StructType(Seq.empty),
-      requiredDataSchema = new HoodieTableSchema(requiredSkeletonFileSchema, 
convertToAvroSchema(requiredSkeletonFileSchema, tableName).toString),
-      // NOTE: For bootstrapped files we can't apply any filtering in case 
we'd need to merge it with
-      //       a skeleton-file as we rely on matching ordering of the records 
across bootstrap- and skeleton-files
-      filters = if (requiredBootstrapDataFileSchema.isEmpty) filters else 
Seq(),
-      options = optParams,
-      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf(),
-      // NOTE: We override Spark to avoid injecting partition values into the 
records read from
-      //       skeleton-file
-      shouldAppendPartitionValuesOverride = Some(false)
-    )
-
-    (bootstrapDataFileReader, boostrapSkeletonFileReader)
-  }
-
-  /**
-   * create reader for hudi base files
-   */
-  private def createRegularFileReader(tableSchema: HoodieTableSchema,
-                                     requiredSchema: HoodieTableSchema,
-                                     filters: Array[Filter]): BaseFileReader = 
{
-    // NOTE: "Data" schema in here refers to the whole table's schema that 
doesn't include only partition
-    //       columns, as opposed to data file schema not including any 
meta-fields columns in case of
-    //       Bootstrap relation
-    val (partitionSchema, dataSchema, requiredDataSchema) =
-      tryPrunePartitionColumns(tableSchema, requiredSchema)
-
-    // NOTE: Bootstrapped table allows Hudi created file-slices to be 
co-located w/ the "bootstrapped"
-    //       ones (ie persisted by Spark). Therefore to be able to read the 
data from Bootstrapped
-    //       table we also need to create regular file-reader to read 
file-slices created by Hudi
-    val regularFileReader = createBaseFileReader(
-      spark = sqlContext.sparkSession,
-      dataSchema = dataSchema,
-      partitionSchema = partitionSchema,
-      requiredDataSchema = requiredDataSchema,
-      filters = filters,
-      options = optParams,
-      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
-    )
-
-    // NOTE: In some case schema of the reader's output (reader's schema) 
might not match the schema expected by the caller.
-    //       This could occur for ex, when requested schema contains partition 
columns which might not be persisted w/in the
-    //       data file, but instead would be parsed from the partition path. 
In that case output of the file-reader will have
-    //       different ordering of the fields than the original required 
schema (for more details please check out
-    //       [[ParquetFileFormat]] impl). In that case we have to project the 
rows from the file-reader's schema
-    //       back into the one expected by the caller
-    projectReader(regularFileReader, requiredSchema.structTypeSchema)
-  }
-}
-
-object HoodieBootstrapRelation extends SparkAdapterSupport {
-  def validate(requiredDataSchema: HoodieTableSchema, requiredDataFileSchema: 
StructType, requiredSkeletonFileSchema: StructType): Unit = {
-    val requiredDataColumns: Seq[String] = 
requiredDataSchema.structTypeSchema.fieldNames.toSeq
-    val combinedColumns = (requiredSkeletonFileSchema.fieldNames ++ 
requiredDataFileSchema.fieldNames).toSeq
-
-    // NOTE: Here we validate that all required data columns are covered by 
the combination of the columns
-    //       from both skeleton file and the corresponding data file
-    checkState(combinedColumns.sorted == requiredDataColumns.sorted)
-  }
-
-  def createPartitionedFile(partitionValues: InternalRow,
-                            filePath: StoragePath,
-                            start: Long,
-                            length: Long): PartitionedFile = {
-    sparkAdapter.getSparkPartitionedFileUtils.createPartitionedFile(
-      partitionValues, filePath, start, length)
-  }
-}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index bad337c18527..de6341c8caef 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -20,6 +20,7 @@ package org.apache.hudi
 
 import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, 
isSchemaEvolutionEnabledOnRead}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.cdc.HoodieCDCFileIndex
 import org.apache.hudi.common.config.HoodieReaderConfig
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
@@ -348,7 +349,7 @@ class HoodieMergeOnReadCDCHadoopFsRelationFactory(override 
val sqlContext: SQLCo
 
   override def buildFileIndex(): HoodieFileIndex = hoodieCDCFileIndex
 
-  override def buildDataSchema(): StructType = 
hoodieCDCFileIndex.cdcRelation.schema
+  override def buildDataSchema(): StructType = 
HoodieCDCFileIndex.FULL_CDC_SPARK_SCHEMA
 
   override def buildPartitionSchema(): StructType = StructType(Nil)
 
@@ -448,7 +449,7 @@ class HoodieCopyOnWriteCDCHadoopFsRelationFactory(override 
val sqlContext: SQLCo
     sparkSession, metaClient, schemaSpec, options, fileStatusCache, false, 
rangeType)
   override def buildFileIndex(): HoodieFileIndex = hoodieCDCFileIndex
 
-  override def buildDataSchema(): StructType = 
hoodieCDCFileIndex.cdcRelation.schema
+  override def buildDataSchema(): StructType = 
HoodieCDCFileIndex.FULL_CDC_SPARK_SCHEMA
 
   override def buildPartitionSchema(): StructType = StructType(Nil)
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV1.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV1.scala
deleted file mode 100644
index 872c3fe32991..000000000000
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV1.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.HoodieBaseRelation.{projectReader, BaseFileReader}
-import org.apache.hudi.HoodieMergeOnReadRDDV1.CONFIG_INSTANTIATION_LOCK
-import org.apache.hudi.MergeOnReadSnapshotRelation.isProjectionCompatible
-import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.util.StringUtils
-import org.apache.hudi.exception.HoodieException
-import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-import org.apache.spark.{Partition, SerializableWritable, SparkContext, 
TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-
-import java.io.Closeable
-import java.util.function.Predicate
-
-/**
- * RDD enabling Hudi's Merge-on-Read (MOR) semantic
- *
- * @param sc               spark's context
- * @param config           hadoop configuration
- * @param fileReaders      suite of base file readers
- * @param tableSchema      table's full schema
- * @param requiredSchema   expected (potentially) projected schema
- * @param tableState       table's state
- * @param mergeType        type of merge performed
- * @param fileSplits       target file-splits this RDD will be iterating over
- * @param includeStartTime whether to include the commit with the commitTime
- * @param startTimestamp   start timestamp to filter records
- * @param endTimestamp     end timestamp to filter records
- */
-class HoodieMergeOnReadRDDV1(@transient sc: SparkContext,
-                             @transient config: Configuration,
-                             fileReaders: HoodieMergeOnReadBaseFileReaders,
-                             tableSchema: HoodieTableSchema,
-                             requiredSchema: HoodieTableSchema,
-                             tableState: HoodieTableState,
-                             mergeType: String,
-                             @transient fileSplits: 
Seq[HoodieMergeOnReadFileSplit],
-                             includeStartTime: Boolean = false,
-                             startTimestamp: String = null,
-                             endTimestamp: String = null)
-  extends RDD[InternalRow](sc, Nil) with HoodieUnsafeRDD {
-
-  protected val maxCompactionMemoryInBytes: Long = 
getMaxCompactionMemoryInBytes(new JobConf(config))
-
-  private val hadoopConfBroadcast = sc.broadcast(new 
SerializableWritable(config))
-
-  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
-    val partition = split.asInstanceOf[HoodieMergeOnReadPartition]
-    val iter = partition.split match {
-      case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
-        val projectedReader = 
projectReader(fileReaders.requiredSchemaReaderSkipMerging, 
requiredSchema.structTypeSchema)
-        projectedReader(dataFileOnlySplit.dataFile.get)
-
-      case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
-        new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, 
tableState, getHadoopConf)
-
-      case split =>
-        mergeType match {
-          case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL =>
-            val reader = fileReaders.requiredSchemaReaderSkipMerging
-            new SkipMergeIterator(split, reader, tableSchema, requiredSchema, 
tableState, getHadoopConf)
-
-          case DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL =>
-            val reader = pickBaseFileReader()
-            new RecordMergingFileIterator(split, reader, tableSchema, 
requiredSchema, tableState, getHadoopConf)
-
-          case _ => throw new UnsupportedOperationException(s"Not supported 
merge type ($mergeType)")
-        }
-
-      case _ => throw new HoodieException(s"Unable to select an Iterator to 
read the Hoodie MOR File Split for " +
-        s"file path: ${partition.split.dataFile.get.filePath}" +
-        s"log paths: ${partition.split.logFiles.toString}" +
-        s"hoodie table path: ${tableState.tablePath}" +
-        s"spark partition Index: ${partition.index}" +
-        s"merge type: ${mergeType}")
-    }
-
-    if (iter.isInstanceOf[Closeable]) {
-      // register a callback to close logScanner which will be executed on 
task completion.
-      // when tasks finished, this method will be called, and release 
resources.
-      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => 
iter.asInstanceOf[Closeable].close()))
-    }
-
-    val commitTimeMetadataFieldIdx = 
requiredSchema.structTypeSchema.fieldNames.indexOf(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
-    val needsFiltering = commitTimeMetadataFieldIdx >= 0 && 
!StringUtils.isNullOrEmpty(startTimestamp) && 
!StringUtils.isNullOrEmpty(endTimestamp)
-    if (needsFiltering) {
-      val filterT: Predicate[InternalRow] = 
getCommitTimeFilter(includeStartTime, commitTimeMetadataFieldIdx)
-      iter.filter(filterT.test)
-    }
-    else {
-      iter
-    }
-  }
-
-  private def getCommitTimeFilter(includeStartTime: Boolean, 
commitTimeMetadataFieldIdx: Int): Predicate[InternalRow] = {
-    if (includeStartTime) {
-      new Predicate[InternalRow] {
-        override def test(row: InternalRow): Boolean = {
-          val commitTime = row.getString(commitTimeMetadataFieldIdx)
-          commitTime >= startTimestamp && commitTime <= endTimestamp
-        }
-      }
-    } else {
-      new Predicate[InternalRow] {
-        override def test(row: InternalRow): Boolean = {
-          val commitTime = row.getString(commitTimeMetadataFieldIdx)
-          commitTime > startTimestamp && commitTime <= endTimestamp
-        }
-      }
-    }
-  }
-
-  private def pickBaseFileReader(): BaseFileReader = {
-    // NOTE: This is an optimization making sure that even for MOR tables we 
fetch absolute minimum
-    //       of the stored data possible, while still properly executing 
corresponding relation's semantic
-    //       and meet the query's requirements.
-    //
-    //       Here we assume that iff queried table does use one of the 
standard (and whitelisted)
-    //       Record Payload classes then we can avoid reading and parsing the 
records w/ _full_ schema,
-    //       and instead only rely on projected one, nevertheless being able 
to perform merging correctly
-    if (isProjectionCompatible(tableState)) {
-      fileReaders.requiredSchemaReader
-    } else {
-      fileReaders.fullSchemaReader
-    }
-  }
-
-  override protected def getPartitions: Array[Partition] =
-    fileSplits.zipWithIndex.map(file => HoodieMergeOnReadPartition(file._2, 
file._1)).toArray
-
-  private def getHadoopConf: Configuration = {
-    val conf = hadoopConfBroadcast.value.value
-    // TODO clean up, this lock is unnecessary
-    CONFIG_INSTANTIATION_LOCK.synchronized {
-      new Configuration(conf)
-    }
-  }
-}
-
-object HoodieMergeOnReadRDDV1 {
-  val CONFIG_INSTANTIATION_LOCK = new Object()
-}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
index 0b86ac6e3bf9..a1523a4f84e9 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
@@ -20,7 +20,6 @@ package org.apache.hudi
 
 import org.apache.hudi.HoodieBaseRelation.{projectReader, BaseFileReader}
 import org.apache.hudi.HoodieMergeOnReadRDDV2.CONFIG_INSTANTIATION_LOCK
-import org.apache.hudi.LogFileIterator.getPartitionPath
 import org.apache.hudi.avro.HoodieAvroReaderContext
 import org.apache.hudi.common.config.{HoodieReaderConfig, TypedProperties}
 import org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE
@@ -36,6 +35,7 @@ import org.apache.hudi.expression.{Predicate => HPredicate}
 import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
 import 
org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
 import org.apache.hudi.metadata.HoodieTableMetadataUtil
+import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
 
 import org.apache.avro.Schema
@@ -255,6 +255,16 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
       override def close(): Unit = closeableFileGroupRecordIterator.close()
     }
   }
+
+  private def getPartitionPath(split: HoodieMergeOnReadFileSplit): StoragePath 
= {
+    // Partition path is the parent folder of the base file or, failing that,
+    // of the first log file; a split with neither is rejected explicitly
+    split.dataFile.map(baseFile =>
+        
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(baseFile))
+      .orElse(split.logFiles.headOption.map(_.getPath))
+      .getOrElse(throw new IllegalStateException("Split has no files"))
+      .getParent
+  }
 }
 
 object HoodieMergeOnReadRDDV2 {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
deleted file mode 100644
index a6e158f1bc10..000000000000
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.HoodieBaseRelation.BaseFileReader
-import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
-import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
-import org.apache.hudi.LogFileIterator.{getPartitionPath, scanLog}
-import org.apache.hudi.avro.{AvroRecordContext, AvroSchemaCache}
-import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMemoryConfig, 
HoodieMetadataConfig, TypedProperties}
-import org.apache.hudi.common.engine.{EngineType, HoodieLocalEngineContext, 
RecordContext}
-import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
-import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, 
HoodieEmptyRecord, HoodieLogFile, HoodieOperation, HoodieRecord, 
HoodieSparkRecord}
-import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
-import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
-import org.apache.hudi.common.table.read.{BufferedRecord, BufferedRecords, 
DeleteContext}
-import org.apache.hudi.common.util.{FileIOUtils, HoodieRecordUtils}
-import org.apache.hudi.config.HoodiePayloadConfig
-import org.apache.hudi.hadoop.fs.HadoopFSUtils
-import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
-import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
HoodieTableMetadata}
-import 
org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
-import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
-import org.apache.hudi.util.CachingIterator
-
-import org.apache.avro.Schema
-import org.apache.avro.generic.{GenericRecord, IndexedRecord}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
-import org.apache.spark.sql.HoodieInternalRowUtils
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Projection
-import org.apache.spark.sql.types.StructType
-
-import java.io.Closeable
-
-import scala.annotation.tailrec
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-/**
- * Provided w/ list of log files, iterates over all of the records stored in
- * Delta Log files (represented as [[InternalRow]]s)
- */
-class LogFileIterator(logFiles: List[HoodieLogFile],
-                      partitionPath: StoragePath,
-                      tableSchema: HoodieTableSchema,
-                      requiredStructTypeSchema: StructType,
-                      requiredAvroSchema: Schema,
-                      tableState: HoodieTableState,
-                      config: Configuration)
-  extends CachingIterator[InternalRow] with AvroDeserializerSupport {
-
-  def this(logFiles: List[HoodieLogFile],
-           partitionPath: StoragePath,
-           tableSchema: HoodieTableSchema,
-           requiredSchema: HoodieTableSchema,
-           tableState: HoodieTableState,
-           config: Configuration) {
-    this(logFiles, partitionPath, tableSchema, requiredSchema.structTypeSchema,
-      new Schema.Parser().parse(requiredSchema.avroSchemaStr), tableState, 
config)
-  }
-  def this(split: HoodieMergeOnReadFileSplit,
-           tableSchema: HoodieTableSchema,
-           requiredSchema: HoodieTableSchema,
-           tableState: HoodieTableState,
-           config: Configuration) {
-    this(split.logFiles, getPartitionPath(split), tableSchema, requiredSchema, 
tableState, config)
-  }
-  private val maxCompactionMemoryInBytes: Long = 
getMaxCompactionMemoryInBytes(new JobConf(config))
-
-  protected val payloadProps: TypedProperties = if 
(tableState.orderingFields.nonEmpty) {
-    HoodiePayloadConfig.newBuilder
-      .withPayloadOrderingFields(String.join(",", tableState.orderingFields: 
_*))
-      .build
-      .getProps
-  } else {
-    new TypedProperties()
-  }
-
-  protected override val avroSchema: Schema = requiredAvroSchema
-  protected override val structTypeSchema: StructType = 
requiredStructTypeSchema
-
-  protected val logFileReaderAvroSchema: Schema = AvroSchemaCache.intern(new 
Schema.Parser().parse(tableSchema.avroSchemaStr))
-  protected val logFileReaderStructType: StructType = 
tableSchema.structTypeSchema
-  protected val deleteContext: DeleteContext = new DeleteContext(payloadProps, 
logFileReaderAvroSchema).withReaderSchema(logFileReaderAvroSchema);
-
-  private val requiredSchemaAvroProjection: AvroProjection = 
AvroProjection.create(avroSchema)
-  private val requiredSchemaRowProjection: Projection = 
generateUnsafeProjection(logFileReaderStructType, structTypeSchema)
-
-  private val logRecords = {
-    val internalSchema = 
tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
-
-    scanLog(logFiles, partitionPath, logFileReaderAvroSchema, tableState,
-      maxCompactionMemoryInBytes, config, internalSchema)
-  }
-
-  private val (hasOperationField, operationFieldPos) = {
-    val operationField = 
logFileReaderAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD)
-    if (operationField != null) {
-      (true, operationField.pos())
-    } else {
-      (false, -1)
-    }
-  }
-
-  protected def isDeleteOperation(r: InternalRow): Boolean = if 
(hasOperationField) {
-    val operation = r.getString(operationFieldPos)
-    HoodieOperation.fromName(operation) == HoodieOperation.DELETE
-  } else {
-    false
-  }
-
-  protected def isDeleteOperation(r: GenericRecord): Boolean = if 
(hasOperationField) {
-    val operation = r.get(operationFieldPos).toString
-    HoodieOperation.fromName(operation) == HoodieOperation.DELETE
-  } else {
-    false
-  }
-
-  def logRecordsPairIterator(): Iterator[(String, HoodieRecord[_])] = {
-    logRecords.iterator
-  }
-
-  // NOTE: This have to stay lazy to make sure it's initialized only at the 
point where it's
-  //       going to be used, since we modify `logRecords` before that and 
therefore can't do it any earlier
-  private lazy val logRecordsIterator: Iterator[Option[HoodieRecord[_]]] =
-    logRecords.iterator.map {
-      case (_, record: HoodieSparkRecord) => Option(record)
-      case (_, _: HoodieEmptyRecord[_]) => Option.empty
-      case (_, record) =>
-        toScalaOption(record.toIndexedRecord(logFileReaderAvroSchema, 
payloadProps))
-
-    }
-
-  protected def removeLogRecord(key: String): Option[HoodieRecord[_]] = 
logRecords.remove(key)
-
-  protected def doHasNext: Boolean = hasNextInternal
-
-  // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to 
make sure
-  //       that recursion is unfolded into a loop to avoid stack overflows 
while
-  //       handling records
-  @tailrec private def hasNextInternal: Boolean = {
-    logRecordsIterator.hasNext && {
-      logRecordsIterator.next() match {
-        case Some(r: HoodieAvroIndexedRecord) =>
-          val data = r.getData.asInstanceOf[GenericRecord]
-          if (isDeleteOperation(data)) {
-            this.hasNextInternal
-          } else {
-            val projectedAvroRecord = requiredSchemaAvroProjection(data)
-            nextRecord = deserialize(projectedAvroRecord)
-            true
-          }
-        case Some(r: HoodieSparkRecord) =>
-          val data = r.getData
-          if (isDeleteOperation(data)) {
-            this.hasNextInternal
-          } else {
-            nextRecord = requiredSchemaRowProjection(data)
-            true
-          }
-        case None => this.hasNextInternal
-      }
-    }
-  }
-}
-
-/**
- * Provided w/ list of log files and base file iterator, provides an iterator 
over all of the records stored in
- * Base file as well as all of the Delta Log files simply returning 
concatenation of these streams, while not
- * performing any combination/merging of the records w/ the same primary keys 
(ie producing duplicates potentially)
- */
-class SkipMergeIterator(logFiles: List[HoodieLogFile],
-                        partitionPath: StoragePath,
-                        baseFileIterator: Iterator[InternalRow],
-                        readerSchema: StructType,
-                        dataSchema: HoodieTableSchema,
-                        requiredStructTypeSchema: StructType,
-                        requiredAvroSchema: Schema,
-                        tableState: HoodieTableState,
-                        config: Configuration)
-  extends LogFileIterator(logFiles, partitionPath, dataSchema, 
requiredStructTypeSchema, requiredAvroSchema, tableState, config) {
-
-  def this(split: HoodieMergeOnReadFileSplit, baseFileReader: BaseFileReader, 
dataSchema: HoodieTableSchema,
-           requiredSchema: HoodieTableSchema, tableState: HoodieTableState, 
config: Configuration) {
-    this(split.logFiles, getPartitionPath(split), 
baseFileReader(split.dataFile.get),
-      baseFileReader.schema, dataSchema, requiredSchema.structTypeSchema,
-      new Schema.Parser().parse(requiredSchema.avroSchemaStr), tableState, 
config)
-  }
-
-  private val requiredSchemaProjection = 
generateUnsafeProjection(readerSchema, structTypeSchema)
-
-  override def doHasNext: Boolean = {
-    if (baseFileIterator.hasNext) {
-      // No merge is required, simply load current row and project into 
required schema
-      nextRecord = requiredSchemaProjection(baseFileIterator.next())
-      true
-    } else {
-      super[LogFileIterator].doHasNext
-    }
-  }
-}
-
-/**
- * Provided w/ list of log files and base file iterator, provides an iterator 
over all of the records stored in
- * a) Base file and all of the b) Delta Log files combining records with the 
same primary key from both of these
- * streams
- */
-class RecordMergingFileIterator(logFiles: List[HoodieLogFile],
-                                partitionPath: StoragePath,
-                                baseFileIterator: Iterator[InternalRow],
-                                readerSchema: StructType,
-                                dataSchema: HoodieTableSchema,
-                                requiredStructTypeSchema: StructType,
-                                requiredAvroSchema: Schema,
-                                tableState: HoodieTableState,
-                                config: Configuration)
-  extends LogFileIterator(logFiles, partitionPath, dataSchema, 
requiredStructTypeSchema, requiredAvroSchema, tableState, config) {
-
-  def this(logFiles: List[HoodieLogFile],
-           partitionPath: StoragePath,
-           baseFileIterator: Iterator[InternalRow],
-           readerSchema: StructType,
-           dataSchema: HoodieTableSchema,
-           requiredSchema: HoodieTableSchema,
-           tableState: HoodieTableState,
-           config: Configuration) {
-    this(logFiles, partitionPath, baseFileIterator, readerSchema, dataSchema, 
requiredSchema.structTypeSchema,
-      new Schema.Parser().parse(requiredSchema.avroSchemaStr), tableState, 
config)
-  }
-  def this(split: HoodieMergeOnReadFileSplit, baseFileReader: BaseFileReader, 
dataSchema: HoodieTableSchema,
-           requiredSchema: HoodieTableSchema, tableState: HoodieTableState, 
config: Configuration) {
-    this(split.logFiles, getPartitionPath(split), 
baseFileReader(split.dataFile.get),
-      baseFileReader.schema, dataSchema, requiredSchema, tableState, config)
-  }
-
-  // NOTE: Record-merging iterator supports 2 modes of operation merging 
records bearing either
-  //        - Full table's schema
-  //        - Projected schema
-  //       As such, no particular schema could be assumed, and therefore we 
rely on the caller
-  //       to correspondingly set the schema of the expected output of 
base-file reader
-  private val baseFileReaderAvroSchema = 
AvroSchemaCache.intern(sparkAdapter.getAvroSchemaConverters.toAvroType(readerSchema,
 nullable = false, "record"))
-
-  private val serializer = sparkAdapter.createAvroSerializer(readerSchema, 
baseFileReaderAvroSchema, nullable = false)
-
-  private val recordKeyOrdinal = 
readerSchema.fieldIndex(tableState.recordKeyField)
-
-  private val requiredSchemaProjection = 
generateUnsafeProjection(readerSchema, structTypeSchema)
-  private val requiredSchemaAvroProjection = AvroProjection.create(avroSchema)
-
-  private val recordMerger = 
HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK,
-    tableState.recordMergeImplClasses.asJava, tableState.recordMergeStrategyId)
-
-  private val rowRecordContext: RecordContext[InternalRow] = 
SparkFileFormatInternalRecordContext.getFieldAccessorInstance
-  private val avroRecordContext: RecordContext[IndexedRecord] = 
AvroRecordContext.getFieldAccessorInstance
-  private val orderingFields: Array[String] = tableState.orderingFields.toArray
-
-  override def doHasNext: Boolean = hasNextInternal
-
-  // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to 
make sure
-  //       that recursion is unfolded into a loop to avoid stack overflows 
while
-  //       handling records
-  @tailrec private def hasNextInternal: Boolean = {
-    if (baseFileIterator.hasNext) {
-      val curRow = baseFileIterator.next()
-      val curKey = curRow.getString(recordKeyOrdinal)
-      val updatedRecordOpt = removeLogRecord(curKey)
-      if (updatedRecordOpt.isEmpty) {
-        // No merge is required, simply load current row and project into 
required schema
-        nextRecord = requiredSchemaProjection(curRow)
-        true
-      } else {
-       val mergedRecordOpt = merge(curRow, updatedRecordOpt.get)
-        if (mergedRecordOpt.isEmpty) {
-          // Record has been deleted, skipping
-          this.hasNextInternal
-        } else {
-          nextRecord = mergedRecordOpt.get
-          true
-        }
-      }
-    } else {
-      super[LogFileIterator].doHasNext
-    }
-  }
-
-  private def serialize(curRowRecord: InternalRow): GenericRecord =
-    serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
-
-  private def merge(curRow: InternalRow, newRecord: HoodieRecord[_]): 
Option[InternalRow] = {
-    // NOTE: We have to pass in Avro Schema used to read from Delta Log file 
since we invoke combining API
-    //       on the record from the Delta Log
-    recordMerger.getRecordType match {
-      case HoodieRecordType.SPARK =>
-        val curRecord = BufferedRecords.fromEngineRecord(curRow, 
baseFileReaderAvroSchema, rowRecordContext, orderingFields, 
newRecord.getRecordKey, false)
-        val newBufferedRecord = BufferedRecords.fromHoodieRecord(newRecord, 
logFileReaderAvroSchema, rowRecordContext, payloadProps, orderingFields, 
deleteContext)
-        val result = recordMerger.merge(curRecord, newBufferedRecord, 
rowRecordContext, payloadProps).asInstanceOf[BufferedRecord[InternalRow]]
-        if (result.isDelete) {
-          None
-        } else {
-          val schema = 
HoodieInternalRowUtils.getCachedSchema(rowRecordContext.getSchemaFromBufferRecord(result))
-          val projection = 
HoodieInternalRowUtils.getCachedUnsafeProjection(schema, structTypeSchema)
-          Some(projection.apply(result.getRecord))
-        }
-      case _ =>
-        val curRecord = BufferedRecords.fromEngineRecord(serialize(curRow), 
baseFileReaderAvroSchema, avroRecordContext, orderingFields, 
newRecord.getRecordKey, false)
-        val newBufferedRecord = BufferedRecords.fromHoodieRecord(newRecord, 
logFileReaderAvroSchema, avroRecordContext, payloadProps, orderingFields, 
deleteContext)
-        val result = recordMerger.merge(curRecord, newBufferedRecord, 
avroRecordContext, payloadProps).asInstanceOf[BufferedRecord[IndexedRecord]]
-        if (result.isDelete) {
-          None
-        } else {
-          val avroRecord = result.getRecord.asInstanceOf[GenericRecord]
-          Some(deserialize(requiredSchemaAvroProjection(avroRecord)))
-        }
-    }
-  }
-}
-
-object LogFileIterator extends SparkAdapterSupport {
-
-  def scanLog(logFiles: List[HoodieLogFile],
-              partitionPath: StoragePath,
-              logSchema: Schema,
-              tableState: HoodieTableState,
-              maxCompactionMemoryInBytes: Long,
-              hadoopConf: Configuration,
-              internalSchema: InternalSchema = 
InternalSchema.getEmptyInternalSchema): mutable.Map[String, HoodieRecord[_]] = {
-    val tablePath = tableState.tablePath
-    val storage = HoodieStorageUtils.getStorage(tablePath, 
HadoopFSUtils.getStorageConf(hadoopConf))
-
-    val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
-      .withStorage(storage)
-      .withBasePath(tablePath)
-      .withLogFilePaths(logFiles.map(logFile => 
logFile.getPath.toString).asJava)
-      .withReaderSchema(logSchema)
-      // NOTE: This part shall only be reached when at least one log is 
present in the file-group
-      //       entailing that table has to have at least one commit
-      .withLatestInstantTime(tableState.latestCommitTimestamp.get)
-      .withReverseReader(false)
-      .withInternalSchema(internalSchema)
-      .withBufferSize(
-        hadoopConf.getInt(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
-          HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE))
-      .withMaxMemorySizeInBytes(maxCompactionMemoryInBytes)
-      .withSpillableMapBasePath(
-        hadoopConf.get(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key,
-          FileIOUtils.getDefaultSpillableMapBasePath))
-      .withDiskMapType(
-        hadoopConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key,
-          HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue))
-      .withBitCaskDiskMapCompressionEnabled(
-        
hadoopConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
-          
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
-
-    if (logFiles.nonEmpty) {
-      logRecordScannerBuilder.withPartition(getRelativePartitionPath(
-        new StoragePath(tableState.tablePath), 
logFiles.head.getPath.getParent))
-    }
-
-    logRecordScannerBuilder.withRecordMerger(
-      HoodieRecordUtils.createRecordMerger(tableState.tablePath, 
EngineType.SPARK, tableState.recordMergeImplClasses.asJava, 
tableState.recordMergeStrategyId))
-
-    val scanner = logRecordScannerBuilder.build()
-
-    closing(scanner) {
-      // NOTE: We have to copy record-map (by default immutable copy is 
exposed)
-      mutable.HashMap(scanner.getRecords.asScala.toSeq: _*)
-    }
-  }
-
-  def closing[T](c: Closeable)(f: => T): T = {
-    try { f } finally {
-      c.close()
-    }
-  }
-
-  def getPartitionPath(split: HoodieMergeOnReadFileSplit): StoragePath = {
-    // Determine partition path as an immediate parent folder of either
-    //    - The base file
-    //    - Some log file
-    split.dataFile.map(baseFile =>
-      
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(baseFile))
-      .getOrElse(split.logFiles.head.getPath)
-      .getParent
-  }
-}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
index 10eb1cfd272e..522152558085 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
@@ -86,18 +86,20 @@ case class MergeOnReadIncrementalRelationV1(override val 
sqlContext: SQLContext,
     val optionalFilters = filters
     val readers = createBaseFileReaders(tableSchema, requiredSchema, 
requestedColumns, requiredFilters, optionalFilters)
 
-    new HoodieMergeOnReadRDDV1(
+    new HoodieMergeOnReadRDDV2(
       sqlContext.sparkContext,
       config = jobConf,
+      sqlConf = sqlContext.sparkSession.sessionState.conf,
       fileReaders = readers,
       tableSchema = tableSchema,
       requiredSchema = requiredSchema,
       tableState = tableState,
       mergeType = mergeType,
       fileSplits = fileSplits,
-      includeStartTime = includeStartTime,
-      startTimestamp = startTs,
-      endTimestamp = endTs)
+      includedInstantTimeSet = 
Option(includedCommits.map(_.requestedTime).toSet),
+      optionalFilters = optionalFilters,
+      metaClient = metaClient,
+      options = optParams)
   }
 
   override protected def collectFileSplits(partitionFilters: Seq[Expression], 
dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
index 471f9a3f0a6e..39ef098b5f30 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -23,6 +23,7 @@ import org.apache.hudi.{AvroConversionUtils, 
HoodieTableSchema, SparkAdapterSupp
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
 import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.cdc.CDCFileGroupIterator.{CDC_OPERATION_DELETE, 
CDC_OPERATION_INSERT, CDC_OPERATION_UPDATE}
 import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMemoryConfig, 
HoodieMetadataConfig, HoodieReaderConfig, TypedProperties}
 import 
org.apache.hudi.common.config.HoodieCommonConfig.{DISK_MAP_BITCASK_COMPRESSION_ENABLED,
 SPILLABLE_DISK_MAP_TYPE}
 import org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH
@@ -48,6 +49,7 @@ import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.conf.Configuration
 import 
org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter
+import org.apache.spark.Partition
 import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
 import org.apache.spark.sql.HoodieInternalRowUtils
 import org.apache.spark.sql.avro.HoodieAvroDeserializer
@@ -66,6 +68,12 @@ import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+/**
+ * The split that will be processed by a Spark task.
+ * The [[changes]] are expected to be sorted before the split is created.
+ */
+case class HoodieCDCFileGroupSplit(changes: Array[HoodieCDCFileSplit])
+
 class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
                            metaClient: HoodieTableMetaClient,
                            conf: StorageConfiguration[Configuration],
@@ -351,7 +359,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
         // no real record is deleted, just ignore.
       } else {
         // there is a real record deleted.
-        recordToLoad.update(0, CDCRelation.CDC_OPERATION_DELETE)
+        recordToLoad.update(0, CDC_OPERATION_DELETE)
         recordToLoad.update(2, 
convertBufferedRecordToJsonString(existingRecordOpt.get))
         recordToLoad.update(3, null)
         loaded = true
@@ -360,7 +368,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
       val existingRecordOpt = beforeImageRecords.get(logRecord.getRecordKey)
       if (existingRecordOpt.isEmpty) {
         // a new record is inserted.
-        recordToLoad.update(0, CDCRelation.CDC_OPERATION_INSERT)
+        recordToLoad.update(0, CDC_OPERATION_INSERT)
         recordToLoad.update(2, null)
         recordToLoad.update(3, convertBufferedRecordToJsonString(logRecord))
         // insert into beforeImageRecords
@@ -371,7 +379,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
         val existingRecord = existingRecordOpt.get
         val mergeRecord = merge(existingRecord, logRecord)
         if (existingRecord != mergeRecord) {
-          recordToLoad.update(0, CDCRelation.CDC_OPERATION_UPDATE)
+          recordToLoad.update(0, CDC_OPERATION_UPDATE)
           recordToLoad.update(2, 
convertBufferedRecordToJsonString(existingRecord))
           recordToLoad.update(3, 
convertBufferedRecordToJsonString(mergeRecord))
           // update into beforeImageRecords
@@ -460,11 +468,11 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
     recordToLoad = currentCDCFileSplit.getCdcInferCase match {
       case BASE_FILE_INSERT =>
         InternalRow.fromSeq(Seq(
-          CDCRelation.CDC_OPERATION_INSERT, 
convertToUTF8String(currentInstant),
+          CDC_OPERATION_INSERT, convertToUTF8String(currentInstant),
           null, null))
       case BASE_FILE_DELETE =>
         InternalRow.fromSeq(Seq(
-          CDCRelation.CDC_OPERATION_DELETE, 
convertToUTF8String(currentInstant),
+          CDC_OPERATION_DELETE, convertToUTF8String(currentInstant),
           null, null))
       case LOG_FILE =>
         InternalRow.fromSeq(Seq(
@@ -476,7 +484,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
           null, null))
       case REPLACE_COMMIT =>
         InternalRow.fromSeq(Seq(
-          CDCRelation.CDC_OPERATION_DELETE, 
convertToUTF8String(currentInstant),
+          CDC_OPERATION_DELETE, convertToUTF8String(currentInstant),
           null, null))
     }
   }
@@ -594,3 +602,9 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
     }
   }
 }
+
+object CDCFileGroupIterator {
+  val CDC_OPERATION_DELETE: UTF8String = UTF8String.fromString(DELETE.getValue)
+  val CDC_OPERATION_INSERT: UTF8String = UTF8String.fromString(INSERT.getValue)
+  val CDC_OPERATION_UPDATE: UTF8String = UTF8String.fromString(UPDATE.getValue)
+}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
deleted file mode 100644
index bfcbbc031018..000000000000
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.cdc
-
-import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
HoodieDataSourceHelper, HoodieTableSchema}
-import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
-import org.apache.hudi.common.table.cdc.HoodieCDCExtractor
-import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
-import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._
-import org.apache.hudi.common.table.cdc.HoodieCDCUtils._
-import org.apache.hudi.common.table.log.InstantRange
-import org.apache.hudi.common.table.log.InstantRange.RangeType
-import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.internal.schema.InternalSchema
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SparkSession, SQLContext}
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.unsafe.types.UTF8String
-
-import scala.collection.JavaConverters._
-import scala.util.{Failure, Success, Try}
-
-/**
- * Hoodie CDC Relation extends Spark's [[BaseRelation]], provide the schema of 
cdc
- * and the [[buildScan]] to return the change-data in a specified range.
- */
-class CDCRelation(
-    override val sqlContext: SQLContext,
-    metaClient: HoodieTableMetaClient,
-    startInstant: String,
-    endInstant: String,
-    options: Map[String, String],
-    rangeType: RangeType = InstantRange.RangeType.OPEN_CLOSED
-) extends BaseRelation with PrunedFilteredScan with Logging {
-
-  imbueConfigs(sqlContext)
-
-  val spark: SparkSession = sqlContext.sparkSession
-
-  val (tableAvroSchema, _) = {
-    val schemaUtil = new TableSchemaResolver(metaClient)
-    val avroSchema = Try(schemaUtil.getTableAvroSchema) match {
-      case Success(schema) => schema
-      case Failure(e) =>
-        throw new IllegalArgumentException("Failed to fetch schema from the 
table", e)
-    }
-    // try to find internalSchema
-    val internalSchemaFromMeta = try {
-      
schemaUtil.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
-    } catch {
-      case _: Exception => InternalSchema.getEmptyInternalSchema
-    }
-    (avroSchema, internalSchemaFromMeta)
-  }
-
-  val tableStructSchema: StructType = 
AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
-
-  val cdcExtractor: HoodieCDCExtractor =
-    new HoodieCDCExtractor(
-      metaClient,
-      InstantRange.builder()
-        .startInstant(startInstant)
-        .endInstant(endInstant)
-        .nullableBoundary(true)
-        .rangeType(rangeType).build(),
-      false)
-
-  override final def needConversion: Boolean = false
-
-  override def schema: StructType = CDCRelation.FULL_CDC_SPARK_SCHEMA
-
-  override def buildScan(requiredColumns: Array[String], filters: 
Array[Filter]): RDD[Row] = {
-    val internalRows = buildScan0(requiredColumns, filters)
-    internalRows.asInstanceOf[RDD[Row]]
-  }
-
-  def buildScan0(requiredColumns: Array[String], filters: Array[Filter]): 
RDD[InternalRow] = {
-    val nameToField = schema.fields.map(f => f.name -> f).toMap
-    val requiredSchema = StructType(requiredColumns.map(nameToField))
-    val originTableSchema = HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString)
-    val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
-      sparkSession = spark,
-      dataSchema = tableStructSchema,
-      partitionSchema = StructType(Nil),
-      requiredSchema = tableStructSchema,
-      filters = Nil,
-      options = options,
-      hadoopConf = spark.sessionState.newHadoopConf()
-    )
-
-    val changes = cdcExtractor.extractCDCFileSplits().values().asScala.map { 
splits =>
-      HoodieCDCFileGroupSplit(
-        splits.asScala.sorted.toArray
-      )
-    }
-    val cdcRdd = new HoodieCDCRDD(
-      spark,
-      metaClient,
-      parquetReader,
-      originTableSchema,
-      schema,
-      requiredSchema,
-      changes.toArray
-    )
-    cdcRdd.asInstanceOf[RDD[InternalRow]]
-  }
-
-  def imbueConfigs(sqlContext: SQLContext): Unit = {
-    // Disable vectorized reading for CDC relation
-    
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader",
 "false")
-  }
-}
-
-object CDCRelation {
-
-  val CDC_OPERATION_DELETE: UTF8String = UTF8String.fromString(DELETE.getValue)
-  val CDC_OPERATION_INSERT: UTF8String = UTF8String.fromString(INSERT.getValue)
-  val CDC_OPERATION_UPDATE: UTF8String = UTF8String.fromString(UPDATE.getValue)
-
-  /**
-   * CDC Schema For Spark.
-   * Also it's schema when `hoodie.table.cdc.supplemental.logging.mode` is 
[[DATA_BEFORE_AFTER]].
-   * Here we use the debezium format.
-   */
-  val FULL_CDC_SPARK_SCHEMA: StructType = {
-    StructType(
-      Seq(
-        StructField(CDC_OPERATION_TYPE, StringType),
-        StructField(CDC_COMMIT_TIMESTAMP, StringType),
-        StructField(CDC_BEFORE_IMAGE, StringType),
-        StructField(CDC_AFTER_IMAGE, StringType)
-      )
-    )
-  }
-
-  /**
-   * CDC Schema For Spark when `hoodie.table.cdc.supplemental.logging.mode` is 
[[OP_KEY_ONLY]].
-   */
-  val MIN_CDC_SPARK_SCHEMA: StructType = {
-    StructType(
-      Seq(
-        StructField(CDC_OPERATION_TYPE, StringType),
-        StructField(CDC_RECORD_KEY, StringType)
-      )
-    )
-  }
-
-  /**
-   * CDC Schema For Spark when `hoodie.table.cdc.supplemental.logging.mode` is 
[[DATA_BEFORE]].
-   */
-  val CDC_WITH_BEFORE_SPARK_SCHEMA: StructType = {
-    StructType(
-      Seq(
-        StructField(CDC_OPERATION_TYPE, StringType),
-        StructField(CDC_RECORD_KEY, StringType),
-        StructField(CDC_BEFORE_IMAGE, StringType)
-      )
-    )
-  }
-
-  def isCDCEnabled(metaClient: HoodieTableMetaClient): Boolean = {
-    metaClient.getTableConfig.isCDCEnabled
-  }
-
-  /**
-   * The only approach to create the CDC relation.
-   */
-  def getCDCRelation(
-      sqlContext: SQLContext,
-      metaClient: HoodieTableMetaClient,
-      options: Map[String, String],
-      rangeType: RangeType = RangeType.OPEN_CLOSED): CDCRelation = {
-
-    if (!isCDCEnabled(metaClient)) {
-      throw new IllegalArgumentException(s"It isn't a CDC hudi table on 
${metaClient.getBasePath}")
-    }
-
-    val startCompletionTime = 
options.getOrElse(DataSourceReadOptions.START_COMMIT.key(),
-      throw new HoodieException(s"CDC Query should provide the valid start 
completion time "
-        + s"through the option ${DataSourceReadOptions.START_COMMIT.key()}")
-    )
-    val endCompletionTime = 
options.getOrElse(DataSourceReadOptions.END_COMMIT.key(),
-      getTimestampOfLatestInstant(metaClient)
-    )
-
-    new CDCRelation(sqlContext, metaClient, startCompletionTime, 
endCompletionTime, options, rangeType)
-  }
-
-  def getTimestampOfLatestInstant(metaClient: HoodieTableMetaClient): String = 
{
-    val latestInstant = metaClient.getActiveTimeline.lastInstant()
-    if (latestInstant.isPresent) {
-      latestInstant.get().requestedTime
-    } else {
-      throw new HoodieException("No valid instant in Active Timeline.")
-    }
-  }
-}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCFileIndex.scala
similarity index 62%
rename from 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
rename to 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCFileIndex.scala
index 81eecf3272a5..bf8f0f52c2bd 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCFileIndex.scala
@@ -17,19 +17,22 @@
  * under the License.
  */
 
-package org.apache.hudi
+package org.apache.hudi.cdc
 
-import org.apache.hudi.cdc.CDCRelation
+import org.apache.hudi.{DataSourceReadOptions, HoodieFileIndex}
+import org.apache.hudi.cdc.HoodieCDCFileIndex.isCDCEnabled
 import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.table.cdc.HoodieCDCExtractor
+import org.apache.hudi.common.table.cdc.{HoodieCDCExtractor, HoodieCDCUtils}
+import org.apache.hudi.common.table.log.InstantRange
 import org.apache.hudi.common.table.log.InstantRange.RangeType
+import org.apache.hudi.exception.HoodieException
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, 
GenericInternalRow}
 import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, 
NoopCache, PartitionDirectory}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
 import scala.collection.JavaConverters._
 
@@ -44,8 +47,23 @@ class HoodieCDCFileIndex(override val spark: SparkSession,
     spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles, 
shouldEmbedFileSlices = true
   ) with FileIndex  {
   private val emptyPartitionPath: String = "empty_partition_path";
-  val cdcRelation: CDCRelation = CDCRelation.getCDCRelation(spark.sqlContext, 
metaClient, options, rangeType)
-  val cdcExtractor: HoodieCDCExtractor = cdcRelation.cdcExtractor
+
+  if (!isCDCEnabled(metaClient)) {
+    throw new IllegalArgumentException(s"It isn't a CDC hudi table on 
${metaClient.getBasePath}")
+  }
+
+  private val cdcExtractor: HoodieCDCExtractor = new 
HoodieCDCExtractor(metaClient,
+    InstantRange.builder()
+      .startInstant(options.getOrElse(DataSourceReadOptions.START_COMMIT.key(),
+        throw new HoodieException(s"CDC Query should provide the valid start 
completion time "
+          + s"through the option ${DataSourceReadOptions.START_COMMIT.key()}")
+      ))
+      .endInstant(options.getOrElse(DataSourceReadOptions.END_COMMIT.key(),
+        HoodieCDCFileIndex.getTimestampOfLatestInstant(metaClient)
+      ))
+      .nullableBoundary(true)
+      .rangeType(rangeType).build(),
+    false)
 
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: 
Seq[Expression]): Seq[PartitionDirectory] = {
     hasPushedDownPartitionPredicates = true
@@ -81,3 +99,35 @@ class HoodieCDCFileIndex(override val spark: SparkSession,
     }.toArray
   }
 }
+
+object HoodieCDCFileIndex {
+
+  /**
+   * CDC Schema For Spark.
+   * It is also the schema used when `hoodie.table.cdc.supplemental.logging.mode` is 
[[DATA_BEFORE_AFTER]].
+   * Here we use the Debezium format.
+   */
+  val FULL_CDC_SPARK_SCHEMA: StructType = {
+    StructType(
+      Seq(
+        StructField(HoodieCDCUtils.CDC_OPERATION_TYPE, StringType),
+        StructField(HoodieCDCUtils.CDC_COMMIT_TIMESTAMP, StringType),
+        StructField(HoodieCDCUtils.CDC_BEFORE_IMAGE, StringType),
+        StructField(HoodieCDCUtils.CDC_AFTER_IMAGE, StringType)
+      )
+    )
+  }
+
+  private def getTimestampOfLatestInstant(metaClient: HoodieTableMetaClient): 
String = {
+    val latestInstant = metaClient.getActiveTimeline.lastInstant()
+    if (latestInstant.isPresent) {
+      latestInstant.get().requestedTime
+    } else {
+      throw new HoodieException("No valid instant in Active Timeline.")
+    }
+  }
+
+  def isCDCEnabled(metaClient: HoodieTableMetaClient): Boolean = {
+    metaClient.getTableConfig.isCDCEnabled
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
deleted file mode 100644
index 97b572ed98b2..000000000000
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++ /dev/null
@@ -1,602 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.cdc
-
-import org.apache.hudi.{AvroConversionUtils, AvroProjection, HoodieFileIndex, 
HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, 
HoodieUnsafeRDD, LogFileIterator, RecordMergingFileIterator, 
SparkAdapterSupport}
-import org.apache.hudi.HoodieBaseRelation.BaseFileReader
-import org.apache.hudi.HoodieConversionUtils._
-import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
-import org.apache.hudi.avro.HoodieAvroUtils
-import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
-import org.apache.hudi.common.model._
-import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCUtils}
-import org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase._
-import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
-import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._
-import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator
-import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.config.HoodiePayloadConfig
-import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
-import org.apache.hudi.storage.StoragePath
-import org.apache.hudi.util.JFunction
-
-import org.apache.avro.Schema
-import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
-import org.apache.hadoop.fs.Path
-import org.apache.spark.{Partition, SerializableWritable, TaskContext}
-import org.apache.spark.rdd.RDD
-import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.avro.HoodieAvroDeserializer
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Projection
-import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.unsafe.types.UTF8String
-
-import java.io.Closeable
-import java.util.Properties
-import java.util.stream.Collectors
-
-import scala.annotation.tailrec
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-/**
- * The split that will be processed by spark task.
- * The [[changes]] should be sorted first.
- */
-case class HoodieCDCFileGroupSplit(changes: Array[HoodieCDCFileSplit])
-
-/**
- * The Spark [[Partition]]'s implementation.
- */
-case class HoodieCDCFileGroupPartition(
-    index: Int,
-    split: HoodieCDCFileGroupSplit
-) extends Partition
-
-class HoodieCDCRDD(
-    spark: SparkSession,
-    metaClient: HoodieTableMetaClient,
-    parquetReader: PartitionedFile => Iterator[InternalRow],
-    originTableSchema: HoodieTableSchema,
-    cdcSchema: StructType,
-    requiredCdcSchema: StructType,
-    @transient changes: Array[HoodieCDCFileGroupSplit])
-  extends RDD[InternalRow](spark.sparkContext, Nil) with HoodieUnsafeRDD {
-
-  @transient
-  private val hadoopConf = spark.sparkContext.hadoopConfiguration
-
-  private val confBroadcast = spark.sparkContext.broadcast(new 
SerializableWritable(hadoopConf))
-
-  private val cdcSupplementalLoggingMode = 
metaClient.getTableConfig.cdcSupplementalLoggingMode
-
-  private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty, 
metaClient.getTableConfig)
-
-  protected val payloadProps: Properties = 
metaClient.getTableConfig.getOrderingFieldsStr
-    .map[TypedProperties](JFunction.toJavaFunction(preCombineFields =>
-      HoodiePayloadConfig.newBuilder
-        .withPayloadOrderingFields(preCombineFields)
-        .build
-        .getProps
-    )).orElse(new TypedProperties())
-
-  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
-    val cdcPartition = split.asInstanceOf[HoodieCDCFileGroupPartition]
-    new CDCFileGroupIterator(cdcPartition.split, metaClient)
-  }
-
-  override protected def getPartitions: Array[Partition] = {
-    changes.zipWithIndex.map{ case (split, index) =>
-      HoodieCDCFileGroupPartition(index, split)
-    }.toArray
-  }
-
-  private class CDCFileGroupIterator(
-      split: HoodieCDCFileGroupSplit,
-      metaClient: HoodieTableMetaClient
-    ) extends Iterator[InternalRow] with SparkAdapterSupport with 
AvroDeserializerSupport with Closeable {
-
-    private lazy val storage = metaClient.getStorage
-
-    private lazy val conf = confBroadcast.value.value
-
-    private lazy val basePath = metaClient.getBasePath
-
-    private lazy val tableConfig = metaClient.getTableConfig
-
-    private lazy val populateMetaFields = tableConfig.populateMetaFields()
-
-    private lazy val keyGenerator = {
-      HoodieSparkKeyGeneratorFactory.createKeyGenerator(tableConfig.getProps())
-    }
-
-    private lazy val recordKeyField: String = if (populateMetaFields) {
-      HoodieRecord.RECORD_KEY_METADATA_FIELD
-    } else {
-      val keyFields = metaClient.getTableConfig.getRecordKeyFields.get()
-      checkState(keyFields.length == 1)
-      keyFields.head
-    }
-
-    private lazy val orderingFields: List[String] = 
metaClient.getTableConfig.getOrderingFields.asScala.toList
-
-    private lazy val tableState = {
-      val metadataConfig = HoodieMetadataConfig.newBuilder()
-        .fromProperties(props)
-        .build()
-      HoodieTableState(
-        basePath.toUri.toString,
-        Some(split.changes.last.getInstant),
-        recordKeyField,
-        orderingFields,
-        usesVirtualKeys = !populateMetaFields,
-        metaClient.getTableConfig.getPayloadClass,
-        metadataConfig,
-        // TODO support CDC with spark record
-        recordMergeImplClasses = List(classOf[HoodieAvroRecordMerger].getName),
-        recordMergeStrategyId = 
HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID
-      )
-    }
-
-    protected override val avroSchema: Schema = new 
Schema.Parser().parse(originTableSchema.avroSchemaStr)
-
-    protected override val structTypeSchema: StructType = 
originTableSchema.structTypeSchema
-
-    private lazy val serializer = 
sparkAdapter.createAvroSerializer(originTableSchema.structTypeSchema,
-      avroSchema, nullable = false)
-
-    private lazy val avroProjection = AvroProjection.create(avroSchema)
-
-    private lazy val cdcAvroSchema: Schema = 
HoodieCDCUtils.schemaBySupplementalLoggingMode(
-      cdcSupplementalLoggingMode,
-      HoodieAvroUtils.removeMetadataFields(avroSchema)
-    )
-
-    private lazy val cdcSparkSchema: StructType = 
AvroConversionUtils.convertAvroSchemaToStructType(cdcAvroSchema)
-
-    private lazy val sparkPartitionedFileUtils = 
sparkAdapter.getSparkPartitionedFileUtils
-
-    /**
-     * The deserializer used to convert the CDC GenericRecord to Spark 
InternalRow.
-     */
-    private lazy val cdcRecordDeserializer: HoodieAvroDeserializer = {
-      sparkAdapter.createAvroDeserializer(cdcAvroSchema, cdcSparkSchema)
-    }
-
-    private lazy val projection: Projection = 
generateUnsafeProjection(cdcSchema, requiredCdcSchema)
-
-    // Iterator on cdc file
-    private val cdcFileIter = split.changes.iterator
-
-    // The instant that is currently being processed
-    private var currentInstant: String = _
-
-    // The change file that is currently being processed
-    private var currentCDCFileSplit: HoodieCDCFileSplit = _
-
-    /**
-     * Two cases will use this to iterator the records:
-     * 1) extract the change data from the base file directly, including 
'BASE_FILE_INSERT' and 'BASE_FILE_DELETE'.
-     * 2) when the type of cdc file is 'REPLACE_COMMIT',
-     *    use this to trace the records that are converted from the 
'[[beforeImageRecords]]
-     */
-    private var recordIter: Iterator[InternalRow] = Iterator.empty
-
-    /**
-     * Only one case where it will be used is that extract the change data 
from log files for mor table.
-     * At the time, 'logRecordIter' will work with [[beforeImageRecords]] that 
keep all the records of the previous file slice.
-     */
-    private var logRecordIter: Iterator[(String, HoodieRecord[_])] = 
Iterator.empty
-
-    /**
-     * Only one case where it will be used is that extract the change data 
from cdc log files.
-     */
-    private var cdcLogRecordIterator: HoodieCDCLogRecordIterator = _
-
-    /**
-     * The next record need to be returned when call next().
-     */
-    protected var recordToLoad: InternalRow = _
-
-    /**
-     * The list of files to which 'beforeImageRecords' belong.
-     * Use it to determine if 'beforeImageRecords' contains all the required 
data that extract
-     * the change data from the current cdc file.
-     */
-    private val beforeImageFiles: mutable.ArrayBuffer[String] = 
mutable.ArrayBuffer.empty
-
-    /**
-     * Keep the before-image data. There cases will use this:
-     * 1) the cdc infer case is [[LOG_FILE]];
-     * 2) the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] 
is 'op_key'.
-     */
-    private var beforeImageRecords: mutable.Map[String, GenericRecord] = 
mutable.Map.empty
-
-    /**
-     * Keep the after-image data. Only one case will use this:
-     * the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is 
[[OP_KEY_ONLY]] or [[DATA_BEFORE]].
-     */
-    private var afterImageRecords: mutable.Map[String, InternalRow] = 
mutable.Map.empty
-
-    private var internalRowToJsonStringConverter = new 
InternalRowToJsonStringConverter(originTableSchema.structTypeSchema)
-
-    private def needLoadNextFile: Boolean = {
-      !recordIter.hasNext &&
-        !logRecordIter.hasNext &&
-        (cdcLogRecordIterator == null || !cdcLogRecordIterator.hasNext)
-    }
-
-    @tailrec final def hasNextInternal: Boolean = {
-      if (needLoadNextFile) {
-        loadCdcFile()
-      }
-      if (currentCDCFileSplit == null) {
-        false
-      } else {
-        currentCDCFileSplit.getCdcInferCase match {
-          case BASE_FILE_INSERT | BASE_FILE_DELETE | REPLACE_COMMIT =>
-            if (recordIter.hasNext && loadNext()) {
-              true
-            } else {
-              hasNextInternal
-            }
-          case LOG_FILE =>
-            if (logRecordIter.hasNext && loadNext()) {
-              true
-            } else {
-              hasNextInternal
-            }
-          case AS_IS =>
-            if (cdcLogRecordIterator.hasNext && loadNext()) {
-              true
-            } else {
-              hasNextInternal
-            }
-        }
-      }
-    }
-
-    override def hasNext: Boolean = hasNextInternal
-
-    override final def next(): InternalRow = {
-      projection(recordToLoad)
-    }
-
-    def loadNext(): Boolean = {
-      var loaded = false
-      currentCDCFileSplit.getCdcInferCase match {
-        case BASE_FILE_INSERT =>
-          val originRecord = recordIter.next()
-          recordToLoad.update(3, convertRowToJsonString(originRecord))
-          loaded = true
-        case BASE_FILE_DELETE =>
-          val originRecord = recordIter.next()
-          recordToLoad.update(2, convertRowToJsonString(originRecord))
-          loaded = true
-        case LOG_FILE =>
-          loaded = loadNextLogRecord()
-        case AS_IS =>
-          val record = cdcLogRecordIterator.next().asInstanceOf[GenericRecord]
-          cdcSupplementalLoggingMode match {
-            case `DATA_BEFORE_AFTER` =>
-              recordToLoad.update(0, 
convertToUTF8String(String.valueOf(record.get(0))))
-              val before = record.get(2).asInstanceOf[GenericRecord]
-              recordToLoad.update(2, recordToJsonAsUTF8String(before))
-              val after = record.get(3).asInstanceOf[GenericRecord]
-              recordToLoad.update(3, recordToJsonAsUTF8String(after))
-            case `DATA_BEFORE` =>
-              val row = 
cdcRecordDeserializer.deserialize(record).get.asInstanceOf[InternalRow]
-              val op = row.getString(0)
-              val recordKey = row.getString(1)
-              recordToLoad.update(0, convertToUTF8String(op))
-              val before = record.get(2).asInstanceOf[GenericRecord]
-              recordToLoad.update(2, recordToJsonAsUTF8String(before))
-              parse(op) match {
-                case INSERT =>
-                  recordToLoad.update(3, 
convertRowToJsonString(afterImageRecords(recordKey)))
-                case UPDATE =>
-                  recordToLoad.update(3, 
convertRowToJsonString(afterImageRecords(recordKey)))
-                case _ =>
-                  recordToLoad.update(3, null)
-              }
-            case _ =>
-              val row = 
cdcRecordDeserializer.deserialize(record).get.asInstanceOf[InternalRow]
-              val op = row.getString(0)
-              val recordKey = row.getString(1)
-              recordToLoad.update(0, convertToUTF8String(op))
-              parse(op) match {
-                case INSERT =>
-                  recordToLoad.update(2, null)
-                  recordToLoad.update(3, 
convertRowToJsonString(afterImageRecords(recordKey)))
-                case UPDATE =>
-                  recordToLoad.update(2, 
recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
-                  recordToLoad.update(3, 
convertRowToJsonString(afterImageRecords(recordKey)))
-                case _ =>
-                  recordToLoad.update(2, 
recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
-                  recordToLoad.update(3, null)
-              }
-          }
-          loaded = true
-        case REPLACE_COMMIT =>
-          val originRecord = recordIter.next()
-          recordToLoad.update(2, convertRowToJsonString(originRecord))
-          loaded = true
-      }
-      loaded
-    }
-
-    /**
-     * Load the next log record, and judge how to convert it to cdc format.
-     */
-    private def loadNextLogRecord(): Boolean = {
-      var loaded = false
-      val (key, logRecord) = logRecordIter.next()
-      val indexedRecord = getInsertValue(logRecord)
-      if (indexedRecord.isEmpty) {
-        // it's a deleted record.
-        val existingRecordOpt = beforeImageRecords.remove(key)
-        if (existingRecordOpt.isEmpty) {
-          // no real record is deleted, just ignore.
-          logDebug(s"Cannot find any record corresponding to delete key [$key] 
in the log record.")
-        } else {
-          // there is a real record deleted.
-          recordToLoad.update(0, CDCRelation.CDC_OPERATION_DELETE)
-          recordToLoad.update(2, 
recordToJsonAsUTF8String(existingRecordOpt.get))
-          recordToLoad.update(3, null)
-          loaded = true
-        }
-      } else {
-        val existingRecordOpt = beforeImageRecords.get(key)
-        if (existingRecordOpt.isEmpty) {
-          // a new record is inserted.
-          val insertedRecord = 
avroProjection(indexedRecord.get.asInstanceOf[GenericRecord])
-          recordToLoad.update(0, CDCRelation.CDC_OPERATION_INSERT)
-          recordToLoad.update(2, null)
-          recordToLoad.update(3, recordToJsonAsUTF8String(insertedRecord))
-          // insert into beforeImageRecords
-          beforeImageRecords(key) = insertedRecord
-          loaded = true
-        } else {
-          // a existed record is updated.
-          val existingRecord = existingRecordOpt.get
-          val merged = merge(existingRecord, logRecord)
-          val mergeRecord = avroProjection(merged.asInstanceOf[GenericRecord])
-          if (existingRecord != mergeRecord) {
-            recordToLoad.update(0, CDCRelation.CDC_OPERATION_UPDATE)
-            recordToLoad.update(2, recordToJsonAsUTF8String(existingRecord))
-            recordToLoad.update(3, recordToJsonAsUTF8String(mergeRecord))
-            // update into beforeImageRecords
-            beforeImageRecords(key) = mergeRecord
-            loaded = true
-          }
-        }
-      }
-      loaded
-    }
-
-    private def loadCdcFile(): Unit = {
-      // reset all the iterator first.
-      recordIter = Iterator.empty
-      logRecordIter = Iterator.empty
-      beforeImageRecords.clear()
-      afterImageRecords.clear()
-      if (cdcLogRecordIterator != null) {
-        cdcLogRecordIterator.close()
-        cdcLogRecordIterator = null
-      }
-
-      if (cdcFileIter.hasNext) {
-        val split = cdcFileIter.next()
-        currentInstant = split.getInstant
-        currentCDCFileSplit = split
-        currentCDCFileSplit.getCdcInferCase match {
-          case BASE_FILE_INSERT =>
-            assert(currentCDCFileSplit.getCdcFiles != null && 
currentCDCFileSplit.getCdcFiles.size() == 1)
-            val absCDCPath = new StoragePath(basePath, 
currentCDCFileSplit.getCdcFiles.get(0))
-            val pathInfo = storage.getPathInfo(absCDCPath)
-
-            val pf = sparkPartitionedFileUtils.createPartitionedFile(
-              InternalRow.empty, absCDCPath, 0, pathInfo.getLength)
-            recordIter = parquetReader(pf)
-          case BASE_FILE_DELETE =>
-            assert(currentCDCFileSplit.getBeforeFileSlice.isPresent)
-            recordIter = 
loadFileSlice(currentCDCFileSplit.getBeforeFileSlice.get)
-          case LOG_FILE =>
-            assert(currentCDCFileSplit.getCdcFiles != null && 
currentCDCFileSplit.getCdcFiles.size() == 1
-              && currentCDCFileSplit.getBeforeFileSlice.isPresent)
-            
loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
-            val absLogPath = new StoragePath(basePath, 
currentCDCFileSplit.getCdcFiles.get(0))
-            val morSplit = HoodieMergeOnReadFileSplit(None, List(new 
HoodieLogFile(storage.getPathInfo(absLogPath))))
-            val logFileIterator = new LogFileIterator(morSplit, 
originTableSchema, originTableSchema, tableState, conf)
-            logRecordIter = logFileIterator.logRecordsPairIterator
-          case AS_IS =>
-            assert(currentCDCFileSplit.getCdcFiles != null && 
!currentCDCFileSplit.getCdcFiles.isEmpty)
-            // load beforeFileSlice to beforeImageRecords
-            if (currentCDCFileSplit.getBeforeFileSlice.isPresent) {
-              
loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
-            }
-            // load afterFileSlice to afterImageRecords
-            if (currentCDCFileSplit.getAfterFileSlice.isPresent) {
-              val iter = 
loadFileSlice(currentCDCFileSplit.getAfterFileSlice.get())
-              afterImageRecords = mutable.Map.empty
-              iter.foreach { row =>
-                val key = getRecordKey(row)
-                afterImageRecords.put(key, row.copy())
-              }
-            }
-
-            val cdcLogFiles = currentCDCFileSplit.getCdcFiles.asScala.map { 
cdcFile =>
-              new HoodieLogFile(storage.getPathInfo(new StoragePath(basePath, 
cdcFile)))
-            }.toArray
-            cdcLogRecordIterator = new HoodieCDCLogRecordIterator(storage, 
cdcLogFiles, cdcAvroSchema)
-          case REPLACE_COMMIT =>
-            if (currentCDCFileSplit.getBeforeFileSlice.isPresent) {
-              
loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
-            }
-            recordIter = beforeImageRecords.values.map { record =>
-              deserialize(record)
-            }.iterator
-            beforeImageRecords.clear()
-        }
-        resetRecordFormat()
-      } else {
-        currentInstant = null
-        currentCDCFileSplit = null
-      }
-    }
-
-    /**
-     * Initialize the partial fields of the data to be returned in advance to 
speed up.
-     */
-    private def resetRecordFormat(): Unit = {
-      recordToLoad = currentCDCFileSplit.getCdcInferCase match {
-        case BASE_FILE_INSERT =>
-          InternalRow.fromSeq(Seq(
-            CDCRelation.CDC_OPERATION_INSERT, 
convertToUTF8String(currentInstant),
-            null, null))
-        case BASE_FILE_DELETE =>
-          InternalRow.fromSeq(Seq(
-            CDCRelation.CDC_OPERATION_DELETE, 
convertToUTF8String(currentInstant),
-            null, null))
-        case LOG_FILE =>
-          InternalRow.fromSeq(Seq(
-            null, convertToUTF8String(currentInstant),
-            null, null))
-        case AS_IS =>
-          InternalRow.fromSeq(Seq(
-            null, convertToUTF8String(currentInstant),
-            null, null))
-        case REPLACE_COMMIT =>
-          InternalRow.fromSeq(Seq(
-            CDCRelation.CDC_OPERATION_DELETE, 
convertToUTF8String(currentInstant),
-            null, null))
-      }
-    }
-
-    /**
-     * If [[beforeImageFiles]] are the list of file that we want to load 
exactly, use this directly.
-     * Otherwise we need to re-load what we need.
-     */
-    private def loadBeforeFileSliceIfNeeded(fileSlice: FileSlice): Unit = {
-      val files = List(fileSlice.getBaseFile.get().getPath) ++
-        fileSlice.getLogFiles.collect(Collectors.toList[HoodieLogFile]).asScala
-          .map(f => f.getPath.toUri.toString).toList
-      val same = files.sorted == beforeImageFiles.sorted.toList
-      if (!same) {
-        // clear up the beforeImageRecords
-        beforeImageRecords.clear()
-        val iter = loadFileSlice(fileSlice)
-        iter.foreach { row =>
-          val key = getRecordKey(row)
-          // Due to the reuse buffer mechanism of Spark serialization,
-          // we have to copy the serialized result if we need to retain its 
reference
-          beforeImageRecords.put(key, serialize(row, copy = true))
-        }
-        // reset beforeImageFiles
-        beforeImageFiles.clear()
-        beforeImageFiles.append(files: _*)
-      }
-    }
-
-    private def loadFileSlice(fileSlice: FileSlice): Iterator[InternalRow] = {
-      val baseFileInfo = 
storage.getPathInfo(fileSlice.getBaseFile.get().getStoragePath)
-      val basePartitionedFile = 
sparkPartitionedFileUtils.createPartitionedFile(
-        InternalRow.empty,
-        baseFileInfo.getPath,
-        0,
-        baseFileInfo.getLength
-      )
-      val logFiles = fileSlice.getLogFiles
-        .sorted(HoodieLogFile.getLogFileComparator)
-        .collect(Collectors.toList[HoodieLogFile])
-        .asScala.toList
-        .filterNot(_.getFileName.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
-
-      if (logFiles.isEmpty) {
-        // no log files, just load the base parquet file
-        parquetReader(basePartitionedFile)
-      } else {
-        // use [[RecordMergingFileIterator]] to load both the base file and 
log files
-        val morSplit = HoodieMergeOnReadFileSplit(Some(basePartitionedFile), 
logFiles)
-        new RecordMergingFileIterator(
-          morSplit,
-          BaseFileReader(parquetReader, originTableSchema.structTypeSchema),
-          originTableSchema,
-          originTableSchema,
-          tableState,
-          conf)
-      }
-    }
-
-    /**
-     * Convert InternalRow to json string.
-     */
-    private def convertRowToJsonString(record: InternalRow): UTF8String = {
-      internalRowToJsonStringConverter.convert(record)
-    }
-
-    /**
-     * The data of string type is stored in InternalRow using UTF8String type.
-     */
-    private def convertToUTF8String(str: String): UTF8String = {
-      UTF8String.fromString(str)
-    }
-
-    private def pathToString(p: Path): String = {
-      p.toUri.toString
-    }
-
-    private def serialize(curRowRecord: InternalRow, copy: Boolean = false): 
GenericRecord = {
-      val record = 
serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
-      if (copy) {
-        GenericData.get().deepCopy(record.getSchema, record)
-      } else {
-        record
-      }
-    }
-
-    private def recordToJsonAsUTF8String(record: GenericRecord): UTF8String = {
-      convertToUTF8String(HoodieCDCUtils.recordToJson(record))
-    }
-
-    private def getRecordKey(row: InternalRow): String = {
-      if (populateMetaFields) {
-        
row.getString(structTypeSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD))
-      } else {
-        this.keyGenerator.getKey(serialize(row)).getRecordKey
-      }
-    }
-
-    private def getInsertValue(
-        record: HoodieRecord[_])
-    : Option[IndexedRecord] = {
-      toScalaOption(record.toIndexedRecord(avroSchema, 
payloadProps)).map(_.getData)
-    }
-
-    private def merge(curAvroRecord: GenericRecord, newRecord: 
HoodieRecord[_]): IndexedRecord = {
-      
newRecord.getData.asInstanceOf[HoodieRecordPayload[_]].combineAndGetUpdateValue(curAvroRecord,
 avroSchema, payloadProps).get()
-    }
-
-    override def close(): Unit = {}
-  }
-}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/FileFormatUtilsForFileGroupReader.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/FileFormatUtilsForFileGroupReader.scala
index 6feadb70333e..ae6740b3017a 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/FileFormatUtilsForFileGroupReader.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/FileFormatUtilsForFileGroupReader.scala
@@ -18,11 +18,12 @@
 
 package org.apache.spark.sql
 
-import org.apache.hudi.{HoodieCDCFileIndex, SparkAdapterSupport, 
SparkHoodieTableFileIndex}
+import org.apache.hudi.{SparkAdapterSupport, SparkHoodieTableFileIndex}
+import org.apache.hudi.cdc.HoodieCDCFileIndex
 
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Contains, 
EndsWith, EqualNullSafe, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, 
In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, NamedExpression, 
Not, Or, StartsWith}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation}
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, 
ParquetFileFormat}
 import org.apache.spark.sql.types.{BooleanType, StructType}
 
@@ -34,7 +35,7 @@ object FileFormatUtilsForFileGroupReader extends 
SparkAdapterSupport {
     val ff = fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait]
     ff.isProjected = true
     val tableSchema = fs.location match {
-      case index: HoodieCDCFileIndex => index.cdcRelation.schema
+      case _: HoodieCDCFileIndex => HoodieCDCFileIndex.FULL_CDC_SPARK_SCHEMA
       case index: SparkHoodieTableFileIndex => index.schema
     }
     val resolvedSchema = logicalRelation.resolve(tableSchema, 
fs.sparkSession.sessionState.analyzer.resolver)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
index 1afc7d84bff1..718dc827df53 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, 
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, 
HoodieSparkUtils, HoodieTableSchema, SparkAdapterSupport, 
SparkFileFormatInternalRowReaderContext}
 import org.apache.hudi.avro.AvroSchemaUtils
-import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation, 
HoodieCDCFileGroupSplit}
+import org.apache.hudi.cdc.{CDCFileGroupIterator, HoodieCDCFileGroupSplit, 
HoodieCDCFileIndex}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.config.{HoodieMemoryConfig, TypedProperties}
@@ -327,7 +327,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
     val fileSplits = 
hoodiePartitionCDCFileGroupSliceMapping.getFileSplits().toArray
     val cdcFileGroupSplit: HoodieCDCFileGroupSplit = 
HoodieCDCFileGroupSplit(fileSplits)
     props.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, tableName)
-    val cdcSchema = CDCRelation.FULL_CDC_SPARK_SCHEMA
+    val cdcSchema = HoodieCDCFileIndex.FULL_CDC_SPARK_SCHEMA
     new CDCFileGroupIterator(
       cdcFileGroupSplit,
       metaClient,
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala
index 87d3e334e8be..60fa89993489 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala
@@ -21,11 +21,10 @@ package org.apache.spark.sql.hudi.streaming
 
 import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
HoodieCopyOnWriteCDCHadoopFsRelationFactory, 
HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV1, 
HoodieMergeOnReadCDCHadoopFsRelationFactory, 
HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV1, HoodieSparkUtils, 
IncrementalRelationV1, MergeOnReadIncrementalRelationV1, SparkAdapterSupport}
 import 
org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT
-import org.apache.hudi.cdc.CDCRelation
+import org.apache.hudi.cdc.HoodieCDCFileIndex
 import org.apache.hudi.common.config.HoodieReaderConfig
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
HoodieTableVersion, TableSchemaResolver}
-import org.apache.hudi.common.table.cdc.HoodieCDCUtils
 import org.apache.hudi.common.table.checkpoint.{CheckpointUtils, 
StreamerCheckpointV1}
 import 
org.apache.hudi.common.table.timeline.TimelineUtils.{handleHollowCommitIfNeeded,
 HollowCommitHandling}
 import 
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling._
@@ -64,7 +63,7 @@ class HoodieStreamSourceV1(sqlContext: SQLContext,
 
   private lazy val isBootstrappedTable = 
metaClient.getTableConfig.getBootstrapBasePath.isPresent
 
-  private val isCDCQuery = CDCRelation.isCDCEnabled(metaClient) &&
+  private val isCDCQuery = HoodieCDCFileIndex.isCDCEnabled(metaClient) &&
     
parameters.get(DataSourceReadOptions.QUERY_TYPE.key).contains(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
 &&
     
parameters.get(DataSourceReadOptions.INCREMENTAL_FORMAT.key).contains(DataSourceReadOptions.INCREMENTAL_FORMAT_CDC_VAL)
 
@@ -100,7 +99,7 @@ class HoodieStreamSourceV1(sqlContext: SQLContext,
 
   override def schema: StructType = {
     if (isCDCQuery) {
-      CDCRelation.FULL_CDC_SPARK_SCHEMA
+      HoodieCDCFileIndex.FULL_CDC_SPARK_SCHEMA
     } else {
       schemaOption.getOrElse {
         val schemaUtil = new TableSchemaResolver(metaClient)
@@ -154,20 +153,14 @@ class HoodieStreamSourceV1(sqlContext: SQLContext,
           DataSourceReadOptions.START_COMMIT.key()-> 
startCommitTime(startOffset),
           DataSourceReadOptions.END_COMMIT.key() -> endOffset.offsetCommitTime
         )
-        if (enableFileGroupReader) {
-          val relation = if (tableType == HoodieTableType.COPY_ON_WRITE) {
-            new HoodieCopyOnWriteCDCHadoopFsRelationFactory(
-              sqlContext, metaClient, parameters ++ cdcOptions, schemaOption, 
isBootstrappedTable).build()
-          } else {
-            new HoodieMergeOnReadCDCHadoopFsRelationFactory(
-              sqlContext, metaClient, parameters ++ cdcOptions, schemaOption, 
isBootstrappedTable).build()
-          }
-          sparkAdapter.createStreamingDataFrame(sqlContext, relation, 
CDCRelation.FULL_CDC_SPARK_SCHEMA)
+        val relation = if (tableType == HoodieTableType.COPY_ON_WRITE) {
+          new HoodieCopyOnWriteCDCHadoopFsRelationFactory(
+            sqlContext, metaClient, parameters ++ cdcOptions, schemaOption, 
isBootstrappedTable).build()
         } else {
-          val rdd = CDCRelation.getCDCRelation(sqlContext, metaClient, 
cdcOptions)
-            .buildScan0(HoodieCDCUtils.CDC_COLUMNS, Array.empty)
-
-          sparkAdapter.internalCreateDataFrame(sqlContext.sparkSession, rdd, 
CDCRelation.FULL_CDC_SPARK_SCHEMA, isStreaming = true)}
+          new HoodieMergeOnReadCDCHadoopFsRelationFactory(
+            sqlContext, metaClient, parameters ++ cdcOptions, schemaOption, 
isBootstrappedTable).build()
+        }
+        sparkAdapter.createStreamingDataFrame(sqlContext, relation, 
HoodieCDCFileIndex.FULL_CDC_SPARK_SCHEMA)
       } else {
         // Consume the data between (startCommitTime, endCommitTime]
         val incParams = parameters ++ Map(
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
index 3eebc4639179..c41657d74120 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
@@ -18,11 +18,10 @@
 package org.apache.spark.sql.hudi.streaming
 
 import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
HoodieCopyOnWriteCDCHadoopFsRelationFactory, 
HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV2, 
HoodieMergeOnReadCDCHadoopFsRelationFactory, 
HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV2, HoodieSparkUtils, 
IncrementalRelationV2, MergeOnReadIncrementalRelationV2, SparkAdapterSupport}
-import org.apache.hudi.cdc.CDCRelation
+import org.apache.hudi.cdc.HoodieCDCFileIndex
 import org.apache.hudi.common.config.HoodieReaderConfig
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
HoodieTableVersion, TableSchemaResolver}
-import org.apache.hudi.common.table.cdc.HoodieCDCUtils
 import org.apache.hudi.common.table.checkpoint.{CheckpointUtils, 
StreamerCheckpointV2}
 import org.apache.hudi.common.table.log.InstantRange.RangeType
 import org.apache.hudi.util.SparkConfigUtils
@@ -59,7 +58,7 @@ class HoodieStreamSourceV2(sqlContext: SQLContext,
   private lazy val enableFileGroupReader = SparkConfigUtils
     .getStringWithAltKeys(parameters, 
HoodieReaderConfig.FILE_GROUP_READER_ENABLED).toBoolean
 
-  private val isCDCQuery = CDCRelation.isCDCEnabled(metaClient) &&
+  private val isCDCQuery = HoodieCDCFileIndex.isCDCEnabled(metaClient) &&
     
parameters.get(DataSourceReadOptions.QUERY_TYPE.key).contains(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
 &&
     
parameters.get(DataSourceReadOptions.INCREMENTAL_FORMAT.key).contains(DataSourceReadOptions.INCREMENTAL_FORMAT_CDC_VAL)
 
@@ -82,7 +81,7 @@ class HoodieStreamSourceV2(sqlContext: SQLContext,
 
   override def schema: StructType = {
     if (isCDCQuery) {
-      CDCRelation.FULL_CDC_SPARK_SCHEMA
+      HoodieCDCFileIndex.FULL_CDC_SPARK_SCHEMA
     } else {
       schemaOption.getOrElse {
         val schemaUtil = new TableSchemaResolver(metaClient)
@@ -130,20 +129,14 @@ class HoodieStreamSourceV2(sqlContext: SQLContext,
           DataSourceReadOptions.START_COMMIT.key() -> startCompletionTime,
           DataSourceReadOptions.END_COMMIT.key() -> endOffset.offsetCommitTime
         )
-        if (enableFileGroupReader) {
-          val relation = if (tableType == HoodieTableType.COPY_ON_WRITE) {
-            new HoodieCopyOnWriteCDCHadoopFsRelationFactory(
-              sqlContext, metaClient, parameters ++ cdcOptions, schemaOption, 
isBootstrappedTable, rangeType).build()
-          } else {
-            new HoodieMergeOnReadCDCHadoopFsRelationFactory(
-              sqlContext, metaClient, parameters ++ cdcOptions, schemaOption, 
isBootstrappedTable, rangeType).build()
-          }
-          sparkAdapter.createStreamingDataFrame(sqlContext, relation, 
CDCRelation.FULL_CDC_SPARK_SCHEMA)
+        val relation = if (tableType == HoodieTableType.COPY_ON_WRITE) {
+          new HoodieCopyOnWriteCDCHadoopFsRelationFactory(
+            sqlContext, metaClient, parameters ++ cdcOptions, schemaOption, 
isBootstrappedTable, rangeType).build()
         } else {
-          val rdd = CDCRelation.getCDCRelation(sqlContext, metaClient, 
cdcOptions, rangeType)
-            .buildScan0(HoodieCDCUtils.CDC_COLUMNS, Array.empty)
-
-          sparkAdapter.internalCreateDataFrame(sqlContext.sparkSession, rdd, 
CDCRelation.FULL_CDC_SPARK_SCHEMA, isStreaming = true)}
+          new HoodieMergeOnReadCDCHadoopFsRelationFactory(
+            sqlContext, metaClient, parameters ++ cdcOptions, schemaOption, 
isBootstrappedTable, rangeType).build()
+        }
+        sparkAdapter.createStreamingDataFrame(sqlContext, relation, 
HoodieCDCFileIndex.FULL_CDC_SPARK_SCHEMA)
       } else {
         // Consume the data between (startCommitTime, endCommitTime]
         val incParams = parameters ++ Map(

Reply via email to