vinothchandar commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r457864770



##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
##########
@@ -69,6 +71,17 @@ public static Schema readSchema(Configuration conf, Path 
filePath) {
     }
   }
 
+  /**
+   * get the max compaction memory in bytes from JobConf.
+   */
+  public static long getMaxCompactionMemoryInBytes(JobConf jobConf) {

Review comment:
       ah ok. its just relocated 

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
##########
@@ -147,12 +146,4 @@ public Schema getWriterSchema() {
   public Schema getHiveSchema() {
     return hiveSchema;
   }
-
-  public long getMaxCompactionMemoryInBytes() {

Review comment:
       why remove this? 

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
##########
@@ -40,6 +40,8 @@
 import java.io.IOException;
 import java.util.Map;
 
+import static 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes;

Review comment:
       seems good to avoid the static import here and have the reader realize 
the class its coming from ? 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, 
LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, 
HoodieRealtimeConfig}
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: 
HudiMergeOnReadFileSplit) extends Partition

Review comment:
       lets name the classes `HoodieMergeOn...` not `Hudi..` to be consistent 
with rest of the code. 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+                                    logPaths: Option[List[String]],
+                                    latestCommit: String,
+                                    tablePath: String,
+                                    maxCompactionMemoryInBytes: Long,
+                                    skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+                        val optParams: Map[String, String],
+                        val userSchema: StructType,
+                        val globPaths: Seq[Path],
+                        val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with TableScan with Logging{

Review comment:
       nit: space after Logging? `Logging {` ? 

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HadoopSerializableConfiguration.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.config;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+public class HadoopSerializableConfiguration implements Serializable {

Review comment:
       hows this different from the class we already have in `hudi-common` : 
`SerializableConfiguration` ? 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -110,6 +112,10 @@ object DataSourceReadOptions {
    */
   val INCR_PATH_GLOB_OPT_KEY = "hoodie.datasource.read.incr.path.glob"
   val DEFAULT_INCR_PATH_GLOB_OPT_VAL = ""
+
+
+  val REALTIME_SKIP_MERGE_KEY = REALTIME_SKIP_MERGE_PROP

Review comment:
       should we just make a new datasource level config for this. and 
translate. mixing raw InputFormat level configs here, feels a bit problematic 
in terms of long term maitenance> 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+                                    logPaths: Option[List[String]],
+                                    latestCommit: String,
+                                    tablePath: String,
+                                    maxCompactionMemoryInBytes: Long,
+                                    skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+                        val optParams: Map[String, String],
+                        val userSchema: StructType,
+                        val globPaths: Seq[Path],
+                        val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with TableScan with Logging{
+
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // use schema from latest metadata, if not present, read schema from the 
data file
+  private val latestSchema = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
+    AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+  }
+
+  private val skipMerge = optParams.getOrElse(
+    DataSourceReadOptions.REALTIME_SKIP_MERGE_KEY,
+    DataSourceReadOptions.DEFAULT_REALTIME_SKIP_MERGE_VAL).toBoolean
+  private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new 
JobConf(conf))
+  private val fileIndex = buildFileIndex()
+
+  override def schema: StructType = latestSchema
+
+  override def needConversion: Boolean = false
+
+  override def buildScan(): RDD[Row] = {
+    val parquetReaderFunction = new 
ParquetFileFormat().buildReaderWithPartitionValues(

Review comment:
       from the last PR, it seems the biggest change is calling 
`ParquetFileFormat()`. and the wrapping as opposed to inheriting the code. 
Would life be lot simpler if we still defined our own FileFormat and then 
wrapped `ParquetFileFormat` and the code you have to read logs as 
`Iterator<InternalRow>` ? Just a thought.  
https://github.com/apache/spark/blob/8c7d6f9733751503f80d5a1b2463904dfefd6843/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala#L105
   
   What you are doing is probably valid and makes it consistent with how 
@umehrot2 is also approaching it. if they are equivalent, then I am fine with 
it. 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##########
@@ -123,4 +120,25 @@ class DefaultSource extends RelationProvider
   }
 
   override def shortName(): String = "hudi"
+
+  private def getReadOptimizedView(sqlContext: SQLContext,

Review comment:
       rename: getBaseFileOnlyView() 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, 
LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, 
HoodieRealtimeConfig}
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: 
HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+                         broadcastedConf: 
Broadcast[HadoopSerializableConfiguration],
+                         dataReadFunction: PartitionedFile => Iterator[Any],
+                         dataSchema: StructType,
+                         hudiRealtimeFileSplits: 
List[HudiMergeOnReadFileSplit])
+  extends RDD[InternalRow](sc, Nil) {
+
+  // Broadcast the hadoop Configuration to executors.
+  def this(sc: SparkContext,
+           config: Configuration,
+           dataReadFunction: PartitionedFile => Iterator[Any],
+           dataSchema: StructType,
+           hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = {
+    this(
+      sc,
+      sc.broadcast(new HadoopSerializableConfiguration(config))
+      .asInstanceOf[Broadcast[HadoopSerializableConfiguration]],
+      dataReadFunction,
+      dataSchema,
+      hudiRealtimeFileSplits)
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+    val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition]
+    mergeParquetPartition.split match {
+      case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+        read(mergeParquetPartition.split.dataFile, dataReadFunction)
+      case unMergeSplit if unMergeSplit.skipMerge =>
+        unMergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case mergeSplit if !mergeSplit.skipMerge =>
+        mergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case _ => throw new HoodieException("Unable to select an Iterator to 
read the Hudi MOR File Split")
+    }
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+    hudiRealtimeFileSplits.zipWithIndex.map(file => 
HudiMergeOnReadPartition(file._2, file._1)).toArray
+  }
+
+  private def getConfig(): Configuration = {
+    broadcastedConf.value.config
+  }
+
+  private def read(partitionedFile: PartitionedFile,
+                   readFileFunction: PartitionedFile => Iterator[Any]): 
Iterator[InternalRow] = {
+    val fileIterator = readFileFunction(partitionedFile)
+    val rows = fileIterator.flatMap(_ match {
+      case r: InternalRow => Seq(r)
+      case b: ColumnarBatch => b.rowIterator().asScala
+    })
+    rows
+  }
+
+  private def unMergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                  readFileFunction: PartitionedFile => 
Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var hudiLogRecordsIterator = 
hudiLogRecords.keySet().iterator().asScala
+
+      override def hasNext: Boolean = {
+        dataFileIterator.hasNext || hudiLogRecordsIterator.hasNext
+      }
+
+      override def next(): InternalRow = {
+        if (dataFileIterator.hasNext) {
+          dataFileIterator.next()
+        } else {
+          val curAvrokey = hudiLogRecordsIterator.next()
+          val curAvroRecord = 
hudiLogRecords.get(curAvrokey).getData.getInsertValue(logSchema).get()
+          converter.deserialize(curAvroRecord).asInstanceOf[InternalRow]
+        }
+      }
+    }
+
+  private def mergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                readFileFunction: PartitionedFile => 
Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var parquetFinished = false
+      private var logRecordToRead = hudiLogRecords.keySet()
+      private var hudiLogRecordsIterator: Iterator[String] = _
+
+      override def hasNext: Boolean = {
+        if (dataFileIterator.hasNext) {
+          true
+        } else {
+          if (!parquetFinished) {
+            parquetFinished = true
+            hudiLogRecordsIterator = logRecordToRead.iterator().asScala
+          }
+          hudiLogRecordsIterator.hasNext
+        }
+      }
+
+      override def next(): InternalRow = {
+        if (dataFileIterator.hasNext) {
+          val curRow = dataFileIterator.next()
+          val curKey = curRow.getString(HOODIE_RECORD_KEY_COL_POS)
+          if (hudiLogRecords.keySet().contains(curKey)) {
+            logRecordToRead.remove(curKey)

Review comment:
       is the remove needed. this map is often spillable.. we should just make 
sure the remove does not incur additional I/O or soemethng 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, 
LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, 
HoodieRealtimeConfig}
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: 
HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+                         broadcastedConf: 
Broadcast[HadoopSerializableConfiguration],
+                         dataReadFunction: PartitionedFile => Iterator[Any],
+                         dataSchema: StructType,
+                         hudiRealtimeFileSplits: 
List[HudiMergeOnReadFileSplit])
+  extends RDD[InternalRow](sc, Nil) {
+
+  // Broadcast the hadoop Configuration to executors.
+  def this(sc: SparkContext,
+           config: Configuration,
+           dataReadFunction: PartitionedFile => Iterator[Any],
+           dataSchema: StructType,
+           hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = {
+    this(
+      sc,
+      sc.broadcast(new HadoopSerializableConfiguration(config))
+      .asInstanceOf[Broadcast[HadoopSerializableConfiguration]],
+      dataReadFunction,
+      dataSchema,
+      hudiRealtimeFileSplits)
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+    val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition]
+    mergeParquetPartition.split match {
+      case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+        read(mergeParquetPartition.split.dataFile, dataReadFunction)
+      case unMergeSplit if unMergeSplit.skipMerge =>
+        unMergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case mergeSplit if !mergeSplit.skipMerge =>
+        mergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case _ => throw new HoodieException("Unable to select an Iterator to 
read the Hudi MOR File Split")
+    }
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+    hudiRealtimeFileSplits.zipWithIndex.map(file => 
HudiMergeOnReadPartition(file._2, file._1)).toArray
+  }
+
+  private def getConfig(): Configuration = {
+    broadcastedConf.value.config
+  }
+
+  private def read(partitionedFile: PartitionedFile,
+                   readFileFunction: PartitionedFile => Iterator[Any]): 
Iterator[InternalRow] = {
+    val fileIterator = readFileFunction(partitionedFile)
+    val rows = fileIterator.flatMap(_ match {
+      case r: InternalRow => Seq(r)
+      case b: ColumnarBatch => b.rowIterator().asScala
+    })
+    rows
+  }
+
+  private def unMergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                  readFileFunction: PartitionedFile => 
Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var hudiLogRecordsIterator = 
hudiLogRecords.keySet().iterator().asScala
+
+      override def hasNext: Boolean = {
+        dataFileIterator.hasNext || hudiLogRecordsIterator.hasNext
+      }
+
+      override def next(): InternalRow = {
+        if (dataFileIterator.hasNext) {
+          dataFileIterator.next()
+        } else {
+          val curAvrokey = hudiLogRecordsIterator.next()
+          val curAvroRecord = 
hudiLogRecords.get(curAvrokey).getData.getInsertValue(logSchema).get()
+          converter.deserialize(curAvroRecord).asInstanceOf[InternalRow]
+        }
+      }
+    }
+
+  private def mergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                readFileFunction: PartitionedFile => 
Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var parquetFinished = false
+      private var logRecordToRead = hudiLogRecords.keySet()
+      private var hudiLogRecordsIterator: Iterator[String] = _
+
+      override def hasNext: Boolean = {
+        if (dataFileIterator.hasNext) {
+          true
+        } else {
+          if (!parquetFinished) {

Review comment:
       lets not leak parquet in the naming within this class. we can keep it 
generic as base vs log files

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, 
LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, 
HoodieRealtimeConfig}
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: 
HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+                         broadcastedConf: 
Broadcast[HadoopSerializableConfiguration],
+                         dataReadFunction: PartitionedFile => Iterator[Any],
+                         dataSchema: StructType,
+                         hudiRealtimeFileSplits: 
List[HudiMergeOnReadFileSplit])
+  extends RDD[InternalRow](sc, Nil) {
+
+  // Broadcast the hadoop Configuration to executors.
+  def this(sc: SparkContext,
+           config: Configuration,
+           dataReadFunction: PartitionedFile => Iterator[Any],
+           dataSchema: StructType,
+           hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = {
+    this(
+      sc,
+      sc.broadcast(new HadoopSerializableConfiguration(config))
+      .asInstanceOf[Broadcast[HadoopSerializableConfiguration]],
+      dataReadFunction,
+      dataSchema,
+      hudiRealtimeFileSplits)
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+    val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition]
+    mergeParquetPartition.split match {
+      case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+        read(mergeParquetPartition.split.dataFile, dataReadFunction)
+      case unMergeSplit if unMergeSplit.skipMerge =>
+        unMergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case mergeSplit if !mergeSplit.skipMerge =>
+        mergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case _ => throw new HoodieException("Unable to select an Iterator to 
read the Hudi MOR File Split")
+    }
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+    hudiRealtimeFileSplits.zipWithIndex.map(file => 
HudiMergeOnReadPartition(file._2, file._1)).toArray
+  }
+
+  private def getConfig(): Configuration = {
+    broadcastedConf.value.config
+  }
+
+  private def read(partitionedFile: PartitionedFile,
+                   readFileFunction: PartitionedFile => Iterator[Any]): 
Iterator[InternalRow] = {
+    val fileIterator = readFileFunction(partitionedFile)
+    val rows = fileIterator.flatMap(_ match {
+      case r: InternalRow => Seq(r)
+      case b: ColumnarBatch => b.rowIterator().asScala
+    })
+    rows
+  }
+
+  private def unMergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                  readFileFunction: PartitionedFile => 
Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var hudiLogRecordsIterator = 
hudiLogRecords.keySet().iterator().asScala
+
+      override def hasNext: Boolean = {
+        dataFileIterator.hasNext || hudiLogRecordsIterator.hasNext
+      }
+
+      override def next(): InternalRow = {
+        if (dataFileIterator.hasNext) {
+          dataFileIterator.next()
+        } else {
+          val curAvrokey = hudiLogRecordsIterator.next()
+          val curAvroRecord = 
hudiLogRecords.get(curAvrokey).getData.getInsertValue(logSchema).get()
+          converter.deserialize(curAvroRecord).asInstanceOf[InternalRow]
+        }
+      }
+    }
+
+  private def mergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                readFileFunction: PartitionedFile => 
Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var parquetFinished = false
+      private var logRecordToRead = hudiLogRecords.keySet()
+      private var hudiLogRecordsIterator: Iterator[String] = _
+
+      override def hasNext: Boolean = {
+        if (dataFileIterator.hasNext) {
+          true
+        } else {
+          if (!parquetFinished) {
+            parquetFinished = true
+            hudiLogRecordsIterator = logRecordToRead.iterator().asScala
+          }
+          hudiLogRecordsIterator.hasNext
+        }
+      }
+
+      override def next(): InternalRow = {
+        if (dataFileIterator.hasNext) {
+          val curRow = dataFileIterator.next()
+          val curKey = curRow.getString(HOODIE_RECORD_KEY_COL_POS)
+          if (hudiLogRecords.keySet().contains(curKey)) {
+            logRecordToRead.remove(curKey)
+            mergeRowWithLog(curRow)
+          } else {
+            curRow
+          }
+        } else {
+          val curKey = hudiLogRecordsIterator.next()

Review comment:
       same comment here about double checking if this is actually ok 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+                                    logPaths: Option[List[String]],
+                                    latestCommit: String,
+                                    tablePath: String,
+                                    maxCompactionMemoryInBytes: Long,
+                                    skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+                        val optParams: Map[String, String],
+                        val userSchema: StructType,
+                        val globPaths: Seq[Path],
+                        val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with TableScan with Logging{
+
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // use schema from latest metadata, if not present, read schema from the 
data file
+  private val latestSchema = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
+    AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+  }
+
+  private val skipMerge = optParams.getOrElse(
+    DataSourceReadOptions.REALTIME_SKIP_MERGE_KEY,
+    DataSourceReadOptions.DEFAULT_REALTIME_SKIP_MERGE_VAL).toBoolean
+  private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new 
JobConf(conf))
+  private val fileIndex = buildFileIndex()
+
+  override def schema: StructType = latestSchema
+
+  override def needConversion: Boolean = false
+
+  override def buildScan(): RDD[Row] = {
+    val parquetReaderFunction = new 
ParquetFileFormat().buildReaderWithPartitionValues(
+      sparkSession = sqlContext.sparkSession,
+      dataSchema = latestSchema,
+      partitionSchema = StructType(Nil),
+      requiredSchema = latestSchema,

Review comment:
       would n't this read all the fields out? should we not pass `userSchema` 
here? 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+                                    logPaths: Option[List[String]],
+                                    latestCommit: String,
+                                    tablePath: String,
+                                    maxCompactionMemoryInBytes: Long,
+                                    skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+                        val optParams: Map[String, String],
+                        val userSchema: StructType,
+                        val globPaths: Seq[Path],
+                        val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with TableScan with Logging{

Review comment:
       can we file a JIRA for this. and is it possible to target this for 0.6.0 
as well? 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, 
LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, 
HoodieRealtimeConfig}
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: 
HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+                         broadcastedConf: 
Broadcast[HadoopSerializableConfiguration],
+                         dataReadFunction: PartitionedFile => Iterator[Any],
+                         dataSchema: StructType,
+                         hudiRealtimeFileSplits: 
List[HudiMergeOnReadFileSplit])
+  extends RDD[InternalRow](sc, Nil) {
+
+  // Broadcast the hadoop Configuration to executors.
+  def this(sc: SparkContext,
+           config: Configuration,
+           dataReadFunction: PartitionedFile => Iterator[Any],
+           dataSchema: StructType,
+           hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = {
+    this(
+      sc,
+      sc.broadcast(new HadoopSerializableConfiguration(config))
+      .asInstanceOf[Broadcast[HadoopSerializableConfiguration]],
+      dataReadFunction,
+      dataSchema,
+      hudiRealtimeFileSplits)
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+    val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition]
+    mergeParquetPartition.split match {
+      case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+        read(mergeParquetPartition.split.dataFile, dataReadFunction)
+      case unMergeSplit if unMergeSplit.skipMerge =>
+        unMergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case mergeSplit if !mergeSplit.skipMerge =>
+        mergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case _ => throw new HoodieException("Unable to select an Iterator to 
read the Hudi MOR File Split")
+    }
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+    hudiRealtimeFileSplits.zipWithIndex.map(file => 
HudiMergeOnReadPartition(file._2, file._1)).toArray
+  }
+
+  private def getConfig(): Configuration = {
+    broadcastedConf.value.config
+  }
+
+  private def read(partitionedFile: PartitionedFile,
+                   readFileFunction: PartitionedFile => Iterator[Any]): 
Iterator[InternalRow] = {
+    val fileIterator = readFileFunction(partitionedFile)
+    val rows = fileIterator.flatMap(_ match {
+      case r: InternalRow => Seq(r)
+      case b: ColumnarBatch => b.rowIterator().asScala
+    })
+    rows
+  }
+
+  private def unMergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                  readFileFunction: PartitionedFile => 
Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var hudiLogRecordsIterator = 
hudiLogRecords.keySet().iterator().asScala
+
+      override def hasNext: Boolean = {
+        dataFileIterator.hasNext || hudiLogRecordsIterator.hasNext
+      }
+
+      override def next(): InternalRow = {
+        if (dataFileIterator.hasNext) {
+          dataFileIterator.next()
+        } else {
+          val curAvrokey = hudiLogRecordsIterator.next()
+          val curAvroRecord = 
hudiLogRecords.get(curAvrokey).getData.getInsertValue(logSchema).get()
+          converter.deserialize(curAvroRecord).asInstanceOf[InternalRow]
+        }
+      }
+    }
+
+  private def mergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                readFileFunction: PartitionedFile => 
Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var parquetFinished = false
+      private var logRecordToRead = hudiLogRecords.keySet()
+      private var hudiLogRecordsIterator: Iterator[String] = _
+
+      override def hasNext: Boolean = {
+        if (dataFileIterator.hasNext) {
+          true
+        } else {
+          if (!parquetFinished) {
+            parquetFinished = true
+            hudiLogRecordsIterator = logRecordToRead.iterator().asScala
+          }
+          hudiLogRecordsIterator.hasNext
+        }
+      }
+
+      override def next(): InternalRow = {
+        if (dataFileIterator.hasNext) {
+          val curRow = dataFileIterator.next()
+          val curKey = curRow.getString(HOODIE_RECORD_KEY_COL_POS)
+          if (hudiLogRecords.keySet().contains(curKey)) {

Review comment:
       why not `containsKey`. `keySet()` may create a copy? 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, 
LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, 
HoodieRealtimeConfig}
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: 
HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+                         broadcastedConf: 
Broadcast[HadoopSerializableConfiguration],
+                         dataReadFunction: PartitionedFile => Iterator[Any],
+                         dataSchema: StructType,
+                         hudiRealtimeFileSplits: 
List[HudiMergeOnReadFileSplit])
+  extends RDD[InternalRow](sc, Nil) {
+
+  // Broadcast the hadoop Configuration to executors.
+  def this(sc: SparkContext,
+           config: Configuration,
+           dataReadFunction: PartitionedFile => Iterator[Any],
+           dataSchema: StructType,
+           hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = {
+    this(
+      sc,
+      sc.broadcast(new HadoopSerializableConfiguration(config))
+      .asInstanceOf[Broadcast[HadoopSerializableConfiguration]],
+      dataReadFunction,
+      dataSchema,
+      hudiRealtimeFileSplits)
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+    val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition]
+    mergeParquetPartition.split match {
+      case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+        read(mergeParquetPartition.split.dataFile, dataReadFunction)
+      case unMergeSplit if unMergeSplit.skipMerge =>
+        unMergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case mergeSplit if !mergeSplit.skipMerge =>
+        mergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case _ => throw new HoodieException("Unable to select an Iterator to 
read the Hudi MOR File Split")
+    }
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+    hudiRealtimeFileSplits.zipWithIndex.map(file => 
HudiMergeOnReadPartition(file._2, file._1)).toArray
+  }
+
+  private def getConfig(): Configuration = {
+    broadcastedConf.value.config
+  }
+
+  private def read(partitionedFile: PartitionedFile,
+                   readFileFunction: PartitionedFile => Iterator[Any]): 
Iterator[InternalRow] = {
+    val fileIterator = readFileFunction(partitionedFile)
+    val rows = fileIterator.flatMap(_ match {
+      case r: InternalRow => Seq(r)
+      case b: ColumnarBatch => b.rowIterator().asScala
+    })
+    rows
+  }
+
+  private def unMergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                  readFileFunction: PartitionedFile => 
Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var hudiLogRecordsIterator = 
hudiLogRecords.keySet().iterator().asScala
+
+      override def hasNext: Boolean = {
+        dataFileIterator.hasNext || hudiLogRecordsIterator.hasNext
+      }
+
+      override def next(): InternalRow = {
+        if (dataFileIterator.hasNext) {
+          dataFileIterator.next()
+        } else {
+          val curAvrokey = hudiLogRecordsIterator.next()
+          val curAvroRecord = 
hudiLogRecords.get(curAvrokey).getData.getInsertValue(logSchema).get()
+          converter.deserialize(curAvroRecord).asInstanceOf[InternalRow]
+        }
+      }
+    }
+
+  private def mergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                readFileFunction: PartitionedFile => 
Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var parquetFinished = false
+      private var logRecordToRead = hudiLogRecords.keySet()
+      private var hudiLogRecordsIterator: Iterator[String] = _
+
+      override def hasNext: Boolean = {
+        if (dataFileIterator.hasNext) {
+          true
+        } else {
+          if (!parquetFinished) {
+            parquetFinished = true
+            hudiLogRecordsIterator = logRecordToRead.iterator().asScala
+          }
+          hudiLogRecordsIterator.hasNext

Review comment:
       this seems to indicate that we will keep scanning the remaining entries 
in the log and hand them out if `dataFileIterator` runs out. We need to be 
careful about how it interplays with split generation. specifically, only works 
if the each base file has only 1 split.. 
   ```
   HudiMergeOnReadFileSplit(partitionedFile, logPaths, latestCommit,
            metaClient.getBasePath, maxCompactionMemoryInBytes, skipMerge)
   ```
   
   here `partitionedFile` has to be a single file and not an input Split. 
otherwise we will face an issue that the log entry belongs to a different split 
and the ultimate query will have duplicates. 
   

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, 
LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, 
HoodieRealtimeConfig}
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: 
HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+                         broadcastedConf: 
Broadcast[HadoopSerializableConfiguration],
+                         dataReadFunction: PartitionedFile => Iterator[Any],

Review comment:
       rename `baseFileReadFunction` 

##########
File path: 
hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
##########
@@ -67,7 +68,7 @@ class TestDataSource {
     // Insert Operation
     val records = 
DataSourceTestUtils.convertToStringList(dataGen.generateInserts("000", 
100)).toList
     val inputDF: Dataset[Row] = 
spark.read.json(spark.sparkContext.parallelize(records, 2))
-    inputDF.write.format("hudi")
+    inputDF.write.format("org.apache.hudi")

Review comment:
       why change this? 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+                                    logPaths: Option[List[String]],
+                                    latestCommit: String,
+                                    tablePath: String,
+                                    maxCompactionMemoryInBytes: Long,
+                                    skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+                        val optParams: Map[String, String],
+                        val userSchema: StructType,
+                        val globPaths: Seq[Path],
+                        val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with TableScan with Logging{
+
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // use schema from latest metadata, if not present, read schema from the 
data file
+  private val latestSchema = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
+    AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+  }
+
+  private val skipMerge = optParams.getOrElse(
+    DataSourceReadOptions.REALTIME_SKIP_MERGE_KEY,
+    DataSourceReadOptions.DEFAULT_REALTIME_SKIP_MERGE_VAL).toBoolean
+  private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new 
JobConf(conf))
+  private val fileIndex = buildFileIndex()
+
+  override def schema: StructType = latestSchema
+
+  override def needConversion: Boolean = false
+
+  override def buildScan(): RDD[Row] = {
+    val parquetReaderFunction = new 
ParquetFileFormat().buildReaderWithPartitionValues(
+      sparkSession = sqlContext.sparkSession,
+      dataSchema = latestSchema,
+      partitionSchema = StructType(Nil),
+      requiredSchema = latestSchema,
+      filters = Seq.empty,

Review comment:
       if we did `PrunedFilteredScan` we can also pass in teh filters? in any 
case, can we pass in the the options? 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, 
LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, 
HoodieRealtimeConfig}
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: 
HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+                         broadcastedConf: 
Broadcast[HadoopSerializableConfiguration],
+                         dataReadFunction: PartitionedFile => Iterator[Any],
+                         dataSchema: StructType,
+                         hudiRealtimeFileSplits: 
List[HudiMergeOnReadFileSplit])

Review comment:
       hudi vs hoodie 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+                                    logPaths: Option[List[String]],
+                                    latestCommit: String,
+                                    tablePath: String,
+                                    maxCompactionMemoryInBytes: Long,
+                                    skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+                        val optParams: Map[String, String],
+                        val userSchema: StructType,
+                        val globPaths: Seq[Path],
+                        val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with TableScan with Logging{
+
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // use schema from latest metadata, if not present, read schema from the 
data file
+  private val latestSchema = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
+    AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+  }
+
+  private val skipMerge = optParams.getOrElse(
+    DataSourceReadOptions.REALTIME_SKIP_MERGE_KEY,
+    DataSourceReadOptions.DEFAULT_REALTIME_SKIP_MERGE_VAL).toBoolean
+  private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new 
JobConf(conf))
+  private val fileIndex = buildFileIndex()
+
+  override def schema: StructType = latestSchema
+
+  override def needConversion: Boolean = false
+
+  override def buildScan(): RDD[Row] = {
+    val parquetReaderFunction = new 
ParquetFileFormat().buildReaderWithPartitionValues(
+      sparkSession = sqlContext.sparkSession,
+      dataSchema = latestSchema,
+      partitionSchema = StructType(Nil),
+      requiredSchema = latestSchema,
+      filters = Seq.empty,
+      options = Map.empty,
+      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+    )
+    val rdd = new HudiMergeOnReadRDD(sqlContext.sparkContext,
+      sqlContext.sparkSession.sessionState.newHadoopConf(),
+      parquetReaderFunction, latestSchema, fileIndex)
+    rdd.asInstanceOf[RDD[Row]]
+  }
+
+  def buildFileIndex(): List[HudiMergeOnReadFileSplit] = {
+    val inMemoryFileIndex = 
HudiSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths)
+    val fileStatuses = inMemoryFileIndex.allFiles()
+    if (fileStatuses.isEmpty) {
+      throw new HoodieException("No files found for reading in user provided 
path.")
+    }
+
+    val fsView = new HoodieTableFileSystemView(metaClient,
+      metaClient.getActiveTimeline.getCommitsTimeline
+        .filterCompletedInstants, fileStatuses.toArray)
+    val latestFiles: List[HoodieBaseFile] = 
fsView.getLatestBaseFiles.iterator().asScala.toList
+    val latestCommit = fsView.getLastInstant.get().getTimestamp
+    val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, 
latestFiles.asJava).asScala
+    val FileSplits = fileGroup.map(kv => {

Review comment:
       rename variable  using camel case? 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, 
LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, 
HoodieRealtimeConfig}
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: 
HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+                         broadcastedConf: 
Broadcast[HadoopSerializableConfiguration],
+                         dataReadFunction: PartitionedFile => Iterator[Any],
+                         dataSchema: StructType,
+                         hudiRealtimeFileSplits: 
List[HudiMergeOnReadFileSplit])
+  extends RDD[InternalRow](sc, Nil) {
+
+  // Broadcast the hadoop Configuration to executors.
+  def this(sc: SparkContext,
+           config: Configuration,
+           dataReadFunction: PartitionedFile => Iterator[Any],
+           dataSchema: StructType,
+           hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = {
+    this(
+      sc,
+      sc.broadcast(new HadoopSerializableConfiguration(config))
+      .asInstanceOf[Broadcast[HadoopSerializableConfiguration]],
+      dataReadFunction,
+      dataSchema,
+      hudiRealtimeFileSplits)
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+    val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition]
+    mergeParquetPartition.split match {
+      case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+        read(mergeParquetPartition.split.dataFile, dataReadFunction)
+      case unMergeSplit if unMergeSplit.skipMerge =>
+        unMergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case mergeSplit if !mergeSplit.skipMerge =>
+        mergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case _ => throw new HoodieException("Unable to select an Iterator to 
read the Hudi MOR File Split")

Review comment:
       print the `mergeParquetPartition` split variable in the error message? 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, 
LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, 
HoodieRealtimeConfig}
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: 
HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+                         broadcastedConf: 
Broadcast[HadoopSerializableConfiguration],
+                         dataReadFunction: PartitionedFile => Iterator[Any],
+                         dataSchema: StructType,
+                         hudiRealtimeFileSplits: 
List[HudiMergeOnReadFileSplit])
+  extends RDD[InternalRow](sc, Nil) {
+
+  // Broadcast the hadoop Configuration to executors.
+  def this(sc: SparkContext,
+           config: Configuration,
+           dataReadFunction: PartitionedFile => Iterator[Any],
+           dataSchema: StructType,
+           hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = {
+    this(
+      sc,
+      sc.broadcast(new HadoopSerializableConfiguration(config))
+      .asInstanceOf[Broadcast[HadoopSerializableConfiguration]],
+      dataReadFunction,
+      dataSchema,
+      hudiRealtimeFileSplits)
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+    val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition]
+    mergeParquetPartition.split match {
+      case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+        read(mergeParquetPartition.split.dataFile, dataReadFunction)
+      case unMergeSplit if unMergeSplit.skipMerge =>
+        unMergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case mergeSplit if !mergeSplit.skipMerge =>
+        mergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case _ => throw new HoodieException("Unable to select an Iterator to 
read the Hudi MOR File Split")
+    }
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+    hudiRealtimeFileSplits.zipWithIndex.map(file => 
HudiMergeOnReadPartition(file._2, file._1)).toArray
+  }
+
+  private def getConfig(): Configuration = {
+    broadcastedConf.value.config
+  }
+
+  private def read(partitionedFile: PartitionedFile,
+                   readFileFunction: PartitionedFile => Iterator[Any]): 
Iterator[InternalRow] = {
+    val fileIterator = readFileFunction(partitionedFile)
+    val rows = fileIterator.flatMap(_ match {
+      case r: InternalRow => Seq(r)
+      case b: ColumnarBatch => b.rowIterator().asScala
+    })
+    rows
+  }
+
+  private def unMergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                  readFileFunction: PartitionedFile => 
Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var hudiLogRecordsIterator = 
hudiLogRecords.keySet().iterator().asScala
+
+      override def hasNext: Boolean = {
+        dataFileIterator.hasNext || hudiLogRecordsIterator.hasNext
+      }
+
+      override def next(): InternalRow = {
+        if (dataFileIterator.hasNext) {
+          dataFileIterator.next()
+        } else {
+          val curAvrokey = hudiLogRecordsIterator.next()
+          val curAvroRecord = 
hudiLogRecords.get(curAvrokey).getData.getInsertValue(logSchema).get()
+          converter.deserialize(curAvroRecord).asInstanceOf[InternalRow]
+        }
+      }
+    }
+
+  private def mergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                readFileFunction: PartitionedFile => 
Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var parquetFinished = false
+      private var logRecordToRead = hudiLogRecords.keySet()
+      private var hudiLogRecordsIterator: Iterator[String] = _
+
+      override def hasNext: Boolean = {
+        if (dataFileIterator.hasNext) {
+          true
+        } else {
+          if (!parquetFinished) {
+            parquetFinished = true
+            hudiLogRecordsIterator = logRecordToRead.iterator().asScala
+          }
+          hudiLogRecordsIterator.hasNext
+        }
+      }
+
+      override def next(): InternalRow = {
+        if (dataFileIterator.hasNext) {
+          val curRow = dataFileIterator.next()
+          val curKey = curRow.getString(HOODIE_RECORD_KEY_COL_POS)
+          if (hudiLogRecords.keySet().contains(curKey)) {
+            logRecordToRead.remove(curKey)
+            mergeRowWithLog(curRow)
+          } else {
+            curRow
+          }
+        } else {
+          val curKey = hudiLogRecordsIterator.next()
+          getAvroRecord(curKey)
+        }
+      }
+
+      private def getAvroRecord(curKey: String): InternalRow = {
+        val curAvroRecord = 
hudiLogRecords.get(curKey).getData.getInsertValue(logSchema).get()

Review comment:
       this implicitly assumes `OvewriteWithLatestPayload` ? can we just 
convert the parquet row as well to Avro and then perform the merge actually 
calling the right API. `HoodieRecordPayload#combineAndGetUpdateValue()` ?  This 
is a correctness issue we need to resolve in the PR.. 
   ideally, adding a test case as well to go along with this would be good 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to