This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d745057ea [HUDI-4420] Fixing table schema delineation on 
partition/data schema for Spark relations  (#5708)
2d745057ea is described below

commit 2d745057eae3f6644339ceaba2dfcb6450871442
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Sat Jul 23 14:59:16 2022 -0700

    [HUDI-4420] Fixing table schema delineation on partition/data schema for 
Spark relations  (#5708)
---
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |   2 +-
 .../org/apache/hudi/BaseFileOnlyRelation.scala     |  23 +-
 .../scala/org/apache/hudi/HoodieBaseRelation.scala | 162 ++++++++------
 .../org/apache/hudi/HoodieDataSourceHelper.scala   |  18 +-
 .../scala/org/apache/hudi/HoodieFileScanRDD.scala  |   4 +-
 .../org/apache/hudi/HoodieMergeOnReadRDD.scala     | 243 +++++++++++----------
 .../hudi/MergeOnReadIncrementalRelation.scala      |  42 +---
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  | 120 +++++++---
 .../apache/spark/sql/hudi/DataSkippingUtils.scala  |   2 +-
 .../procedures/RunClusteringProcedure.scala        |   2 +-
 .../org/apache/hudi/TestDataSkippingUtils.scala    |   2 +-
 .../apache/spark/sql/hudi/TestInsertTable.scala    |   3 +-
 .../apache/spark/sql/adapter/Spark2Adapter.scala   |   2 +-
 .../apache/spark/sql/adapter/Spark3_1Adapter.scala |   6 +-
 .../apache/spark/sql/adapter/Spark3_2Adapter.scala |   6 +-
 15 files changed, 364 insertions(+), 273 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index cd01e4fd5a..cd30528798 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -45,7 +45,7 @@ trait SparkAdapter extends Serializable {
    * Creates instance of [[HoodieCatalystExpressionUtils]] providing for 
common utils operating
    * on Catalyst [[Expression]]s
    */
-  def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils
+  def getCatalystExpressionUtils: HoodieCatalystExpressionUtils
 
   /**
    * Creates instance of [[HoodieCatalystPlansUtils]] providing for common 
utils operating
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index b7033c3bfc..416e91800f 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -20,11 +20,13 @@ package org.apache.hudi
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.HoodieBaseRelation.projectReader
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.hadoop.HoodieROTablePathFilter
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.datasources
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructType
@@ -68,17 +70,18 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
   }
 
   protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
-                                    partitionSchema: StructType,
-                                    dataSchema: HoodieTableSchema,
+                                    tableSchema: HoodieTableSchema,
                                     requiredSchema: HoodieTableSchema,
                                     requestedColumns: Array[String],
-                                    filters: Array[Filter]): HoodieUnsafeRDD = 
{
+                                    filters: Array[Filter]): RDD[InternalRow] 
= {
+    val (partitionSchema, dataSchema, requiredDataSchema) =
+      tryPrunePartitionColumns(tableSchema, requiredSchema)
 
     val baseFileReader = createBaseFileReader(
       spark = sparkSession,
       partitionSchema = partitionSchema,
       dataSchema = dataSchema,
-      requiredSchema = requiredSchema,
+      requiredDataSchema = requiredDataSchema,
       filters = filters,
       options = optParams,
       // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
@@ -86,7 +89,15 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
       hadoopConf = embedInternalSchema(new Configuration(conf), 
requiredSchema.internalSchema)
     )
 
-    new HoodieFileScanRDD(sparkSession, baseFileReader, fileSplits)
+    // NOTE: In some case schema of the reader's output (reader's schema) 
might not match the schema expected by the caller.
+    //       This could occur for ex, when requested schema contains partition 
columns which might not be persisted w/in the
+    //       data file, but instead would be parsed from the partition path. 
In that case output of the file-reader will have
+    //       different ordering of the fields than the original required 
schema (for more details please check out
+    //       [[ParquetFileFormat]] impl). In that case we have to project the 
rows from the file-reader's schema
+    //       back into the one expected by the caller
+    val projectedReader = projectReader(baseFileReader, 
requiredSchema.structTypeSchema)
+
+    new HoodieFileScanRDD(sparkSession, projectedReader.apply, fileSplits)
   }
 
   protected def collectFileSplits(partitionFilters: Seq[Expression], 
dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index ff6515db32..5274f257e1 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -45,7 +45,7 @@ import 
org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression, 
SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Expression, 
SubqueryExpression, UnsafeProjection}
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import 
org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, 
ParquetFileFormat}
@@ -274,7 +274,7 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
   def canPruneRelationSchema: Boolean =
     (fileFormat.isInstanceOf[ParquetFileFormat] || 
fileFormat.isInstanceOf[OrcFileFormat]) &&
       // NOTE: Some relations might be disabling sophisticated schema pruning 
techniques (for ex, nested schema pruning)
-      // TODO(HUDI-XXX) internal schema doesn't supported nested schema 
pruning currently
+      // TODO(HUDI-XXX) internal schema doesn't support nested schema pruning 
currently
       !hasSchemaOnRead
 
   override def schema: StructType = {
@@ -334,38 +334,14 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
     val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, 
internalSchemaOpt)
     val requiredSchema = HoodieTableSchema(requiredStructSchema, 
requiredAvroSchema.toString, Some(requiredInternalSchema))
 
-    // Since schema requested by the caller might contain partition columns, 
we might need to
-    // prune it, removing all partition columns from it in case these columns 
are not persisted
-    // in the data files
-    //
-    // NOTE: This partition schema is only relevant to file reader to be able 
to embed
-    //       values of partition columns (hereafter referred to as partition 
values) encoded into
-    //       the partition path, and omitted from the data file, back into 
fetched rows;
-    //       Note that, by default, partition columns are not omitted 
therefore specifying
-    //       partition schema for reader is not required
-    val (partitionSchema, dataSchema, requiredDataSchema) =
-      tryPrunePartitionColumns(tableSchema, requiredSchema)
-
     if (fileSplits.isEmpty) {
       sparkSession.sparkContext.emptyRDD
     } else {
-      val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, 
requiredDataSchema, targetColumns, filters)
-
-      // NOTE: In case when partition columns have been pruned from the 
required schema, we have to project
-      //       the rows from the pruned schema back into the one expected by 
the caller
-      val projectedRDD = if (requiredDataSchema.structTypeSchema != 
requiredSchema.structTypeSchema) {
-        rdd.mapPartitions { it =>
-          val fullPrunedSchema = 
StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields)
-          val unsafeProjection = generateUnsafeProjection(fullPrunedSchema, 
requiredSchema.structTypeSchema)
-          it.map(unsafeProjection)
-        }
-      } else {
-        rdd
-      }
+      val rdd = composeRDD(fileSplits, tableSchema, requiredSchema, 
targetColumns, filters)
 
       // Here we rely on a type erasure, to workaround inherited API 
restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
       // Please check [[needConversion]] scala-doc for more details
-      projectedRDD.asInstanceOf[RDD[Row]]
+      rdd.asInstanceOf[RDD[Row]]
     }
   }
 
@@ -373,19 +349,17 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
    * Composes RDD provided file splits to read from, table and partition 
schemas, data filters to be applied
    *
    * @param fileSplits       file splits to be handled by the RDD
-   * @param partitionSchema  target table's partition schema
-   * @param dataSchema       target table's data files' schema
+   * @param tableSchema      target table's schema
    * @param requiredSchema   projected schema required by the reader
    * @param requestedColumns columns requested by the query
    * @param filters          data filters to be applied
-   * @return instance of RDD (implementing [[HoodieUnsafeRDD]])
+   * @return instance of RDD (holding [[InternalRow]]s)
    */
   protected def composeRDD(fileSplits: Seq[FileSplit],
-                           partitionSchema: StructType,
-                           dataSchema: HoodieTableSchema,
+                           tableSchema: HoodieTableSchema,
                            requiredSchema: HoodieTableSchema,
                            requestedColumns: Array[String],
-                           filters: Array[Filter]): HoodieUnsafeRDD
+                           filters: Array[Filter]): RDD[InternalRow]
 
   /**
    * Provided with partition and date filters collects target file splits to 
read records from, while
@@ -553,7 +527,7 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
   protected def createBaseFileReader(spark: SparkSession,
                                      partitionSchema: StructType,
                                      dataSchema: HoodieTableSchema,
-                                     requiredSchema: HoodieTableSchema,
+                                     requiredDataSchema: HoodieTableSchema,
                                      filters: Seq[Filter],
                                      options: Map[String, String],
                                      hadoopConf: Configuration): 
BaseFileReader = {
@@ -564,42 +538,56 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
     //       we have to eagerly initialize all of the readers even though only 
one specific to the type
     //       of the file being read will be used. This is required to avoid 
serialization of the whole
     //       relation (containing file-index for ex) and passing it to the 
executor
-    val reader = tableBaseFileFormat match {
-      case HoodieFileFormat.PARQUET =>
-        HoodieDataSourceHelper.buildHoodieParquetReader(
-          sparkSession = spark,
-          dataSchema = dataSchema.structTypeSchema,
-          partitionSchema = partitionSchema,
-          requiredSchema = requiredSchema.structTypeSchema,
-          filters = filters,
-          options = options,
-          hadoopConf = hadoopConf,
-          // We're delegating to Spark to append partition values to every row 
only in cases
-          // when these corresponding partition-values are not persisted w/in 
the data file itself
-          appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
-        )
+    val (read: (PartitionedFile => Iterator[InternalRow]), schema: StructType) 
=
+      tableBaseFileFormat match {
+        case HoodieFileFormat.PARQUET =>
+          val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
+            sparkSession = spark,
+            dataSchema = dataSchema.structTypeSchema,
+            partitionSchema = partitionSchema,
+            requiredSchema = requiredDataSchema.structTypeSchema,
+            filters = filters,
+            options = options,
+            hadoopConf = hadoopConf,
+            // We're delegating to Spark to append partition values to every 
row only in cases
+            // when these corresponding partition-values are not persisted 
w/in the data file itself
+            appendPartitionValues = 
shouldExtractPartitionValuesFromPartitionPath
+          )
+          // Since partition values by default are omitted, and not persisted 
w/in data-files by Spark,
+          // data-file readers (such as [[ParquetFileFormat]]) have to inject 
partition values while reading
+          // the data. As such, actual full schema produced by such reader is 
composed of
+          //    a) Data-file schema (projected or not)
+          //    b) Appended partition column values
+          val readerSchema = 
StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields)
+
+          (parquetReader, readerSchema)
 
       case HoodieFileFormat.HFILE =>
-        createHFileReader(
+        val hfileReader = createHFileReader(
           spark = spark,
           dataSchema = dataSchema,
-          requiredSchema = requiredSchema,
+          requiredDataSchema = requiredDataSchema,
           filters = filters,
           options = options,
           hadoopConf = hadoopConf
         )
 
+        (hfileReader, requiredDataSchema.structTypeSchema)
+
       case _ => throw new UnsupportedOperationException(s"Base file format is 
not currently supported ($tableBaseFileFormat)")
     }
 
-    partitionedFile => {
-      val extension = FSUtils.getFileExtension(partitionedFile.filePath)
-      if (tableBaseFileFormat.getFileExtension.equals(extension)) {
-        reader.apply(partitionedFile)
-      } else {
-        throw new UnsupportedOperationException(s"Invalid base-file format 
($extension), expected ($tableBaseFileFormat)")
-      }
-    }
+    BaseFileReader(
+      read = partitionedFile => {
+        val extension = FSUtils.getFileExtension(partitionedFile.filePath)
+        if (tableBaseFileFormat.getFileExtension.equals(extension)) {
+          read(partitionedFile)
+        } else {
+          throw new UnsupportedOperationException(s"Invalid base-file format 
($extension), expected ($tableBaseFileFormat)")
+        }
+      },
+      schema = schema
+    )
   }
 
   protected def embedInternalSchema(conf: Configuration, internalSchemaOpt: 
Option[InternalSchema]): Configuration = {
@@ -615,8 +603,17 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
     conf
   }
 
-  private def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
-                                       requiredSchema: HoodieTableSchema): 
(StructType, HoodieTableSchema, HoodieTableSchema) = {
+  protected def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
+                                         requiredSchema: HoodieTableSchema): 
(StructType, HoodieTableSchema, HoodieTableSchema) = {
+    // Since schema requested by the caller might contain partition columns, 
we might need to
+    // prune it, removing all partition columns from it in case these columns 
are not persisted
+    // in the data files
+    //
+    // NOTE: This partition schema is only relevant to file reader to be able 
to embed
+    //       values of partition columns (hereafter referred to as partition 
values) encoded into
+    //       the partition path, and omitted from the data file, back into 
fetched rows;
+    //       Note that, by default, partition columns are not omitted 
therefore specifying
+    //       partition schema for reader is not required
     if (shouldExtractPartitionValuesFromPartitionPath) {
       val partitionSchema = StructType(partitionColumns.map(StructField(_, 
StringType)))
       val prunedDataStructSchema = 
prunePartitionColumns(tableSchema.structTypeSchema)
@@ -645,10 +642,12 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
 
 object HoodieBaseRelation extends SparkAdapterSupport {
 
-  type BaseFileReader = PartitionedFile => Iterator[InternalRow]
+  case class BaseFileReader(read: PartitionedFile => Iterator[InternalRow], 
val schema: StructType) {
+    def apply(file: PartitionedFile): Iterator[InternalRow] = read.apply(file)
+  }
 
-  private def generateUnsafeProjection(from: StructType, to: StructType) =
-    sparkAdapter.getCatalystExpressionUtils().generateUnsafeProjection(from, 
to)
+  def generateUnsafeProjection(from: StructType, to: StructType): 
UnsafeProjection =
+    sparkAdapter.getCatalystExpressionUtils.generateUnsafeProjection(from, to)
 
   def convertToAvroSchema(structSchema: StructType): Schema =
     sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable = 
false, "Record")
@@ -656,6 +655,32 @@ object HoodieBaseRelation extends SparkAdapterSupport {
   def getPartitionPath(fileStatus: FileStatus): Path =
     fileStatus.getPath.getParent
 
+  /**
+   * Projects provided file reader's output from its original schema, into a 
[[requiredSchema]]
+   *
+   * NOTE: [[requiredSchema]] has to be a proper subset of the file reader's 
schema
+   *
+   * @param reader file reader to be projected
+   * @param requiredSchema target schema for the output of the provided file 
reader
+   */
+  def projectReader(reader: BaseFileReader, requiredSchema: StructType): 
BaseFileReader = {
+    
checkState(reader.schema.fields.toSet.intersect(requiredSchema.fields.toSet).size
 == requiredSchema.size)
+
+    if (reader.schema == requiredSchema) {
+      reader
+    } else {
+      val read = reader.apply(_)
+      val projectedRead: PartitionedFile => Iterator[InternalRow] = (file: 
PartitionedFile) => {
+        // NOTE: Projection is not a serializable object, hence it creation 
should only happen w/in
+        //       the executor process
+        val unsafeProjection = generateUnsafeProjection(reader.schema, 
requiredSchema)
+        read(file).map(unsafeProjection)
+      }
+
+      BaseFileReader(projectedRead, requiredSchema)
+    }
+  }
+
   /**
    * Projects provided schema by picking only required (projected) top-level 
columns from it
    *
@@ -666,7 +691,6 @@ object HoodieBaseRelation extends SparkAdapterSupport {
     tableSchema match {
       case Right(internalSchema) =>
         checkState(!internalSchema.isEmptySchema)
-        // TODO extend pruning to leverage optimizer pruned schema
         val prunedInternalSchema = 
InternalSchemaUtils.pruneInternalSchema(internalSchema, 
requiredColumns.toList.asJava)
         val requiredAvroSchema = 
AvroInternalSchemaConverter.convert(prunedInternalSchema, "schema")
         val requiredStructSchema = 
AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
@@ -691,10 +715,10 @@ object HoodieBaseRelation extends SparkAdapterSupport {
 
   private def createHFileReader(spark: SparkSession,
                                 dataSchema: HoodieTableSchema,
-                                requiredSchema: HoodieTableSchema,
+                                requiredDataSchema: HoodieTableSchema,
                                 filters: Seq[Filter],
                                 options: Map[String, String],
-                                hadoopConf: Configuration): BaseFileReader = {
+                                hadoopConf: Configuration): PartitionedFile => 
Iterator[InternalRow] = {
     val hadoopConfBroadcast =
       spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
@@ -703,10 +727,10 @@ object HoodieBaseRelation extends SparkAdapterSupport {
       val reader = new HoodieHFileReader[GenericRecord](hadoopConf, new 
Path(partitionedFile.filePath),
         new CacheConfig(hadoopConf))
 
-      val requiredRowSchema = requiredSchema.structTypeSchema
+      val requiredRowSchema = requiredDataSchema.structTypeSchema
       // NOTE: Schema has to be parsed at this point, since Avro's [[Schema]] 
aren't serializable
       //       to be passed from driver to executor
-      val requiredAvroSchema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
+      val requiredAvroSchema = new 
Schema.Parser().parse(requiredDataSchema.avroSchemaStr)
       val avroToRowConverter = 
AvroConversionUtils.createAvroToInternalRowConverter(requiredAvroSchema, 
requiredRowSchema)
 
       reader.getRecordIterator(requiredAvroSchema).asScala
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
index 6c721723c5..8bd295c7f3 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
@@ -41,15 +41,17 @@ object HoodieDataSourceHelper extends PredicateHelper with 
SparkAdapterSupport {
   /**
    * Wrapper for `buildReaderWithPartitionValues` of [[ParquetFileFormat]] 
handling [[ColumnarBatch]],
    * when Parquet's Vectorized Reader is used
+   *
+   * TODO move to HoodieBaseRelation, make private
    */
-  def buildHoodieParquetReader(sparkSession: SparkSession,
-                               dataSchema: StructType,
-                               partitionSchema: StructType,
-                               requiredSchema: StructType,
-                               filters: Seq[Filter],
-                               options: Map[String, String],
-                               hadoopConf: Configuration,
-                               appendPartitionValues: Boolean = false): 
PartitionedFile => Iterator[InternalRow] = {
+  private[hudi] def buildHoodieParquetReader(sparkSession: SparkSession,
+                                             dataSchema: StructType,
+                                             partitionSchema: StructType,
+                                             requiredSchema: StructType,
+                                             filters: Seq[Filter],
+                                             options: Map[String, String],
+                                             hadoopConf: Configuration,
+                                             appendPartitionValues: Boolean = 
false): PartitionedFile => Iterator[InternalRow] = {
     val parquetFileFormat: ParquetFileFormat = 
sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get
     val readParquetFile: PartitionedFile => Iterator[Any] = 
parquetFileFormat.buildReaderWithPartitionValues(
       sparkSession = sparkSession,
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
index a176626f76..4b7a09795a 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
@@ -25,9 +25,9 @@ import 
org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, P
 case class HoodieBaseFileSplit(filePartition: FilePartition) extends 
HoodieFileSplit
 
 class HoodieFileScanRDD(@transient private val sparkSession: SparkSession,
-                        readFunction: PartitionedFile => Iterator[InternalRow],
+                        read: PartitionedFile => Iterator[InternalRow],
                         @transient fileSplits: Seq[HoodieBaseFileSplit])
-  extends FileScanRDD(sparkSession, readFunction, 
fileSplits.map(_.filePartition))
+  extends FileScanRDD(sparkSession, read, fileSplits.map(_.filePartition))
     with HoodieUnsafeRDD {
 
   override final def collect(): Array[InternalRow] = 
super[HoodieUnsafeRDD].collect()
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index c4c70cb414..512c97806f 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -23,9 +23,10 @@ import org.apache.avro.generic.{GenericRecord, 
GenericRecordBuilder, IndexedReco
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
-import org.apache.hudi.HoodieBaseRelation.BaseFileReader
+import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, 
generateUnsafeProjection, projectReader}
 import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
-import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, 
collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, 
projectRowUnsafe, resolveAvroSchemaNullability}
+import 
org.apache.hudi.HoodieMergeOnReadRDD.SafeAvroProjection.collectFieldOrdinals
+import org.apache.hudi.HoodieMergeOnReadRDD._
 import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import org.apache.hudi.common.engine.HoodieLocalEngineContext
 import org.apache.hudi.common.fs.FSUtils
@@ -43,8 +44,6 @@ import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
HoodieTableMetadata}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.avro.HoodieAvroDeserializer
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, 
UnsafeProjection}
-import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.{Partition, SerializableWritable, SparkContext, 
TaskContext}
 
@@ -56,14 +55,42 @@ import scala.util.Try
 
 case class HoodieMergeOnReadPartition(index: Int, split: 
HoodieMergeOnReadFileSplit) extends Partition
 
-case class HoodieMergeOnReadBaseFileReaders(fullSchemaFileReader: 
BaseFileReader,
-                                            
requiredSchemaFileReaderForMerging: BaseFileReader,
-                                            
requiredSchemaFileReaderForNoMerging: BaseFileReader)
+/**
+ * Class holding base-file readers for 3 different use-cases:
+ *
+ * <ol>
+ *   <li>Full-schema reader: is used when whole row has to be read to perform 
merging correctly.
+ *   This could occur, when no optimizations could be applied and we have to 
fallback to read the whole row from
+ *   the base file and the corresponding delta-log file to merge them 
correctly</li>
+ *
+ *   <li>Required-schema reader: is used when it's fine to only read row's 
projected columns.
+ *   This could occur, when row could be merged with corresponding delta-log 
record leveraging while only having
+ *   projected columns</li>
+ *
+ *   <li>Required-schema reader (skip-merging): is used when when no merging 
will be performed (skip-merged).
+ *   This could occur, when file-group has no delta-log files</li>
+ * </ol>
+ */
+private[hudi] case class HoodieMergeOnReadBaseFileReaders(fullSchemaReader: 
BaseFileReader,
+                                                          
requiredSchemaReader: BaseFileReader,
+                                                          
requiredSchemaReaderSkipMerging: BaseFileReader)
 
+/**
+ * RDD enabling Hudi's Merge-on-Read (MOR) semantic
+ *
+ * @param sc spark's context
+ * @param config hadoop configuration
+ * @param fileReaders suite of base file readers
+ * @param tableSchema table's full schema
+ * @param requiredSchema expected (potentially) projected schema
+ * @param tableState table's state
+ * @param mergeType type of merge performed
+ * @param fileSplits target file-splits this RDD will be iterating over
+ */
 class HoodieMergeOnReadRDD(@transient sc: SparkContext,
                            @transient config: Configuration,
                            fileReaders: HoodieMergeOnReadBaseFileReaders,
-                           dataSchema: HoodieTableSchema,
+                           tableSchema: HoodieTableSchema,
                            requiredSchema: HoodieTableSchema,
                            tableState: HoodieTableState,
                            mergeType: String,
@@ -90,18 +117,19 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
     val iter = mergeOnReadPartition.split match {
       case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
-        
fileReaders.requiredSchemaFileReaderForNoMerging.apply(dataFileOnlySplit.dataFile.get)
+        val projectedReader = 
projectReader(fileReaders.requiredSchemaReaderSkipMerging, 
requiredSchema.structTypeSchema)
+        projectedReader(dataFileOnlySplit.dataFile.get)
 
       case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
         new LogFileIterator(logFileOnlySplit, getConfig)
 
       case split if 
mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
-        val baseFileIterator = 
fileReaders.requiredSchemaFileReaderForNoMerging.apply(split.dataFile.get)
-        new SkipMergeIterator(split, baseFileIterator, getConfig)
+        val reader = fileReaders.requiredSchemaReaderSkipMerging
+        new SkipMergeIterator(split, reader, getConfig)
 
       case split if 
mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
-        val (baseFileIterator, schema) = readBaseFile(split)
-        new RecordMergingFileIterator(split, baseFileIterator, schema, 
getConfig)
+        val reader = pickBaseFileReader
+        new RecordMergingFileIterator(split, reader, getConfig)
 
       case _ => throw new HoodieException(s"Unable to select an Iterator to 
read the Hoodie MOR File Split for " +
         s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" +
@@ -120,7 +148,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     iter
   }
 
-  private def readBaseFile(split: HoodieMergeOnReadFileSplit): 
(Iterator[InternalRow], HoodieTableSchema) = {
+  private def pickBaseFileReader: BaseFileReader = {
     // NOTE: This is an optimization making sure that even for MOR tables we 
fetch absolute minimum
     //       of the stored data possible, while still properly executing 
corresponding relation's semantic
     //       and meet the query's requirements.
@@ -129,10 +157,11 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     //          a) It does use one of the standard (and whitelisted) Record 
Payload classes
     //       then we can avoid reading and parsing the records w/ _full_ 
schema, and instead only
     //       rely on projected one, nevertheless being able to perform merging 
correctly
-    if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName))
-      (fileReaders.fullSchemaFileReader(split.dataFile.get), dataSchema)
-    else
-      (fileReaders.requiredSchemaFileReaderForMerging(split.dataFile.get), 
requiredSchema)
+    if (whitelistedPayloadClasses.contains(tableState.recordPayloadClassName)) 
{
+      fileReaders.requiredSchemaReader
+    } else {
+      fileReaders.fullSchemaReader
+    }
   }
 
   override protected def getPartitions: Array[Partition] =
@@ -156,38 +185,27 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     protected override val requiredAvroSchema: Schema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
     protected override val requiredStructTypeSchema: StructType = 
requiredSchema.structTypeSchema
 
-    protected val logFileReaderAvroSchema: Schema = new 
Schema.Parser().parse(dataSchema.avroSchemaStr)
+    protected val logFileReaderAvroSchema: Schema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
 
-    protected val recordBuilder: GenericRecordBuilder = new 
GenericRecordBuilder(requiredAvroSchema)
     protected var recordToLoad: InternalRow = _
 
-    // TODO validate whether we need to do UnsafeProjection
-    protected val unsafeProjection: UnsafeProjection = 
UnsafeProjection.create(requiredStructTypeSchema)
-
-    // NOTE: This maps _required_ schema fields onto the _full_ table schema, 
collecting their "ordinals"
-    //       w/in the record payload. This is required, to project records 
read from the Delta Log file
-    //       which always reads records in full schema (never projected, due 
to the fact that DL file might
-    //       be stored in non-columnar formats like Avro, HFile, etc)
-    private val requiredSchemaFieldOrdinals: List[Int] = 
collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema)
+    private val requiredSchemaSafeAvroProjection = 
SafeAvroProjection.create(logFileReaderAvroSchema, requiredAvroSchema)
 
     private var logScanner = {
-      val internalSchema = 
dataSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
+      val internalSchema = 
tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
       HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), 
logFileReaderAvroSchema, tableState,
         maxCompactionMemoryInBytes, config, internalSchema)
     }
 
     private val logRecords = logScanner.getRecords.asScala
 
-    // NOTE: This iterator iterates over already projected (in required 
schema) records
     // NOTE: This have to stay lazy to make sure it's initialized only at the 
point where it's
     //       going to be used, since we modify `logRecords` before that and 
therefore can't do it any earlier
     protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] =
       logRecords.iterator.map {
         case (_, record) =>
-          val avroRecordOpt = 
toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, 
payloadProps))
-          avroRecordOpt.map {
-            avroRecord => projectAvroUnsafe(avroRecord, requiredAvroSchema, 
requiredSchemaFieldOrdinals, recordBuilder)
-          }
+          toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, 
payloadProps))
+            .map(_.asInstanceOf[GenericRecord])
       }
 
     protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: 
HoodieRecordPayload[_]]] =
@@ -205,7 +223,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
           // Record has been deleted, skipping
           this.hasNextInternal
         } else {
-          recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get))
+          val projectedAvroRecord = 
requiredSchemaSafeAvroProjection(avroRecordOpt.get)
+          recordToLoad = deserialize(projectedAvroRecord)
           true
         }
       }
@@ -229,14 +248,18 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
    * performing any combination/merging of the records w/ the same primary 
keys (ie producing duplicates potentially)
    */
   private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
-                                  baseFileIterator: Iterator[InternalRow],
+                                  baseFileReader: BaseFileReader,
                                   config: Configuration)
     extends LogFileIterator(split, config) {
 
+    private val requiredSchemaUnsafeProjection = 
generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema)
+
+    private val baseFileIterator = baseFileReader(split.dataFile.get)
+
     override def hasNext: Boolean = {
       if (baseFileIterator.hasNext) {
-        val curRow = baseFileIterator.next()
-        recordToLoad = curRow
+        // No merge is required, simply load current row and project into 
required schema
+        recordToLoad = requiredSchemaUnsafeProjection(baseFileIterator.next())
         true
       } else {
         super[LogFileIterator].hasNext
@@ -250,8 +273,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
    * streams
    */
   private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
-                                          baseFileIterator: 
Iterator[InternalRow],
-                                          baseFileReaderSchema: 
HoodieTableSchema,
+                                          baseFileReader: BaseFileReader,
                                           config: Configuration)
     extends LogFileIterator(split, config) {
 
@@ -260,13 +282,17 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     //        - Projected schema
     //       As such, no particular schema could be assumed, and therefore we 
rely on the caller
     //       to correspondingly set the scheme of the expected output of 
base-file reader
-    private val baseFileReaderAvroSchema = new 
Schema.Parser().parse(baseFileReaderSchema.avroSchemaStr)
-    private val requiredSchemaFieldOrdinals: List[Int] = 
collectFieldOrdinals(requiredAvroSchema, baseFileReaderAvroSchema)
+    private val baseFileReaderAvroSchema = 
sparkAdapter.getAvroSchemaConverters.toAvroType(baseFileReader.schema, nullable 
= false, "record")
+
+    private val serializer = 
sparkAdapter.createAvroSerializer(baseFileReader.schema, 
baseFileReaderAvroSchema, nullable = false)
 
-    private val serializer = 
sparkAdapter.createAvroSerializer(baseFileReaderSchema.structTypeSchema,
-      baseFileReaderAvroSchema, 
resolveAvroSchemaNullability(baseFileReaderAvroSchema))
+    private val reusableRecordBuilder: GenericRecordBuilder = new 
GenericRecordBuilder(requiredAvroSchema)
 
-    private val recordKeyOrdinal = 
baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField)
+    private val recordKeyOrdinal = 
baseFileReader.schema.fieldIndex(tableState.recordKeyField)
+
+    private val requiredSchemaUnsafeProjection = 
generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema)
+
+    private val baseFileIterator = baseFileReader(split.dataFile.get)
 
     override def hasNext: Boolean = hasNextInternal
 
@@ -275,26 +301,22 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     //       handling records
     @tailrec private def hasNextInternal: Boolean = {
       if (baseFileIterator.hasNext) {
-        val curRowRecord = baseFileIterator.next()
-        val curKey = curRowRecord.getString(recordKeyOrdinal)
+        val curRow = baseFileIterator.next()
+        val curKey = curRow.getString(recordKeyOrdinal)
         val updatedRecordOpt = removeLogRecord(curKey)
         if (updatedRecordOpt.isEmpty) {
-          // No merge needed, load current row with required projected schema
-          recordToLoad = unsafeProjection(projectRowUnsafe(curRowRecord, 
requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals))
+          // No merge is required, simply load current row and project into 
required schema
+          recordToLoad = requiredSchemaUnsafeProjection(curRow)
           true
         } else {
-          val mergedAvroRecordOpt = merge(serialize(curRowRecord), 
updatedRecordOpt.get)
+          val mergedAvroRecordOpt = merge(serialize(curRow), 
updatedRecordOpt.get)
           if (mergedAvroRecordOpt.isEmpty) {
             // Record has been deleted, skipping
             this.hasNextInternal
           } else {
-            // NOTE: In occurrence of a merge we can't know the schema of the 
record being returned, b/c
-            //       record from the Delta Log will bear (full) Table schema, 
while record from the Base file
-            //       might already be read in projected one (as an 
optimization).
-            //       As such we can't use more performant 
[[projectAvroUnsafe]], and instead have to fallback
-            //       to [[projectAvro]]
-            val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get, 
requiredAvroSchema, recordBuilder)
-            recordToLoad = unsafeProjection(deserialize(projectedAvroRecord))
+            val projectedAvroRecord = 
projectAvroUnsafe(mergedAvroRecordOpt.get.asInstanceOf[GenericRecord],
+              requiredAvroSchema, reusableRecordBuilder)
+            recordToLoad = deserialize(projectedAvroRecord)
             true
           }
         }
@@ -381,66 +403,10 @@ private object HoodieMergeOnReadRDD {
     }
   }
 
-  /**
-   * Projects provided instance of [[InternalRow]] into provided schema, 
assuming that the
-   * the schema of the original row is strictly a superset of the given one
-   */
-  private def projectRowUnsafe(row: InternalRow,
-                         projectedSchema: StructType,
-                         ordinals: Seq[Int]): InternalRow = {
-    val projectedRow = new SpecificInternalRow(projectedSchema)
-    var curIndex = 0
-    projectedSchema.zip(ordinals).foreach { case (field, pos) =>
-      val curField = if (row.isNullAt(pos)) {
-        null
-      } else {
-        row.get(pos, field.dataType)
-      }
-      projectedRow.update(curIndex, curField)
-      curIndex += 1
-    }
-    projectedRow
-  }
-
-  /**
-   * Projects provided instance of [[IndexedRecord]] into provided schema, 
assuming that the
-   * the schema of the original row is strictly a superset of the given one
-   */
-  def projectAvroUnsafe(record: IndexedRecord,
-                        projectedSchema: Schema,
-                        ordinals: List[Int],
-                        recordBuilder: GenericRecordBuilder): GenericRecord = {
+  private def projectAvroUnsafe(record: GenericRecord, projectedSchema: 
Schema, reusableRecordBuilder: GenericRecordBuilder): GenericRecord = {
     val fields = projectedSchema.getFields.asScala
-    checkState(fields.length == ordinals.length)
-    fields.zip(ordinals).foreach {
-      case (field, pos) => recordBuilder.set(field, record.get(pos))
-    }
-    recordBuilder.build()
-  }
-
-  /**
-   * Projects provided instance of [[IndexedRecord]] into provided schema, 
assuming that the
-   * the schema of the original row is strictly a superset of the given one
-   *
-   * This is a "safe" counterpart of [[projectAvroUnsafe]]: it does build 
mapping of the record's
-   * schema into projected one itself (instead of expecting such mapping from 
the caller)
-   */
-  def projectAvro(record: IndexedRecord,
-                  projectedSchema: Schema,
-                  recordBuilder: GenericRecordBuilder): GenericRecord = {
-    projectAvroUnsafe(record, projectedSchema, 
collectFieldOrdinals(projectedSchema, record.getSchema), recordBuilder)
-  }
-
-  /**
-   * Maps [[projected]] [[Schema]] onto [[source]] one, collecting 
corresponding field ordinals w/in it, which
-   * will be subsequently used by either [[projectRowUnsafe]] or 
[[projectAvroUnsafe()]] method
-   *
-   * @param projected target projected schema (which is a proper subset of 
[[source]] [[Schema]])
-   * @param source source schema of the record being projected
-   * @return list of ordinals of corresponding fields of [[projected]] schema 
w/in [[source]] one
-   */
-  private def collectFieldOrdinals(projected: Schema, source: Schema): 
List[Int] = {
-    projected.getFields.asScala.map(f => 
source.getField(f.name()).pos()).toList
+    fields.foreach(field => reusableRecordBuilder.set(field, 
record.get(field.name())))
+    reusableRecordBuilder.build()
   }
 
   private def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = {
@@ -452,9 +418,48 @@ private object HoodieMergeOnReadRDD {
       .getParent
   }
 
-  private def resolveAvroSchemaNullability(schema: Schema) = {
-    AvroConversionUtils.resolveAvroTypeNullability(schema) match {
-      case (nullable, _) => nullable
+  // TODO extract to HoodieAvroSchemaUtils
+  abstract class AvroProjection extends (GenericRecord => GenericRecord)
+
+  class SafeAvroProjection(sourceSchema: Schema,
+                           projectedSchema: Schema,
+                           reusableRecordBuilder: GenericRecordBuilder = null) 
extends AvroProjection {
+
+    private val ordinals: List[Int] = collectFieldOrdinals(projectedSchema, 
sourceSchema)
+    private val recordBuilder: GenericRecordBuilder =
+      if (reusableRecordBuilder != null) {
+        reusableRecordBuilder
+      } else {
+        new GenericRecordBuilder(projectedSchema)
+      }
+
+    override def apply(record: GenericRecord): GenericRecord = {
+      val fields = projectedSchema.getFields.asScala
+      checkState(fields.length == ordinals.length)
+      fields.zip(ordinals).foreach {
+        case (field, pos) => recordBuilder.set(field, record.get(pos))
+      }
+      recordBuilder.build()
+    }
+  }
+
+  object SafeAvroProjection {
+    def create(sourceSchema: Schema, projectedSchema: Schema, 
reusableRecordBuilder: GenericRecordBuilder = null): SafeAvroProjection =
+      new SafeAvroProjection(
+        sourceSchema = sourceSchema,
+        projectedSchema = projectedSchema,
+        reusableRecordBuilder = reusableRecordBuilder)
+
+    /**
+     * Maps [[projected]] [[Schema]] onto [[source]] one, collecting 
corresponding field ordinals w/in it, which
+     * will be subsequently used by either [[projectRowUnsafe]] or 
[[projectAvroUnsafe()]] method
+     *
+     * @param projected target projected schema (which is a proper subset of 
[[source]] [[Schema]])
+     * @param source source schema of the record being projected
+     * @return list of ordinals of corresponding fields of [[projected]] 
schema w/in [[source]] one
+     */
+    private def collectFieldOrdinals(projected: Schema, source: Schema): 
List[Int] = {
+      projected.getFields.asScala.map(f => 
source.getField(f.name()).pos()).toList
     }
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 6fa130ac8c..0fc6ef2f83 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -17,7 +17,6 @@
 
 package org.apache.hudi
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{GlobPattern, Path}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
@@ -27,7 +26,9 @@ import 
org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.{getCommitMetadata, 
getWritePartitionPaths, listAffectedFilesForCommits}
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
@@ -58,32 +59,15 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
   }
 
   protected override def composeRDD(fileSplits: 
Seq[HoodieMergeOnReadFileSplit],
-                                    partitionSchema: StructType,
-                                    dataSchema: HoodieTableSchema,
+                                    tableSchema: HoodieTableSchema,
                                     requiredSchema: HoodieTableSchema,
                                     requestedColumns: Array[String],
-                                    filters: Array[Filter]): 
HoodieMergeOnReadRDD = {
-    val fullSchemaParquetReader = createBaseFileReader(
-      spark = sqlContext.sparkSession,
-      partitionSchema = partitionSchema,
-      dataSchema = dataSchema,
-      requiredSchema = dataSchema,
-      // This file-reader is used to read base file records, subsequently 
merging them with the records
-      // stored in delta-log files. As such, we have to read _all_ records 
from the base file, while avoiding
-      // applying any user-defined filtering _before_ we complete combining 
them w/ delta-log records (to make sure that
-      // we combine them correctly)
-      //
-      // The only filtering applicable here is the filtering to make sure 
we're only fetching records that
-      // fall into incremental span of the timeline being queried
-      filters = incrementalSpanRecordFilters,
-      options = optParams,
-      // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
-      //       to configure Parquet reader appropriately
-      hadoopConf = embedInternalSchema(new Configuration(conf), 
internalSchemaOpt)
-    )
-
-    val (requiredSchemaBaseFileReaderMerging, 
requiredSchemaBaseFileReaderNoMerging) =
-      createMergeOnReadBaseFileReaders(partitionSchema, dataSchema, 
requiredSchema, requestedColumns, filters ++ incrementalSpanRecordFilters)
+                                    filters: Array[Filter]): RDD[InternalRow] 
= {
+    // The only required filters are ones that make sure we're only fetching 
records that
+    // fall into incremental span of the timeline being queried
+    val requiredFilters = incrementalSpanRecordFilters
+    val optionalFilters = filters
+    val readers = createBaseFileReaders(tableSchema, requiredSchema, 
requestedColumns, requiredFilters, optionalFilters)
 
     val hoodieTableState = getTableState
     // TODO(HUDI-3639) implement incremental span record filtering w/in RDD to 
make sure returned iterator is appropriately
@@ -91,12 +75,8 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
     new HoodieMergeOnReadRDD(
       sqlContext.sparkContext,
       config = jobConf,
-      fileReaders = HoodieMergeOnReadBaseFileReaders(
-        fullSchemaFileReader = fullSchemaParquetReader,
-        requiredSchemaFileReaderForMerging = 
requiredSchemaBaseFileReaderMerging,
-        requiredSchemaFileReaderForNoMerging = 
requiredSchemaBaseFileReaderNoMerging
-      ),
-      dataSchema = dataSchema,
+      fileReaders = readers,
+      tableSchema = tableSchema,
       requiredSchema = requiredSchema,
       tableState = hoodieTableState,
       mergeType = mergeType,
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index c6d4eafafc..1cb35aa61d 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -80,44 +81,113 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
   }
 
   protected override def composeRDD(fileSplits: 
Seq[HoodieMergeOnReadFileSplit],
-                                    partitionSchema: StructType,
-                                    dataSchema: HoodieTableSchema,
+                                    tableSchema: HoodieTableSchema,
                                     requiredSchema: HoodieTableSchema,
                                     requestedColumns: Array[String],
-                                    filters: Array[Filter]): 
HoodieMergeOnReadRDD = {
-    val fullSchemaBaseFileReader = createBaseFileReader(
+                                    filters: Array[Filter]): RDD[InternalRow] 
= {
+    val requiredFilters = Seq.empty
+    val optionalFilters = filters
+    val readers = createBaseFileReaders(tableSchema, requiredSchema, 
requestedColumns, requiredFilters, optionalFilters)
+
+    val tableState = getTableState
+    new HoodieMergeOnReadRDD(
+      sqlContext.sparkContext,
+      config = jobConf,
+      fileReaders = readers,
+      tableSchema = tableSchema,
+      requiredSchema = requiredSchema,
+      tableState = tableState,
+      mergeType = mergeType,
+      fileSplits = fileSplits)
+  }
+
+  protected def createBaseFileReaders(tableSchema: HoodieTableSchema,
+                                      requiredSchema: HoodieTableSchema,
+                                      requestedColumns: Array[String],
+                                      requiredFilters: Seq[Filter],
+                                      optionalFilters: Seq[Filter] = 
Seq.empty): HoodieMergeOnReadBaseFileReaders = {
+    val (partitionSchema, dataSchema, requiredDataSchema) =
+      tryPrunePartitionColumns(tableSchema, requiredSchema)
+
+    val fullSchemaReader = createBaseFileReader(
       spark = sqlContext.sparkSession,
       partitionSchema = partitionSchema,
       dataSchema = dataSchema,
-      requiredSchema = dataSchema,
+      requiredDataSchema = dataSchema,
       // This file-reader is used to read base file records, subsequently 
merging them with the records
       // stored in delta-log files. As such, we have to read _all_ records 
from the base file, while avoiding
       // applying any filtering _before_ we complete combining them w/ 
delta-log records (to make sure that
-      // we combine them correctly)
-      filters = Seq.empty,
+      // we combine them correctly);
+      // As such only required filters could be pushed-down to such reader
+      filters = requiredFilters,
       options = optParams,
       // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
       //       to configure Parquet reader appropriately
       hadoopConf = embedInternalSchema(new Configuration(conf), 
internalSchemaOpt)
     )
 
-    val (requiredSchemaBaseFileReaderMerging, 
requiredSchemaBaseFileReaderNoMerging) =
-      createMergeOnReadBaseFileReaders(partitionSchema, dataSchema, 
requiredSchema, requestedColumns, filters)
-
-    val tableState = getTableState
-    new HoodieMergeOnReadRDD(
-      sqlContext.sparkContext,
-      config = jobConf,
-      fileReaders = HoodieMergeOnReadBaseFileReaders(
-        fullSchemaFileReader = fullSchemaBaseFileReader,
-        requiredSchemaFileReaderForMerging = 
requiredSchemaBaseFileReaderMerging,
-        requiredSchemaFileReaderForNoMerging = 
requiredSchemaBaseFileReaderNoMerging
-      ),
+    val requiredSchemaReader = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      partitionSchema = partitionSchema,
       dataSchema = dataSchema,
-      requiredSchema = requiredSchema,
-      tableState = tableState,
-      mergeType = mergeType,
-      fileSplits = fileSplits)
+      requiredDataSchema = requiredDataSchema,
+      // This file-reader is used to read base file records, subsequently 
merging them with the records
+      // stored in delta-log files. As such, we have to read _all_ records 
from the base file, while avoiding
+      // applying any filtering _before_ we complete combining them w/ 
delta-log records (to make sure that
+      // we combine them correctly);
+      // As such only required filters could be pushed-down to such reader
+      filters = requiredFilters,
+      options = optParams,
+      // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
+      //       to configure Parquet reader appropriately
+      hadoopConf = embedInternalSchema(new Configuration(conf), 
requiredDataSchema.internalSchema)
+    )
+
+    // Check whether fields required for merging were also requested to be 
fetched
+    // by the query:
+    //    - In case they were, there's no optimization we could apply here (we 
will have
+    //    to fetch such fields)
+    //    - In case they were not, we will provide 2 separate file-readers
+    //        a) One which would be applied to file-groups w/ delta-logs 
(merging)
+    //        b) One which would be applied to file-groups w/ no delta-logs or
+    //           in case query-mode is skipping merging
+    val mandatoryColumns = 
mandatoryFieldsForMerging.map(HoodieAvroUtils.getRootLevelFieldName)
+    if (mandatoryColumns.forall(requestedColumns.contains)) {
+      HoodieMergeOnReadBaseFileReaders(
+        fullSchemaReader = fullSchemaReader,
+        requiredSchemaReader = requiredSchemaReader,
+        requiredSchemaReaderSkipMerging = requiredSchemaReader
+      )
+    } else {
+      val prunedRequiredSchema = {
+        val superfluousColumnNames = 
mandatoryColumns.filterNot(requestedColumns.contains)
+        val prunedStructSchema =
+          StructType(requiredDataSchema.structTypeSchema.fields
+            .filterNot(f => superfluousColumnNames.contains(f.name)))
+
+        HoodieTableSchema(prunedStructSchema, 
convertToAvroSchema(prunedStructSchema).toString)
+      }
+
+      val requiredSchemaReaderSkipMerging = createBaseFileReader(
+        spark = sqlContext.sparkSession,
+        partitionSchema = partitionSchema,
+        dataSchema = dataSchema,
+        requiredDataSchema = prunedRequiredSchema,
+        // This file-reader is only used in cases when no merging is 
performed, therefore it's safe to push
+        // down these filters to the base file readers
+        filters = requiredFilters ++ optionalFilters,
+        options = optParams,
+        // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
+        //       to configure Parquet reader appropriately
+        hadoopConf = embedInternalSchema(new Configuration(conf), 
requiredDataSchema.internalSchema)
+      )
+
+      HoodieMergeOnReadBaseFileReaders(
+        fullSchemaReader = fullSchemaReader,
+        requiredSchemaReader = requiredSchemaReader,
+        requiredSchemaReaderSkipMerging = requiredSchemaReaderSkipMerging
+      )
+    }
   }
 
   protected override def collectFileSplits(partitionFilters: Seq[Expression], 
dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
@@ -156,7 +226,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
       spark = sqlContext.sparkSession,
       partitionSchema = partitionSchema,
       dataSchema = dataSchema,
-      requiredSchema = requiredDataSchema,
+      requiredDataSchema = requiredDataSchema,
       filters = filters,
       options = optParams,
       // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
@@ -189,7 +259,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
         spark = sqlContext.sparkSession,
         partitionSchema = partitionSchema,
         dataSchema = dataSchema,
-        requiredSchema = prunedRequiredSchema,
+        requiredDataSchema = prunedRequiredSchema,
         filters = filters,
         options = optParams,
         // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
index b5d19bd37d..6a04ec57e1 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
@@ -384,7 +384,7 @@ private object ColumnStatsExpressionUtils {
    * Returns only [[AttributeReference]] contained as a sub-expression
    */
   object AllowedTransformationExpression extends SparkAdapterSupport {
-    val exprUtils: HoodieCatalystExpressionUtils = 
sparkAdapter.getCatalystExpressionUtils()
+    val exprUtils: HoodieCatalystExpressionUtils = 
sparkAdapter.getCatalystExpressionUtils
 
     def unapply(expr: Expression): Option[AttributeReference] = {
       // First step, we check that expression
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index f2ae31a0f7..289f800b27 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -41,7 +41,7 @@ class RunClusteringProcedure extends BaseProcedure
   with Logging
   with SparkAdapterSupport {
 
-  private val exprUtils = sparkAdapter.getCatalystExpressionUtils()
+  private val exprUtils = sparkAdapter.getCatalystExpressionUtils
 
   /**
    * OPTIMIZE table_name|table_path [WHERE predicate]
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
index bd5aa01216..9300b94bb9 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
@@ -57,7 +57,7 @@ case class IndexRow(fileName: String,
 
 class TestDataSkippingUtils extends HoodieClientTestBase with 
SparkAdapterSupport {
 
-  val exprUtils: HoodieCatalystExpressionUtils = 
sparkAdapter.getCatalystExpressionUtils()
+  val exprUtils: HoodieCatalystExpressionUtils = 
sparkAdapter.getCatalystExpressionUtils
 
   var spark: SparkSession = _
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 57c826af92..9aa3c509c3 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -17,14 +17,13 @@
 
 package org.apache.spark.sql.hudi
 
-import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_NAME, 
MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, 
TABLE_TYPE}
+import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieDuplicateKeyException
 import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.internal.SQLConf
 
 import java.io.File
 
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index fdc0857800..30af252d2d 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -40,7 +40,7 @@ import scala.collection.mutable.ArrayBuffer
  */
 class Spark2Adapter extends SparkAdapter {
 
-  override def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils = 
HoodieSpark2CatalystExpressionUtils
+  override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = 
HoodieSpark2CatalystExpressionUtils
 
   override def getCatalystPlanUtils: HoodieCatalystPlansUtils = 
HoodieSpark2CatalystPlanUtils
 
diff --git 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
index 8093d70692..028bb5788c 100644
--- 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
@@ -19,20 +19,20 @@
 package org.apache.spark.sql.adapter
 
 import org.apache.avro.Schema
+import org.apache.spark.sql.{HoodieCatalystExpressionUtils, 
HoodieCatalystPlansUtils, HoodieSpark31CatalystExpressionUtils, 
HoodieSpark31CatalystPlanUtils}
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, 
HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer, 
HoodieSpark3_1AvroSerializer}
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark31HoodieParquetFileFormat}
 import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.types.DataType
-import org.apache.spark.sql.{HoodieCatalystExpressionUtils, 
HoodieCatalystPlansUtils, HoodieSpark31CatalystExpressionUtils, 
HoodieSpark31CatalystPlanUtils}
 
 /**
  * Implementation of [[SparkAdapter]] for Spark 3.1.x
  */
 class Spark3_1Adapter extends BaseSpark3Adapter {
 
-  def getCatalystPlanUtils: HoodieCatalystPlansUtils = 
HoodieSpark31CatalystPlanUtils
+  override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = 
HoodieSpark31CatalystExpressionUtils
 
-  override def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils = 
HoodieSpark31CatalystExpressionUtils
+  override def getCatalystPlanUtils: HoodieCatalystPlansUtils = 
HoodieSpark31CatalystPlanUtils
 
   override def createAvroSerializer(rootCatalystType: DataType, rootAvroType: 
Schema, nullable: Boolean): HoodieAvroSerializer =
     new HoodieSpark3_1AvroSerializer(rootCatalystType, rootAvroType, nullable)
diff --git 
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
index ceb66b7437..fe25ee7fdc 100644
--- 
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
@@ -30,15 +30,15 @@ import org.apache.spark.sql._
  */
 class Spark3_2Adapter extends BaseSpark3Adapter {
 
-  def getCatalystPlanUtils: HoodieCatalystPlansUtils = 
HoodieSpark32CatalystPlanUtils
-
   override def createAvroSerializer(rootCatalystType: DataType, rootAvroType: 
Schema, nullable: Boolean): HoodieAvroSerializer =
     new HoodieSpark3_2AvroSerializer(rootCatalystType, rootAvroType, nullable)
 
   override def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: 
DataType): HoodieAvroDeserializer =
     new HoodieSpark3_2AvroDeserializer(rootAvroType, rootCatalystType)
 
-  override def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils = 
HoodieSpark32CatalystExpressionUtils
+  override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = 
HoodieSpark32CatalystExpressionUtils
+
+  override def getCatalystPlanUtils: HoodieCatalystPlansUtils = 
HoodieSpark32CatalystPlanUtils
 
   override def createExtendedSparkParser: Option[(SparkSession, 
ParserInterface) => ParserInterface] = {
     Some(

Reply via email to