This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 034adda  [HUDI-3396] Make sure `BaseFileOnlyViewRelation` only reads 
projected columns (#4818)
034adda is described below

commit 034addaef5834eff09cfd9ac5cc2656df95ca0e8
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Wed Mar 9 18:45:25 2022 -0800

    [HUDI-3396] Make sure `BaseFileOnlyViewRelation` only reads projected 
columns (#4818)
    
    NOTE: This change is the first part of a series to clean up Hudi's Spark
    DataSource related implementations, making sure there is minimal code
    duplication among them and that the implementations are consistent and performant
    
    This PR makes sure that BaseFileOnlyViewRelation only reads the projected
    columns, and also avoids unnecessary serde between Row and InternalRow
    
    Brief change log
    - Introduced HoodieUnsafeRDD as a base for all custom RDD impls
    - Extracted common fields/methods to HoodieBaseRelation
    - Cleaned up and streamlined BaseFileOnlyViewRelation
    - Fixed all of the Relations to avoid superfluous Row <> InternalRow
      conversions (see the illustrative sketch below)
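
    Illustrative sketch (not part of the original commit message): the Row <> InternalRow
    point above boils down to a PrunedFilteredScan relation declaring needConversion = false
    and handing Spark an RDD[InternalRow] cast to RDD[Row], which is what HoodieBaseRelation
    now does in this patch. The ExampleRelation class below is hypothetical and only
    demonstrates the pattern under that assumption.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
    import org.apache.spark.sql.types.StructType

    class ExampleRelation(val sqlContext: SQLContext, override val schema: StructType)
      extends BaseRelation with PrunedFilteredScan {

      // Rows are handed to Spark already in the internal (Catalyst) representation,
      // so Spark skips the Row -> InternalRow re-encoding step upstream
      override def needConversion: Boolean = false

      override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
        val rows: RDD[InternalRow] = sqlContext.sparkContext.emptyRDD[InternalRow]
        // Rely on type erasure to satisfy the RDD[Row] signature while actually
        // carrying InternalRow instances
        rows.asInstanceOf[RDD[Row]]
      }
    }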
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   4 +-
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   |  18 +-
 ...zerTrait.scala => HoodieAvroDeserializer.scala} |  10 +-
 ...lizerTrait.scala => HoodieAvroSerializer.scala} |   2 +-
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |  10 +-
 .../SparkClientFunctionalTestHarness.java          |  32 +-
 .../hudi/metadata/HoodieMetadataPayload.java       |   2 +-
 .../org/apache/hudi/BaseFileOnlyViewRelation.scala | 135 +++++---
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  78 ++++-
 .../org/apache/hudi/HoodieDataSourceHelper.scala   |  39 +--
 .../scala/org/apache/hudi/HoodieFileScanRDD.scala  |  60 +---
 .../org/apache/hudi/HoodieMergeOnReadRDD.scala     |  16 +-
 .../scala/org/apache/hudi/HoodieUnsafeRDD.scala    |  68 ++++
 .../hudi/MergeOnReadIncrementalRelation.scala      |  29 +-
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  |  53 +--
 .../org/apache/spark/HoodieUnsafeRDDUtils.scala    |  44 +++
 ...lizer.scala => HoodieSparkAvroSerializer.scala} |   4 +-
 .../TestConvertFilterToCatalystExpression.scala    |   8 +-
 .../hudi/functional/TestMORDataSourceStorage.scala |   3 +
 .../functional/TestParquetColumnProjection.scala   | 355 +++++++++++++++++++++
 .../apache/spark/sql/adapter/Spark2Adapter.scala   |  10 +-
 ...er.scala => HoodieSpark2AvroDeserializer.scala} |  10 +-
 .../apache/spark/sql/adapter/Spark3Adapter.scala   |  10 +-
 ...er.scala => HoodieSpark3AvroDeserializer.scala} |   6 +-
 .../functional/TestHDFSParquetImporter.java        |  11 +-
 25 files changed, 752 insertions(+), 265 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 6053dcf..4202cbd 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -44,6 +44,7 @@ import 
org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig;
@@ -1552,7 +1553,8 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public CompressionCodecName getParquetCompressionCodec() {
-    return 
CompressionCodecName.fromConf(getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME));
+    String codecName = 
getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
+    return CompressionCodecName.fromConf(StringUtils.isNullOrEmpty(codecName) 
? null : codecName);
   }
 
   public boolean parquetDictionaryEnabled() {
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index b288289..c963806 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -177,14 +177,24 @@ object HoodieSparkUtils extends SparkAdapterSupport {
    * Convert Filters to Catalyst Expressions joined by And. If the conversion succeeds,
    * return a non-empty Option[Expression]; otherwise return None.
    */
-  def convertToCatalystExpressions(filters: Array[Filter],
-                                   tableSchema: StructType): 
Option[Expression] = {
-    val expressions = filters.map(convertToCatalystExpression(_, tableSchema))
+  def convertToCatalystExpressions(filters: Seq[Filter],
+                                   tableSchema: StructType): 
Seq[Option[Expression]] = {
+    filters.map(convertToCatalystExpression(_, tableSchema))
+  }
+
+
+  /**
+   * Convert Filters to Catalyst Expressions joined by And. If the conversion succeeds,
+   * return a non-empty Option[Expression]; otherwise return None.
+   */
+  def convertToCatalystExpression(filters: Array[Filter],
+                                  tableSchema: StructType): Option[Expression] 
= {
+    val expressions = convertToCatalystExpressions(filters, tableSchema)
     if (expressions.forall(p => p.isDefined)) {
       if (expressions.isEmpty) {
         None
       } else if (expressions.length == 1) {
-        expressions(0)
+        expressions.head
       } else {
         
Some(expressions.map(_.get).reduce(org.apache.spark.sql.catalyst.expressions.And))
       }
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializerTrait.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala
similarity index 75%
rename from 
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializerTrait.scala
rename to 
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala
index 5c30353..4c4ddb5 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializerTrait.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala
@@ -24,12 +24,6 @@ package org.apache.spark.sql.avro
  *       If you're looking to convert Avro into "deserialized" [[Row]] 
(comprised of Java native types),
  *       please check [[AvroConversionUtils]]
  */
-trait HoodieAvroDeserializerTrait {
-  final def deserialize(data: Any): Option[Any] =
-    doDeserialize(data) match {
-      case opt: Option[_] => opt    // As of Spark 3.1, this will return data 
wrapped with Option, so we fetch the data
-      case row => Some(row)         // For other Spark versions, return the 
data as is
-    }
-
-  protected def doDeserialize(data: Any): Any
+trait HoodieAvroDeserializer {
+  def deserialize(data: Any): Option[Any]
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializerTrait.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala
similarity index 97%
rename from 
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializerTrait.scala
rename to 
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala
index 159d8da..84ba44b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializerTrait.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala
@@ -23,6 +23,6 @@ package org.apache.spark.sql.avro
  * NOTE: This is low-level component operating on Spark internal data-types 
(comprising [[InternalRow]]).
  *       If you're looking to convert "deserialized" [[Row]] into Avro, please 
check [[AvroConversionUtils]]
  */
-trait HoodieAvroSerializerTrait {
+trait HoodieAvroSerializer {
   def serialize(catalystData: Any): Any
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 62bdc44..e41a9c1 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hudi
 
 import org.apache.avro.Schema
 import org.apache.hudi.client.utils.SparkRowSerDe
-import org.apache.spark.sql.avro.{HoodieAvroDeserializerTrait, 
HoodieAvroSerializerTrait}
+import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -43,16 +43,16 @@ import java.util.Locale
 trait SparkAdapter extends Serializable {
 
   /**
-   * Creates instance of [[HoodieAvroSerializerTrait]] providing for ability 
to serialize
+   * Creates instance of [[HoodieAvroSerializer]] providing for ability to 
serialize
    * Spark's [[InternalRow]] into Avro payloads
    */
-  def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, 
nullable: Boolean): HoodieAvroSerializerTrait
+  def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, 
nullable: Boolean): HoodieAvroSerializer
 
   /**
-   * Creates instance of [[HoodieAvroDeserializerTrait]] providing for ability 
to deserialize
+   * Creates instance of [[HoodieAvroDeserializer]] providing for ability to 
deserialize
    * Avro payloads into Spark's [[InternalRow]]
    */
-  def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: 
DataType): HoodieAvroDeserializerTrait
+  def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: 
DataType): HoodieAvroDeserializer
 
   /**
    * Create the SparkRowSerDe.
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index 94e080c..f9676c6 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -19,6 +19,13 @@
 
 package org.apache.hudi.testutils;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -28,6 +35,7 @@ import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -42,6 +50,7 @@ import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.table.HoodieSparkTable;
@@ -50,14 +59,11 @@ import 
org.apache.hudi.testutils.providers.HoodieMetaClientProvider;
 import org.apache.hudi.testutils.providers.HoodieWriteClientProvider;
 import org.apache.hudi.testutils.providers.SparkProvider;
 import org.apache.hudi.timeline.service.TimelineService;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.AfterAll;
@@ -69,6 +75,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
@@ -348,6 +355,21 @@ public class SparkClientFunctionalTestHarness implements 
SparkProvider, HoodieMe
         .withRollbackUsingMarkers(rollbackUsingMarkers);
   }
 
+  protected Dataset<Row> toDataset(List<HoodieRecord> records, Schema schema) {
+    List<GenericRecord> avroRecords = records.stream()
+        .map(r -> {
+          HoodieRecordPayload payload = (HoodieRecordPayload) r.getData();
+          try {
+            return (GenericRecord) payload.getInsertValue(schema).get();
+          } catch (IOException e) {
+            throw new HoodieIOException("Failed to extract Avro payload", e);
+          }
+        })
+        .collect(Collectors.toList());
+    JavaRDD<GenericRecord> jrdd = jsc.parallelize(avroRecords, 2);
+    return AvroConversionUtils.createDataFrame(jrdd.rdd(), schema.toString(), 
spark);
+  }
+
   protected int incrementTimelineServicePortToUse() {
     // Increment the timeline service port for each individual test
     // to avoid port reuse causing failures
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 548fbb9..c0ad8b1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -133,7 +133,7 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
       // This can be simplified using SpecificData.deepcopy once this bug is 
fixed
       // https://issues.apache.org/jira/browse/AVRO-1811
       //
-      // NOTE: {@code HoodieMetadataRecord} has to always carry both "key" nad 
"type" fields
+      // NOTE: {@code HoodieMetadataRecord} has to always carry both "key" and 
"type" fields
       //       for it to be handled appropriately, therefore these fields have 
to be reflected
       //       in any (read-)projected schema
       key = record.get(KEY_FIELD_NAME).toString();
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala
index 8e94805..adc34af 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala
@@ -18,63 +18,82 @@
 
 package org.apache.hudi
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-
+import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.hadoop.HoodieROTablePathFilter
-
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
-import org.apache.spark.sql.execution.datasources.{FileStatusCache, 
PartitionedFile}
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Expression, 
SubqueryExpression}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
-import org.apache.spark.sql.types.{BooleanType, StructType}
+import org.apache.spark.sql.types.StructType
 
 /**
- * The implement of [[BaseRelation]], which is used to respond to query that 
only touches the base files(Parquet),
- * like query COW tables in Snapshot-Query and Read_Optimized mode and MOR 
tables in Read_Optimized mode.
+ * [[BaseRelation]] implementation only reading Base files of Hudi tables, 
essentially supporting following querying
+ * modes:
+ * <ul>
+ * <li>For COW tables: Snapshot</li>
+ * <li>For MOR tables: Read-optimized</li>
+ * </ul>
+ *
+ * NOTE: The reason this Relation is used in lieu of Spark's default [[HadoopFsRelation]] is primarily
+ * that [[HadoopFsRelation]] injects the real partition path as the value of the partition field, which
+ * Hudi ultimately persists as part of the record payload. In some cases, however, the partition path
+ * might not be equal to the verbatim value of the partition path field (when a custom [[KeyGenerator]]
+ * is used), therefore leading to incorrect partition field values being written
  */
-class BaseFileOnlyViewRelation(
-    sqlContext: SQLContext,
-    metaClient: HoodieTableMetaClient,
-    optParams: Map[String, String],
-    userSchema: Option[StructType],
-    globPaths: Seq[Path]
-  ) extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) 
with SparkAdapterSupport {
-
-  override def buildScan(requiredColumns: Array[String], filters: 
Array[Filter]): RDD[Row] = {
-    
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader",
 "false")
-
-    val filterExpressions = 
HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema)
-      .getOrElse(Literal(true, BooleanType))
-    val (partitionFilters, dataFilters) = {
-      val splited = filters.map { filter =>
-        HoodieDataSourceHelper.splitPartitionAndDataPredicates(
-          sparkSession, filterExpressions, partitionColumns)
-      }
-      (splited.flatMap(_._1), splited.flatMap(_._2))
-    }
-    val partitionFiles = getPartitionFiles(partitionFilters, dataFilters)
+class BaseFileOnlyViewRelation(sqlContext: SQLContext,
+                               metaClient: HoodieTableMetaClient,
+                               optParams: Map[String, String],
+                               userSchema: Option[StructType],
+                               globPaths: Seq[Path])
+  extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) 
with SparkAdapterSupport {
 
-    val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
-    val filePartitions = sparkAdapter.getFilePartitions(sparkSession, 
partitionFiles, maxSplitBytes)
+  private val fileIndex = HoodieFileIndex(sparkSession, metaClient, 
userSchema, optParams,
+    FileStatusCache.getOrCreate(sqlContext.sparkSession))
+
+  override def doBuildScan(requiredColumns: Array[String], filters: 
Array[Filter]): RDD[InternalRow] = {
+    // NOTE: In case the list of requested columns doesn't contain the Primary Key one,
+    //       we have to add it explicitly so that
+    //          - Merging could be performed correctly
+    //          - In case 0 columns are to be fetched (for ex, when doing {@code count()}
+    //          on Spark's [[Dataset]]), Spark still fetches all the rows needed to execute
+    //          the query correctly
+    //
+    //       It's okay to return columns that have not been requested by the caller,
+    //       as those nevertheless will be filtered out upstream
+    val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+
+    val (requiredAvroSchema, requiredStructSchema) =
+      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
+
+    val filterExpressions = convertToExpressions(filters)
+    val (partitionFilters, dataFilters) = 
filterExpressions.partition(isPartitionPredicate)
+
+    val filePartitions = getPartitions(partitionFilters, dataFilters)
+
+    val partitionSchema = StructType(Nil)
+    val tableSchema = HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString)
+    val requiredSchema = HoodieTableSchema(requiredStructSchema, 
requiredAvroSchema.toString)
 
-    val requiredSchemaParquetReader = 
HoodieDataSourceHelper.buildHoodieParquetReader(
-      sparkSession = sparkSession,
-      dataSchema = tableStructSchema,
-      partitionSchema = StructType(Nil),
-      requiredSchema = tableStructSchema,
+    val baseFileReader = createBaseFileReader(
+      spark = sparkSession,
+      partitionSchema = partitionSchema,
+      tableSchema = tableSchema,
+      requiredSchema = requiredSchema,
       filters = filters,
       options = optParams,
-      hadoopConf = sparkSession.sessionState.newHadoopConf()
+      // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
+      //       to configure Parquet reader appropriately
+      hadoopConf = new Configuration(conf)
     )
 
-    new HoodieFileScanRDD(sparkSession, requiredColumns, tableStructSchema,
-      requiredSchemaParquetReader, filePartitions)
+    new HoodieFileScanRDD(sparkSession, baseFileReader, filePartitions)
   }
 
-  private def getPartitionFiles(partitionFilters: Seq[Expression], 
dataFilters: Seq[Expression]): Seq[PartitionedFile] = {
+  private def getPartitions(partitionFilters: Seq[Expression], dataFilters: 
Seq[Expression]): Seq[FilePartition] = {
     val partitionDirectories = if (globPaths.isEmpty) {
       val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, 
userSchema, optParams,
         FileStatusCache.getOrCreate(sqlContext.sparkSession))
@@ -89,18 +108,46 @@ class BaseFileOnlyViewRelation(
       inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
     }
 
-    val partitionFiles = partitionDirectories.flatMap { partition =>
+    val partitions = partitionDirectories.flatMap { partition =>
       partition.files.flatMap { file =>
+        // TODO move to adapter
+        // TODO fix, currently assuming parquet as underlying format
         HoodieDataSourceHelper.splitFiles(
           sparkSession = sparkSession,
           file = file,
-          partitionValues = partition.values
+          // TODO clarify why this is required
+          partitionValues = InternalRow.empty
         )
       }
     }
 
-    partitionFiles.map{ f =>
-      PartitionedFile(InternalRow.empty, f.filePath, f.start, f.length)
+    val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
+
+    sparkAdapter.getFilePartitions(sparkSession, partitions, maxSplitBytes)
+  }
+
+  private def convertToExpressions(filters: Array[Filter]): Array[Expression] 
= {
+    val catalystExpressions = 
HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema)
+
+    val failedExprs = catalystExpressions.zipWithIndex.filter { case (opt, _) 
=> opt.isEmpty }
+    if (failedExprs.nonEmpty) {
+      val failedFilters = failedExprs.map(p => filters(p._2))
+      logWarning(s"Failed to convert Filters into Catalyst expressions 
(${failedFilters.map(_.toString)})")
     }
+
+    catalystExpressions.filter(_.isDefined).map(_.get).toArray
   }
+
+  /**
+   * Checks whether the given expression references only partition columns
+   * (and involves no sub-query)
+   */
+  private def isPartitionPredicate(condition: Expression): Boolean = {
+    // Validates that the provided names both resolve to the same entity
+    val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver
+
+    condition.references.forall { r => 
partitionColumns.exists(resolvedNameEquals(r.name, _)) } &&
+      !SubqueryExpression.hasSubquery(condition)
+  }
+
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 1e2946d..e07b316 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -22,38 +22,70 @@ import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hbase.io.hfile.CacheConfig
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.HoodieBaseRelation.isMetadataTable
 import org.apache.hudi.common.config.SerializableConfiguration
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieFileFormat
+import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.io.storage.HoodieHFileReader
-import org.apache.hudi.metadata.HoodieTableMetadata
+import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
 import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.sql.{Row, SQLContext, SparkSession}
 
 import scala.collection.JavaConverters._
 import scala.util.Try
 
 case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: 
String)
 
+case class HoodieTableState(recordKeyField: String,
+                            preCombineFieldOpt: Option[String])
+
 /**
  * Hoodie BaseRelation which extends [[PrunedFilteredScan]].
  */
-abstract class HoodieBaseRelation(
-    val sqlContext: SQLContext,
-    metaClient: HoodieTableMetaClient,
-    optParams: Map[String, String],
-    userSchema: Option[StructType])
-  extends BaseRelation with PrunedFilteredScan with Logging{
+abstract class HoodieBaseRelation(val sqlContext: SQLContext,
+                                  metaClient: HoodieTableMetaClient,
+                                  optParams: Map[String, String],
+                                  userSchema: Option[StructType])
+  extends BaseRelation with PrunedFilteredScan with Logging {
 
   protected val sparkSession: SparkSession = sqlContext.sparkSession
 
+  protected lazy val conf: Configuration = new 
Configuration(sqlContext.sparkContext.hadoopConfiguration)
+  protected lazy val jobConf = new JobConf(conf)
+
+  // If meta fields are enabled, always prefer key from the meta field as 
opposed to user-specified one
+  // NOTE: This is historical behavior which is preserved as is
+  protected lazy val recordKeyField: String =
+    if (metaClient.getTableConfig.populateMetaFields()) 
HoodieRecord.RECORD_KEY_METADATA_FIELD
+    else metaClient.getTableConfig.getRecordKeyFieldProp
+
+  protected lazy val preCombineFieldOpt: Option[String] = 
getPrecombineFieldProperty
+
+  /**
+   * @VisibleInTests
+   */
+  lazy val mandatoryColumns: Seq[String] = {
+    if (isMetadataTable(metaClient)) {
+      Seq(HoodieMetadataPayload.KEY_FIELD_NAME, 
HoodieMetadataPayload.SCHEMA_FIELD_NAME_TYPE)
+    } else {
+      Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
+    }
+  }
+
+  protected lazy val specifiedQueryInstant: Option[String] =
+    optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
+      .map(HoodieSqlCommonUtils.formatQueryInstant)
+
   protected lazy val tableAvroSchema: Schema = {
     val schemaUtil = new TableSchemaResolver(metaClient)
     Try(schemaUtil.getTableAvroSchema).getOrElse(
@@ -81,6 +113,34 @@ abstract class HoodieBaseRelation(
     }
 
   override def schema: StructType = tableStructSchema
+
+  /**
+   * This method controls whether the relation produces
+   * <ul>
+   *   <li>[[Row]], when it returns true</li>
+   *   <li>[[InternalRow]], when it returns false</li>
+   * </ul>
+   *
+   * Returning [[InternalRow]] directly enables us to save on needless ser/de 
loop from [[InternalRow]] (being
+   * produced by file-reader) to [[Row]] and back
+   */
+  override final def needConversion: Boolean = false
+
+  /**
+   * NOTE: DO NOT OVERRIDE THIS METHOD
+   */
+  override final def buildScan(requiredColumns: Array[String], filters: 
Array[Filter]): RDD[Row] = {
+    // Here we rely on type erasure to work around the inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
+    // Please check [[needConversion]] scala-doc for more details
+    doBuildScan(requiredColumns, filters).asInstanceOf[RDD[Row]]
+  }
+
+  protected def doBuildScan(requiredColumns: Array[String], filters: 
Array[Filter]): RDD[InternalRow]
+
+  protected final def appendMandatoryColumns(requestedColumns: Array[String]): 
Array[String] = {
+    val missing = mandatoryColumns.filter(col => 
!requestedColumns.contains(col))
+    requestedColumns ++ missing
+  }
 }
 
 object HoodieBaseRelation {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
index fb12549..40299cf 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
@@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileStatus
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper, 
SpecificInternalRow, SubqueryExpression, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{PredicateHelper, 
SpecificInternalRow, UnsafeProjection}
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.sources.Filter
@@ -33,43 +33,6 @@ import scala.collection.JavaConverters._
 
 object HoodieDataSourceHelper extends PredicateHelper {
 
-  /**
-   * Partition the given condition into two sequence of conjunctive predicates:
-   * - predicates that can be evaluated using metadata only.
-   * - other predicates.
-   */
-  def splitPartitionAndDataPredicates(
-      spark: SparkSession,
-      condition: Expression,
-      partitionColumns: Seq[String]): (Seq[Expression], Seq[Expression]) = {
-    splitConjunctivePredicates(condition).partition(
-      isPredicateMetadataOnly(spark, _, partitionColumns))
-  }
-
-  /**
-   * Check if condition can be evaluated using only metadata. In Delta, this 
means the condition
-   * only references partition columns and involves no subquery.
-   */
-  def isPredicateMetadataOnly(
-      spark: SparkSession,
-      condition: Expression,
-      partitionColumns: Seq[String]): Boolean = {
-    isPredicatePartitionColumnsOnly(spark, condition, partitionColumns) &&
-        !SubqueryExpression.hasSubquery(condition)
-  }
-
-  /**
-   * Does the predicate only contains partition columns?
-   */
-  def isPredicatePartitionColumnsOnly(
-      spark: SparkSession,
-      condition: Expression,
-      partitionColumns: Seq[String]): Boolean = {
-    val nameEquality = spark.sessionState.analyzer.resolver
-    condition.references.forall { r =>
-      partitionColumns.exists(nameEquality(r.name, _))
-    }
-  }
 
   /**
    * Wrapper `buildReaderWithPartitionValues` of [[ParquetFileFormat]]
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
index 9f2d7d9..7e8f62b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
@@ -18,56 +18,37 @@
 
 package org.apache.hudi
 
-import org.apache.spark.{Partition, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.execution.datasources.{FilePartition, 
PartitionedFile, SchemaColumnConvertNotSupportedException}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.{Partition, TaskContext}
 
 /**
- * Similar to [[org.apache.spark.sql.execution.datasources.FileScanRDD]].
- *
- * This class will extract the fields needed according to [[requiredColumns]] 
and
- * return iterator of [[org.apache.spark.sql.Row]] directly.
+ * TODO eval if we actually need it
  */
-class HoodieFileScanRDD(
-    @transient private val sparkSession: SparkSession,
-    requiredColumns: Array[String],
-    schema: StructType,
-    readFunction: PartitionedFile => Iterator[InternalRow],
-    @transient val filePartitions: Seq[FilePartition])
-  extends RDD[Row](sparkSession.sparkContext, Nil) {
-
-  private val requiredSchema = {
-    val nameToStructField = schema.map(field => (field.name, field)).toMap
-    StructType(requiredColumns.map(nameToStructField))
-  }
-
-  private val requiredFieldPos = 
HoodieSparkUtils.collectFieldIndexes(requiredSchema, schema)
-
-  override def compute(split: Partition, context: TaskContext): Iterator[Row] 
= {
-    val iterator = new Iterator[Object] with AutoCloseable {
+class HoodieFileScanRDD(@transient private val sparkSession: SparkSession,
+                        readFunction: PartitionedFile => Iterator[InternalRow],
+                        @transient fileSplits: Seq[FilePartition])
+  extends HoodieUnsafeRDD(sparkSession.sparkContext) {
 
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+    val iterator = new Iterator[InternalRow] with AutoCloseable {
       private[this] val files = 
split.asInstanceOf[FilePartition].files.toIterator
-      private[this] var currentFile: PartitionedFile = null
-      private[this] var currentIterator: Iterator[Object] = null
+      private[this] var currentFile: PartitionedFile = _
+      private[this] var currentIterator: Iterator[InternalRow] = _
 
       override def hasNext: Boolean = {
         (currentIterator != null && currentIterator.hasNext) || nextIterator()
       }
 
-      def next(): Object = {
-        currentIterator.next()
-      }
+      def next(): InternalRow = currentIterator.next()
 
       /** Advances to the next file. Returns true if a new non-empty iterator 
is available. */
       private def nextIterator(): Boolean = {
         if (files.hasNext) {
-          currentFile = files.next()
-
           logInfo(s"Reading File $currentFile")
+          currentFile = files.next()
           currentIterator = readFunction(currentFile)
 
           try {
@@ -93,17 +74,8 @@ class HoodieFileScanRDD(
     // Register an on-task-completion callback to close the input stream.
     context.addTaskCompletionListener[Unit](_ => iterator.close())
 
-    // extract required columns from row
-    val iterAfterExtract = HoodieDataSourceHelper.extractRequiredSchema(
-      iterator.asInstanceOf[Iterator[InternalRow]],
-      requiredSchema,
-      requiredFieldPos)
-
-    // convert InternalRow to Row and return
-    val converter = 
CatalystTypeConverters.createToScalaConverter(requiredSchema)
-    iterAfterExtract.map(converter(_).asInstanceOf[Row])
+    iterator.asInstanceOf[Iterator[InternalRow]]
   }
 
-  override protected def getPartitions: Array[Partition] = 
filePartitions.toArray
-
+  override protected def getPartitions: Array[Partition] = fileSplits.toArray
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index 4a4ea2c..3a518da 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -36,7 +36,6 @@ import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
 import 
org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
 import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
HoodieTableMetadata}
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
 import org.apache.spark.sql.execution.datasources.PartitionedFile
@@ -54,10 +53,11 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
                            @transient config: Configuration,
                            fullSchemaFileReader: PartitionedFile => 
Iterator[InternalRow],
                            requiredSchemaFileReader: PartitionedFile => 
Iterator[InternalRow],
-                           tableState: HoodieMergeOnReadTableState,
+                           tableState: HoodieTableState,
                            tableSchema: HoodieTableSchema,
-                           requiredSchema: HoodieTableSchema)
-  extends RDD[InternalRow](sc, Nil) {
+                           requiredSchema: HoodieTableSchema,
+                           @transient fileSplits: 
List[HoodieMergeOnReadFileSplit])
+  extends HoodieUnsafeRDD(sc) {
 
   private val confBroadcast = sc.broadcast(new SerializableWritable(config))
   private val recordKeyField = tableState.recordKeyField
@@ -98,12 +98,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     iter
   }
 
-  override protected def getPartitions: Array[Partition] = {
-    tableState
-      .hoodieRealtimeFileSplits
-      .zipWithIndex
-      .map(file => HoodieMergeOnReadPartition(file._2, file._1)).toArray
-  }
+  override protected def getPartitions: Array[Partition] =
+    fileSplits.zipWithIndex.map(file => HoodieMergeOnReadPartition(file._2, 
file._1)).toArray
 
   private def getConfig: Configuration = {
     val conf = confBroadcast.value.value
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieUnsafeRDD.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieUnsafeRDD.scala
new file mode 100644
index 0000000..3f95746
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieUnsafeRDD.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+
+/**
+ * !!! PLEASE READ CAREFULLY !!!
+ *
+ * Base class for all of the custom low-overhead RDD implementations for Hudi.
+ *
+ * To keep memory allocation footprint as low as possible, each inheritor of 
this RDD base class
+ *
+ * <pre>
+ *   1. Does NOT deserialize from [[InternalRow]] to [[Row]] (therefore only 
providing access to
+ *   Catalyst internal representations (often mutable) of the read row)
+ *
+ *   2. DOES NOT COPY UNDERLYING ROW OUT OF THE BOX, meaning that
+ *
+ *      a) access to this RDD is NOT thread-safe
+ *
+ *      b) iterating over it returns a reference to a _mutable_ underlying instance of
+ *      [[InternalRow]], meaning that after [[Iterator#next()]] is invoked on the provided
+ *      iterator, the previous reference becomes **invalid**. Therefore, you have to copy the
+ *      underlying mutable instance of [[InternalRow]] if you plan to access it after
+ *      [[Iterator#next()]] is invoked (which fills it with the next row's payload)
+ *
+ *      c) due to item b) above, no operation other than the iteration will 
produce meaningful
+ *      results on it and will likely fail [1]
+ * </pre>
+ *
+ * [1] For example, the [[RDD#collect]] method on this implementation would not work correctly,
+ * as it simply uses Scala's default [[Iterator#toArray]], which would collect references to the
+ * same underlying mutable object into an [[Array]]. Instead, each individual [[InternalRow]]
+ * _has to be copied_ before being concatenated into the final output. Please refer to
+ * [[HoodieUnsafeRDDUtils#collect]] for more details.
+ *
+ * NOTE: It enforces, for ex, that all of the RDDs implement [[compute]] 
method returning
+ *       [[InternalRow]] to avoid superfluous ser/de
+ */
+abstract class HoodieUnsafeRDD(@transient sc: SparkContext)
+  extends RDD[InternalRow](sc, Nil) {
+
+  def compute(split: Partition, context: TaskContext): Iterator[InternalRow]
+
+  override final def collect(): Array[InternalRow] =
+    throw new UnsupportedOperationException(
+      "This method will not function correctly, please refer to scala-doc for 
HoodieUnsafeRDD"
+    )
+}
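
A minimal sketch (illustrative only, not part of this patch) of the copying contract
described in the scala-doc above: because the iterator of such an RDD reuses a single
mutable InternalRow, every row has to be copied before it is retained. The safeCollect
helper below is hypothetical; HoodieUnsafeRDDUtils#collect, added further down in this
diff, is the patch's actual utility.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow

    def safeCollect(rdd: RDD[InternalRow]): Array[InternalRow] =
      // copy() detaches each row from the mutable buffer the file reader reuses,
      // so the collected array holds independent, stable instances
      rdd.mapPartitions(iter => iter.map(_.copy())).collect()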
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index b9d18c6..8308e3b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -19,7 +19,6 @@ package org.apache.hudi
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{GlobPattern, Path}
-import org.apache.hadoop.mapred.JobConf
 import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -28,11 +27,11 @@ import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.{getCommitMetadata, 
getWritePartitionPaths, listAffectedFilesForCommits}
 import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Row, SQLContext}
 
 import scala.collection.JavaConversions._
 
@@ -47,9 +46,6 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
                                      val metaClient: HoodieTableMetaClient)
   extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) {
 
-  private val conf = new 
Configuration(sqlContext.sparkContext.hadoopConfiguration)
-  private val jobConf = new JobConf(conf)
-
   private val commitTimeline = 
metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants()
   if (commitTimeline.empty()) {
     throw new HoodieException("No instants to incrementally pull")
@@ -77,8 +73,6 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
 
   private val fileIndex = if (commitsToReturn.isEmpty) List() else 
buildFileIndex()
 
-  private val preCombineFieldOpt = getPrecombineFieldProperty
-
   // Record filters making sure that only records w/in the requested bounds 
are being fetched as part of the
   // scan collected by this relation
   private lazy val incrementalSpanRecordsFilters: Seq[Filter] = {
@@ -88,18 +82,16 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
     Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
   }
 
-  private lazy val mandatoryColumns = {
+  override lazy val mandatoryColumns: Seq[String] = {
     // NOTE: These columns are required for the Incremental flow to be able to handle the rows properly, even in
     //       cases when no columns are requested to be fetched (for ex, when 
using {@code count()} API)
     Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
       preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
   }
 
-  override def needConversion: Boolean = false
-
-  override def buildScan(requiredColumns: Array[String], filters: 
Array[Filter]): RDD[Row] = {
+  override def doBuildScan(requiredColumns: Array[String], filters: 
Array[Filter]): RDD[InternalRow] = {
     if (fileIndex.isEmpty) {
-      sqlContext.sparkContext.emptyRDD[Row]
+      sqlContext.sparkContext.emptyRDD[InternalRow]
     } else {
       logDebug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}")
       logDebug(s"buildScan filters = ${filters.mkString(",")}")
@@ -148,20 +140,20 @@ class MergeOnReadIncrementalRelation(sqlContext: 
SQLContext,
         hadoopConf = new Configuration(conf)
       )
 
-      val hoodieTableState = HoodieMergeOnReadTableState(fileIndex, 
HoodieRecord.RECORD_KEY_METADATA_FIELD, preCombineFieldOpt)
+      val hoodieTableState = 
HoodieTableState(HoodieRecord.RECORD_KEY_METADATA_FIELD, preCombineFieldOpt)
 
       // TODO implement incremental span record filtering w/in RDD to make 
sure returned iterator is appropriately
       //      filtered, since file-reader might not be capable to perform 
filtering
-      val rdd = new HoodieMergeOnReadRDD(
+      new HoodieMergeOnReadRDD(
         sqlContext.sparkContext,
         jobConf,
         fullSchemaParquetReader,
         requiredSchemaParquetReader,
         hoodieTableState,
         tableSchema,
-        requiredSchema
+        requiredSchema,
+        fileIndex
       )
-      rdd.asInstanceOf[RDD[Row]]
     }
   }
 
@@ -225,9 +217,4 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
         latestCommit, metaClient.getBasePath, maxCompactionMemoryInBytes, 
mergeType)
     })
   }
-
-  private def appendMandatoryColumns(requestedColumns: Array[String]): 
Array[String] = {
-    val missing = mandatoryColumns.filter(col => 
!requestedColumns.contains(col))
-    requestedColumns ++ missing
-  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 7c1a354..6156054 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -20,22 +20,19 @@ package org.apache.hudi
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hudi.HoodieBaseRelation.{createBaseFileReader, 
isMetadataTable}
+import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
 import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
 import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
-import org.apache.hudi.metadata.HoodieMetadataPayload
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources.{FileStatusCache, 
PartitionedFile}
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Row, SQLContext}
 
 import scala.collection.JavaConverters._
 
@@ -46,10 +43,6 @@ case class HoodieMergeOnReadFileSplit(dataFile: 
Option[PartitionedFile],
                                       maxCompactionMemoryInBytes: Long,
                                       mergeType: String)
 
-case class HoodieMergeOnReadTableState(hoodieRealtimeFileSplits: 
List[HoodieMergeOnReadFileSplit],
-                                       recordKeyField: String,
-                                       preCombineFieldOpt: Option[String])
-
 class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
                                   optParams: Map[String, String],
                                   val userSchema: Option[StructType],
@@ -57,38 +50,13 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
                                   val metaClient: HoodieTableMetaClient)
   extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) {
 
-  private val conf = new 
Configuration(sqlContext.sparkContext.hadoopConfiguration)
-  private val jobConf = new JobConf(conf)
-
   private val mergeType = optParams.getOrElse(
     DataSourceReadOptions.REALTIME_MERGE.key,
     DataSourceReadOptions.REALTIME_MERGE.defaultValue)
 
   private val maxCompactionMemoryInBytes = 
getMaxCompactionMemoryInBytes(jobConf)
 
-  // If meta fields are enabled, always prefer key from the meta field as 
opposed to user-specified one
-  // NOTE: This is historical behavior which is preserved as is
-  private val recordKeyField = {
-    if (metaClient.getTableConfig.populateMetaFields()) 
HoodieRecord.RECORD_KEY_METADATA_FIELD
-    else metaClient.getTableConfig.getRecordKeyFieldProp
-  }
-
-  private val preCombineFieldOpt = getPrecombineFieldProperty
-
-  private lazy val mandatoryColumns = {
-    if (isMetadataTable(metaClient)) {
-      Seq(HoodieMetadataPayload.KEY_FIELD_NAME, 
HoodieMetadataPayload.SCHEMA_FIELD_NAME_TYPE)
-    } else {
-      Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
-    }
-  }
-
-  override def needConversion: Boolean = false
-
-  private val specifiedQueryInstant = 
optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
-    .map(HoodieSqlCommonUtils.formatQueryInstant)
-
-  override def buildScan(requiredColumns: Array[String], filters: 
Array[Filter]): RDD[Row] = {
+  override def doBuildScan(requiredColumns: Array[String], filters: 
Array[Filter]): RDD[InternalRow] = {
     log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}")
     log.debug(s" buildScan filters = ${filters.mkString(",")}")
 
@@ -137,12 +105,10 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
       hadoopConf = new Configuration(conf)
     )
 
-    val tableState = HoodieMergeOnReadTableState(fileIndex, recordKeyField, 
preCombineFieldOpt)
+    val tableState = HoodieTableState(recordKeyField, preCombineFieldOpt)
 
-    val rdd = new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, 
fullSchemaParquetReader,
-      requiredSchemaParquetReader, tableState, tableSchema, requiredSchema)
-
-    rdd.asInstanceOf[RDD[Row]]
+    new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, 
fullSchemaParquetReader,
+      requiredSchemaParquetReader, tableState, tableSchema, requiredSchema, 
fileIndex)
   }
 
   def buildFileIndex(filters: Array[Filter]): List[HoodieMergeOnReadFileSplit] 
= {
@@ -193,7 +159,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
       val partitionColumns = hoodieFileIndex.partitionSchema.fieldNames.toSet
       val partitionFilters = filters.filter(f => f.references.forall(p => 
partitionColumns.contains(p)))
       val partitionFilterExpression =
-        HoodieSparkUtils.convertToCatalystExpressions(partitionFilters, 
tableStructSchema)
+        HoodieSparkUtils.convertToCatalystExpression(partitionFilters, 
tableStructSchema)
       val convertedPartitionFilterExpression =
         HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, 
partitionFilterExpression.toSeq)
 
@@ -231,11 +197,6 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
       }
     }
   }
-
-  private def appendMandatoryColumns(requestedColumns: Array[String]): 
Array[String] = {
-    val missing = mandatoryColumns.filter(col => 
!requestedColumns.contains(col))
-    requestedColumns ++ missing
-  }
 }
 
 object MergeOnReadSnapshotRelation {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala
new file mode 100644
index 0000000..1ac8fa0
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.hudi.HoodieUnsafeRDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.util.MutablePair
+
+/**
+ * Suite of utilities helping in handling instances of [[HoodieUnsafeRDD]]
+ */
+object HoodieUnsafeRDDUtils {
+
+  /**
+   * Canonical implementation of the [[RDD#collect]] for [[HoodieUnsafeRDD]], 
returning a properly
+   * copied [[Array]] of [[InternalRow]]s
+   */
+  def collect(rdd: HoodieUnsafeRDD): Array[InternalRow] = {
+    rdd.mapPartitionsInternal { iter =>
+      // NOTE: We're leveraging [[MutablePair]] here to avoid unnecessary 
allocations, since
+      //       a) iteration is performed lazily and b) iteration is 
single-threaded (w/in partition)
+      val pair = new MutablePair[InternalRow, Null]()
+      iter.map(row => pair.update(row.copy(), null))
+    }
+      .map(p => p._1)
+      .collect()
+  }
+
+}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSerializer.scala
similarity index 89%
rename from 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala
rename to 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSerializer.scala
index 050efbd..4a3a7c4 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSerializer.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.avro
 import org.apache.avro.Schema
 import org.apache.spark.sql.types.DataType
 
-class HoodieAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean)
-  extends HoodieAvroSerializerTrait {
+class HoodieSparkAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean)
+  extends HoodieAvroSerializer {
 
   val avroSerializer = new AvroSerializer(rootCatalystType, rootAvroType, nullable)
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestConvertFilterToCatalystExpression.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestConvertFilterToCatalystExpression.scala
index 9b1b88d..8aa47ff 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestConvertFilterToCatalystExpression.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestConvertFilterToCatalystExpression.scala
@@ -17,11 +17,9 @@
 
 package org.apache.hudi
 
-import org.apache.hudi.HoodieSparkUtils.convertToCatalystExpressions
 import org.apache.hudi.HoodieSparkUtils.convertToCatalystExpression
-
-import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, Filter, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains, StringEndsWith, StringStartsWith}
-import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Test
 
@@ -93,7 +91,7 @@ class TestConvertFilterToCatalystExpression {
     } else {
       expectExpression
     }
-    val exp = convertToCatalystExpressions(filters, tableSchema)
+    val exp = convertToCatalystExpression(filters, tableSchema)
     if (removeQuotesIfNeed == null) {
       assertEquals(exp.isEmpty, true)
     } else {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index 315a14c..18b639f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.log4j.LogManager
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, lit}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
@@ -39,6 +40,8 @@ import scala.collection.JavaConversions._
 @Tag("functional")
 class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
 
+  private val log = LogManager.getLogger(classOf[TestMORDataSourceStorage])
+
   val commonOpts = Map(
     "hoodie.insert.shuffle.parallelism" -> "4",
     "hoodie.upsert.shuffle.parallelism" -> "4",
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
new file mode 100644
index 0000000..a963081
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.avro.Schema
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieSparkUtils, HoodieUnsafeRDD}
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenerator}
+import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
+import org.apache.hudi.keygen.NonpartitionedKeyGenerator
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.parquet.hadoop.util.counters.BenchmarkCounter
+import org.apache.spark.HoodieUnsafeRDDUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, Row, SaveMode}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.{Tag, Test}
+
+import scala.:+
+import scala.collection.JavaConverters._
+
+@Tag("functional")
+class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with Logging {
+
+  val defaultWriteOpts = Map(
+    "hoodie.insert.shuffle.parallelism" -> "4",
+    "hoodie.upsert.shuffle.parallelism" -> "4",
+    "hoodie.bulkinsert.shuffle.parallelism" -> "2",
+    "hoodie.delete.shuffle.parallelism" -> "1",
+    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+    HoodieMetadataConfig.ENABLE.key -> "true",
+    // NOTE: It's critical that we use non-partitioned table, since the way we track amount of bytes read
+    //       is not robust, and works most reliably only when we read just a single file. As such, making table
+    //       non-partitioned makes it much more likely just a single file will be written
+    DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getName
+  )
+
+  @Test
+  def testBaseFileOnlyViewRelation(): Unit = {
+    val tablePath = s"$basePath/cow"
+    val targetRecordsCount = 100
+    val (_, schema) = bootstrapTable(tablePath, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, targetRecordsCount,
+      defaultWriteOpts, populateMetaFields = true)
+    val tableState = TableState(tablePath, schema, targetRecordsCount, 0.0)
+
+    // Stats for the reads fetching only _projected_ columns (note how amount of bytes read
+    // increases along w/ the # of columns)
+    val projectedColumnsReadStats: Array[(String, Long)] =
+      if (HoodieSparkUtils.isSpark3)
+        Array(
+          ("rider", 2452),
+          ("rider,driver", 2552),
+          ("rider,driver,tip_history", 3517))
+      else if (HoodieSparkUtils.isSpark2)
+        Array(
+          ("rider", 2595),
+          ("rider,driver", 2735),
+          ("rider,driver,tip_history", 3750))
+      else
+        fail("Only Spark 3 and Spark 2 are currently supported")
+
+    // Test COW / Snapshot
+    runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, "", projectedColumnsReadStats)
+  }
+
+  @Test
+  def testMergeOnReadSnapshotRelationWithDeltaLogs(): Unit = {
+    val tablePath = s"$basePath/mor-with-logs"
+    val targetRecordsCount = 100
+    val targetUpdatedRecordsRatio = 0.5
+
+    val (_, schema) = bootstrapMORTable(tablePath, targetRecordsCount, targetUpdatedRecordsRatio, defaultWriteOpts, populateMetaFields = true)
+    val tableState = TableState(tablePath, schema, targetRecordsCount, targetUpdatedRecordsRatio)
+
+    // Stats for the reads fetching only _projected_ columns (note how amount of bytes read
+    // increases along w/ the # of columns)
+    val projectedColumnsReadStats: Array[(String, Long)] =
+      if (HoodieSparkUtils.isSpark3)
+        Array(
+          ("rider", 2452),
+          ("rider,driver", 2552),
+          ("rider,driver,tip_history", 3517))
+      else if (HoodieSparkUtils.isSpark2)
+        Array(
+          ("rider", 2595),
+          ("rider,driver", 2735),
+          ("rider,driver,tip_history", 3750))
+      else
+        fail("Only Spark 3 and Spark 2 are currently supported")
+
+    // Stats for the reads fetching _all_ columns (note, how amount of bytes read
+    // is invariant of the # of columns)
+    val fullColumnsReadStats: Array[(String, Long)] =
+      if (HoodieSparkUtils.isSpark3)
+        Array(
+          ("rider", 14665),
+          ("rider,driver", 14665),
+          ("rider,driver,tip_history", 14665))
+      else if (HoodieSparkUtils.isSpark2)
+        // TODO re-enable tests (these tests are very unstable currently)
+        Array(
+          ("rider", -1),
+          ("rider,driver", -1),
+          ("rider,driver,tip_history", -1))
+      else
+        fail("Only Spark 3 and Spark 2 are currently supported")
+
+    // Test MOR / Snapshot / Skip-merge
+    runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL, projectedColumnsReadStats)
+
+    // Test MOR / Snapshot / Payload-combine
+    runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, fullColumnsReadStats)
+
+    // Test MOR / Read Optimized
+    runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStats)
+  }
+
+  @Test
+  def testMergeOnReadSnapshotRelationWithNoDeltaLogs(): Unit = {
+    val tablePath = s"$basePath/mor-no-logs"
+    val targetRecordsCount = 100
+    val targetUpdatedRecordsRatio = 0.0
+
+    val (_, schema) = bootstrapMORTable(tablePath, targetRecordsCount, targetUpdatedRecordsRatio, defaultWriteOpts, populateMetaFields = true)
+    val tableState = TableState(tablePath, schema, targetRecordsCount, targetUpdatedRecordsRatio)
+
+    //
+    // Test #1: MOR table w/o Delta Logs
+    //
+
+    // Stats for the reads fetching only _projected_ columns (note how amount of bytes read
+    // increases along w/ the # of columns)
+    val projectedColumnsReadStats: Array[(String, Long)] =
+      if (HoodieSparkUtils.isSpark3)
+        Array(
+          ("rider", 2452),
+          ("rider,driver", 2552),
+          ("rider,driver,tip_history", 3517))
+      else if (HoodieSparkUtils.isSpark2)
+        Array(
+          ("rider", 2595),
+          ("rider,driver", 2735),
+          ("rider,driver,tip_history", 3750))
+      else
+        fail("Only Spark 3 and Spark 2 are currently supported")
+
+    // Test MOR / Snapshot / Skip-merge
+    runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL, projectedColumnsReadStats)
+
+    // Test MOR / Snapshot / Payload-combine
+    runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, projectedColumnsReadStats)
+
+    // Test MOR / Read Optimized
+    runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStats)
+  }
+
+  // TODO add test for incremental query of the table with logs
+  @Test
+  def testMergeOnReadIncrementalRelationWithNoDeltaLogs(): Unit = {
+    val tablePath = s"$basePath/mor-no-logs"
+    val targetRecordsCount = 100
+    val targetUpdatedRecordsRatio = 0.0
+
+    val opts: Map[String, String] =
+      // NOTE: Parquet Compression is disabled as it was leading to non-deterministic outcomes when testing
+      //       against Spark 2.x
+      defaultWriteOpts ++ Seq(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key -> "")
+
+    val (_, schema) = bootstrapMORTable(tablePath, targetRecordsCount, targetUpdatedRecordsRatio, opts, populateMetaFields = true)
+    val tableState = TableState(tablePath, schema, targetRecordsCount, targetUpdatedRecordsRatio)
+
+    // Stats for the reads fetching only _projected_ columns (note how amount of bytes read
+    // increases along w/ the # of columns)
+    val projectedColumnsReadStats: Array[(String, Long)] =
+      if (HoodieSparkUtils.isSpark3)
+        Array(
+          ("rider", 4219),
+          ("rider,driver", 4279),
+          ("rider,driver,tip_history", 5186))
+      else if (HoodieSparkUtils.isSpark2)
+        Array(
+          ("rider", 4430),
+          ("rider,driver", 4530),
+          ("rider,driver,tip_history", 5487))
+      else
+        fail("Only Spark 3 and Spark 2 are currently supported")
+
+    // Stats for the reads fetching _all_ columns (note, how amount of bytes read
+    // is invariant of the # of columns)
+    val fullColumnsReadStats: Array[(String, Long)] =
+      if (HoodieSparkUtils.isSpark3)
+        Array(
+          ("rider", 19683),
+          ("rider,driver", 19683),
+          ("rider,driver,tip_history", 19683))
+      else if (HoodieSparkUtils.isSpark2)
+        // TODO re-enable tests (these tests are very unstable currently)
+        Array(
+          ("rider", -1),
+          ("rider,driver", -1),
+          ("rider,driver,tip_history", -1))
+      else
+        fail("Only Spark 3 and Spark 2 are currently supported")
+
+    val incrementalOpts: Map[String, String] = Map(
+      DataSourceReadOptions.BEGIN_INSTANTTIME.key -> "001"
+    )
+
+    // Test MOR / Incremental / Skip-merge
+    runTest(tableState, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL,
+      projectedColumnsReadStats, incrementalOpts)
+
+    // Test MOR / Incremental / Payload-combine
+    runTest(tableState, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL,
+      fullColumnsReadStats, incrementalOpts)
+  }
+
+
+  // Test routine
+  private def runTest(tableState: TableState,
+                      queryType: String,
+                      mergeType: String,
+                      expectedStats: Array[(String, Long)],
+                      additionalOpts: Map[String, String] = Map.empty): Unit = {
+    val tablePath = tableState.path
+    val readOpts = defaultWriteOpts ++ Map(
+      "path" -> tablePath,
+      DataSourceReadOptions.QUERY_TYPE.key -> queryType,
+      DataSourceReadOptions.REALTIME_MERGE.key -> mergeType
+    ) ++ additionalOpts
+
+    val ds = new DefaultSource()
+    val relation: HoodieBaseRelation = ds.createRelation(spark.sqlContext, readOpts).asInstanceOf[HoodieBaseRelation]
+
+    for ((columnListStr, expectedBytesRead) <- expectedStats) {
+      val targetColumns = columnListStr.split(",")
+
+      println(s"Running test for $tablePath / $queryType / $mergeType / $columnListStr")
+
+      val (rows, bytesRead) = measureBytesRead { () =>
+        val rdd = relation.buildScan(targetColumns, Array.empty).asInstanceOf[HoodieUnsafeRDD]
+        HoodieUnsafeRDDUtils.collect(rdd)
+      }
+
+      val targetRecordCount = tableState.targetRecordCount;
+      val targetUpdatedRecordsRatio = tableState.targetUpdatedRecordsRatio
+
+      val expectedRecordCount =
+        if (DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL.equals(mergeType)) targetRecordCount * (1 + targetUpdatedRecordsRatio)
+        else targetRecordCount
+
+      assertEquals(expectedRecordCount, rows.length)
+      if (expectedBytesRead != -1) {
+        assertEquals(expectedBytesRead, bytesRead)
+      } else {
+        logWarning(s"Not matching bytes read ($bytesRead)")
+      }
+
+      val readColumns = targetColumns ++ relation.mandatoryColumns
+      val (_, projectedStructType) = HoodieSparkUtils.getRequiredSchema(tableState.schema, readColumns)
+
+      val row: InternalRow = rows.take(1).head
+
+      // This check is mostly about making sure InternalRow deserializes properly into projected schema
+      val deserializedColumns = row.toSeq(projectedStructType)
+      assertEquals(readColumns.length, deserializedColumns.size)
+    }
+  }
+
+  private def bootstrapTable(path: String,
+                             tableType: String,
+                             recordCount: Int,
+                             opts: Map[String, String],
+                             populateMetaFields: Boolean,
+                             dataGenOpt: Option[HoodieTestDataGenerator] = None): (List[HoodieRecord[_]], Schema) = {
+    val dataGen = dataGenOpt.getOrElse(new HoodieTestDataGenerator(0x12345))
+
+    // Bulk Insert Operation
+    val schema =
+      if (populateMetaFields) HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS
+      else HoodieTestDataGenerator.AVRO_SCHEMA
+
+    val records = dataGen.generateInserts("001", recordCount)
+    val inputDF: Dataset[Row] = toDataset(records, HoodieTestDataGenerator.AVRO_SCHEMA)
+
+    inputDF.write.format("org.apache.hudi")
+      .options(opts)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(path)
+
+    (records.asScala.toList, schema)
+  }
+
+  private def bootstrapMORTable(path: String,
+                                recordCount: Int,
+                                updatedRecordsRatio: Double,
+                                opts: Map[String, String],
+                                populateMetaFields: Boolean,
+                                dataGenOpt: Option[HoodieTestDataGenerator] = None): (List[HoodieRecord[_]], Schema) = {
+    val dataGen = dataGenOpt.getOrElse(new HoodieTestDataGenerator(0x12345))
+
+    // Step 1: Bootstrap table w/ N records (t/h bulk-insert)
+    val (insertedRecords, schema) = bootstrapTable(path, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, recordCount, opts, populateMetaFields, Some(dataGen))
+
+    if (updatedRecordsRatio == 0) {
+      (insertedRecords, schema)
+    } else {
+      val updatesCount = (insertedRecords.length * updatedRecordsRatio).toInt
+      val recordsToUpdate = insertedRecords.take(updatesCount)
+      val updatedRecords = dataGen.generateUpdates("002", recordsToUpdate.asJava)
+
+      // Step 2: Update M records out of those (t/h update)
+      val inputDF = toDataset(updatedRecords, HoodieTestDataGenerator.AVRO_SCHEMA)
+
+      inputDF.write.format("org.apache.hudi")
+        .options(opts)
+        .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+        .mode(SaveMode.Append)
+        .save(path)
+
+      (updatedRecords.asScala.toList ++ insertedRecords.drop(updatesCount), schema)
+    }
+  }
+
+  def measureBytesRead[T](f: () => T): (T, Int) = {
+    // Init BenchmarkCounter to report number of bytes actually read from the Block
+    BenchmarkCounter.initCounterFromReporter(HadoopMapRedUtils.createTestReporter, fs.getConf)
+    val r = f.apply()
+    val bytesRead = BenchmarkCounter.getBytesRead.toInt
+    (r, bytesRead)
+  }
+
+  case class TableState(path: String, schema: Schema, targetRecordCount: Long, targetUpdatedRecordsRatio: Double)
+}
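
NOTE: The bytes-read assertions above correspond to what an end user triggers with a plain projected read through the DataFrame API; a rough equivalent of the projected case (the path and column list are illustrative):

    // With this change, selecting a subset of columns should only touch the matching
    // Parquet column chunks of the base files instead of whole row groups
    val df = spark.read.format("org.apache.hudi").load("/tmp/hudi/hoodie_test")
    df.select("rider", "driver", "tip_history").collect()
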
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index f7fa233..54c8b91 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.adapter
 import org.apache.avro.Schema
 import org.apache.hudi.Spark2RowSerDe
 import org.apache.hudi.client.utils.SparkRowSerDe
-import org.apache.spark.sql.avro.{HoodieAvroDeserializerTrait, HoodieAvroSerializerTrait, Spark2HoodieAvroDeserializer, HoodieAvroSerializer}
+import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark2AvroDeserializer, HoodieSparkAvroSerializer}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.{Expression, Like}
@@ -42,11 +42,11 @@ import scala.collection.mutable.ArrayBuffer
  */
 class Spark2Adapter extends SparkAdapter {
 
-  def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializerTrait =
-    new HoodieAvroSerializer(rootCatalystType, rootAvroType, nullable)
+  def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer =
+    new HoodieSparkAvroSerializer(rootCatalystType, rootAvroType, nullable)
 
-  def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializerTrait =
-    new Spark2HoodieAvroDeserializer(rootAvroType, rootCatalystType)
+  def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializer =
+    new HoodieSpark2AvroDeserializer(rootAvroType, rootCatalystType)
 
   override def createSparkRowSerDe(encoder: ExpressionEncoder[Row]): SparkRowSerDe = {
     new Spark2RowSerDe(encoder)
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/Spark2HoodieAvroDeserializer.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/HoodieSpark2AvroDeserializer.scala
similarity index 76%
rename from hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/Spark2HoodieAvroDeserializer.scala
rename to hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/HoodieSpark2AvroDeserializer.scala
index ac2c82f..2b55c66 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/Spark2HoodieAvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/HoodieSpark2AvroDeserializer.scala
@@ -21,13 +21,15 @@ import org.apache.avro.Schema
 import org.apache.spark.sql.types.DataType
 
 /**
- * This is Spark 2 implementation for the [[HoodieAvroDeserializerTrait]] leveraging [[PatchedAvroDeserializer]],
+ * This is Spark 2 implementation for the [[HoodieAvroDeserializer]] leveraging [[PatchedAvroDeserializer]],
  * which is just copied over version of [[AvroDeserializer]] from Spark 2.4.4 w/ SPARK-30267 being back-ported to it
  */
-class Spark2HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType)
-  extends HoodieAvroDeserializerTrait {
+class HoodieSpark2AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType)
+  extends HoodieAvroDeserializer {
 
   private val avroDeserializer = new PatchedAvroDeserializer(rootAvroType, rootCatalystType)
 
-  def doDeserialize(data: Any): Any = avroDeserializer.deserialize(data)
+  // As of Spark 3.1, this will return data wrapped with Option, so we make sure these interfaces
+  // are aligned across Spark versions
+  def deserialize(data: Any): Option[Any] = Some(avroDeserializer.deserialize(data))
 }
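
NOTE: The Option-alignment called out in the comment above lets callers treat the Spark 2 and Spark 3 deserializers uniformly. A hedged sketch of a call site (the Avro schema, Catalyst type and record values are illustrative, and the adapter is instantiated directly rather than resolved through Hudi's adapter lookup):

    // Obtain the version-specific deserializer through the adapter factory
    // (Spark 2 shown; Spark 3 is symmetric via Spark3Adapter)
    val deserializer = new Spark2Adapter().createAvroDeserializer(avroSchema, catalystType)

    deserializer.deserialize(avroGenericRecord) match {
      case Some(row) => // e.g. an InternalRow matching the requested Catalyst type
      case None      => // nothing could be deserialized for this datum
    }
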
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala
index d0328fb..ad33832 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala
@@ -21,7 +21,7 @@ import org.apache.avro.Schema
 import org.apache.hudi.Spark3RowSerDe
 import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.spark3.internal.ReflectUtil
-import org.apache.spark.sql.avro.{HoodieAvroDeserializerTrait, HoodieAvroSerializerTrait, Spark3HoodieAvroDeserializer, HoodieAvroSerializer}
+import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark3AvroDeserializer, HoodieSparkAvroSerializer}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.{Expression, Like}
@@ -43,11 +43,11 @@ import org.apache.spark.sql.{Row, SparkSession}
  */
 class Spark3Adapter extends SparkAdapter {
 
-  def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializerTrait =
-    new HoodieAvroSerializer(rootCatalystType, rootAvroType, nullable)
+  def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer =
+    new HoodieSparkAvroSerializer(rootCatalystType, rootAvroType, nullable)
 
-  def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializerTrait =
-    new Spark3HoodieAvroDeserializer(rootAvroType, rootCatalystType)
+  def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializer =
+    new HoodieSpark3AvroDeserializer(rootAvroType, rootCatalystType)
 
   override def createSparkRowSerDe(encoder: ExpressionEncoder[Row]): SparkRowSerDe = {
     new Spark3RowSerDe(encoder)
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/Spark3HoodieAvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3AvroDeserializer.scala
similarity index 89%
rename from hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/Spark3HoodieAvroDeserializer.scala
rename to hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3AvroDeserializer.scala
index fa03f5d..bd9ead5 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/Spark3HoodieAvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3AvroDeserializer.scala
@@ -21,8 +21,8 @@ import org.apache.avro.Schema
 import org.apache.hudi.HoodieSparkUtils
 import org.apache.spark.sql.types.DataType
 
-class Spark3HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType)
-  extends HoodieAvroDeserializerTrait {
+class HoodieSpark3AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType)
+  extends HoodieAvroDeserializer {
 
   // SPARK-34404: As of Spark3.2, there is no AvroDeserializer's constructor with Schema and DataType arguments.
   // So use the reflection to get AvroDeserializer instance.
@@ -34,5 +34,5 @@ class Spark3HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataT
     constructor.newInstance(rootAvroType, rootCatalystType)
   }
 
-  def doDeserialize(data: Any): Any = avroDeserializer.deserialize(data)
+  def deserialize(data: Any): Option[Any] = avroDeserializer.deserialize(data)
 }
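
NOTE: The SPARK-34404 remark above refers to picking a compatible `AvroDeserializer` constructor at runtime. A rough sketch of such reflection-based construction, under the assumption that a two-argument (Schema, DataType) constructor is available on the running Spark version (the actual implementation goes through `ReflectUtil` and may differ):

    // This only compiles from within the org.apache.spark.sql.avro package,
    // where the package-private AvroDeserializer is visible
    def newAvroDeserializer(avroType: Schema, catalystType: DataType): AvroDeserializer = {
      val constructor = classOf[AvroDeserializer].getConstructors
        .find(_.getParameterCount == 2)
        .getOrElse(throw new IllegalStateException("No 2-arg AvroDeserializer constructor found"))
      constructor.newInstance(avroType, catalystType).asInstanceOf[AvroDeserializer]
    }
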
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
index a57be62..9a62c14 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java
@@ -249,14 +249,17 @@ public class TestHDFSParquetImporter extends FunctionalTestHarness implements Se
     long startTime = HoodieActiveTimeline.parseDateFromInstantTime("20170203000000").getTime() / 1000;
     List<GenericRecord> records = new ArrayList<GenericRecord>();
     // 10 for update
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
     for (long recordNum = 0; recordNum < 11; recordNum++) {
-      records.add(new HoodieTestDataGenerator().generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum,
-          "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
+      records.add(
+          dataGen.generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum,
+              "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
     }
     // 4 for insert
     for (long recordNum = 96; recordNum < 100; recordNum++) {
-      records.add(new HoodieTestDataGenerator().generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum,
-          "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
+      records.add(
+          dataGen.generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum,
+              "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
     }
     try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
         .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
