vinothchandar commented on a change in pull request #1722:
URL: https://github.com/apache/hudi/pull/1722#discussion_r444220031



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##########
@@ -57,8 +57,7 @@ class DefaultSource extends RelationProvider
     if (path.isEmpty) {
       throw new HoodieException("'path' must be specified.")
     }
-
-    if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
+        if 
(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) {
       // this is just effectively RO view only, where `path` can contain a mix 
of

Review comment:
       Nit: indentation
   
   This changes behavior for cow/snapshot. I feel we should have two separate 
methods - `createCopyOnWriteRelation` (no change in behavior) and 
`createMergeOnReadRelation` (changes for this PR)
   

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -65,7 +66,7 @@ object DataSourceReadOptions {
     * This eases migration from old configs to new configs.
     */
   def translateViewTypesToQueryTypes(optParams: Map[String, String]) : 
Map[String, String] = {
-    val translation = Map(VIEW_TYPE_READ_OPTIMIZED_OPT_VAL -> 
QUERY_TYPE_SNAPSHOT_OPT_VAL,
+    val translation = Map(VIEW_TYPE_READ_OPTIMIZED_OPT_VAL -> 
QUERY_TYPE_READ_OPTIMIZED_OPT_VAL,

Review comment:
       I don’t think this change is necessary, right? The RO view does map to 
the snapshot query for cow. We may need to have two separate maps for cow and mor

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.{HoodieParquetInputFormat, 
HoodieROTablePathFilter}
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.table.HoodieTable
+
+import org.apache.hadoop.mapred.JobConf
+import org.apache.log4j.LogManager
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import java.util
+import scala.collection.JavaConverters._
+
+/**
+ * This is the Spark DataSourceV1 relation to read Hudi MOR table.
+ * @param sqlContext
+ * @param basePath
+ * @param optParams
+ * @param userSchema
+ */
+class SnapshotRelation(val sqlContext: SQLContext,
+                       val basePath: String,
+                       val optParams: Map[String, String],
+                       val userSchema: StructType) extends BaseRelation with 
TableScan {
+
+  private val log = LogManager.getLogger(classOf[SnapshotRelation])
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // Set config for listStatus() in HoodieParquetInputFormat
+  // TODO(garyli): Switch to bootstrap file listing methods
+  conf.setClass(
+    "mapreduce.input.pathFilter.class",
+    classOf[HoodieROTablePathFilter],
+    classOf[org.apache.hadoop.fs.PathFilter])
+  conf.setStrings("mapreduce.input.fileinputformat.inputdir", basePath)
+  conf.setStrings("mapreduce.input.fileinputformat.input.dir.recursive", 
"true")
+
+  private val HoodieInputFormat = new HoodieParquetInputFormat
+  HoodieInputFormat.setConf(conf)
+  private val fileStatus = HoodieInputFormat.listStatus(new JobConf(conf))
+  private val fileGroup = 
HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, 
util.Arrays.stream(fileStatus)).asScala
+
+  // Split the file group to: parquet file without a matching log file, 
parquet file need to merge with log files
+  private val parquetWithoutLogPaths: List[String] = fileGroup.filter(p => 
p._2.size() == 0).keys.toList
+  private val fileWithLogMap: Map[String, String] = fileGroup.filter(p => 
p._2.size() > 0).map{ case(k, v) => (k, v.asScala.toList.mkString(","))}.toMap

Review comment:
       Nit: keep the naming consistent w.r.t. L69

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.{HoodieParquetInputFormat, 
HoodieROTablePathFilter}
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.table.HoodieTable
+
+import org.apache.hadoop.mapred.JobConf
+import org.apache.log4j.LogManager
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import java.util
+import scala.collection.JavaConverters._
+
+/**
+ * This is the Spark DataSourceV1 relation to read Hudi MOR table.
+ * @param sqlContext
+ * @param basePath
+ * @param optParams
+ * @param userSchema
+ */
+class SnapshotRelation(val sqlContext: SQLContext,
+                       val basePath: String,
+                       val optParams: Map[String, String],
+                       val userSchema: StructType) extends BaseRelation with 
TableScan {
+
+  private val log = LogManager.getLogger(classOf[SnapshotRelation])
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // Set config for listStatus() in HoodieParquetInputFormat
+  // TODO(garyli): Switch to bootstrap file listing methods
+  conf.setClass(
+    "mapreduce.input.pathFilter.class",
+    classOf[HoodieROTablePathFilter],
+    classOf[org.apache.hadoop.fs.PathFilter])
+  conf.setStrings("mapreduce.input.fileinputformat.inputdir", basePath)
+  conf.setStrings("mapreduce.input.fileinputformat.input.dir.recursive", 
"true")
+
+  private val HoodieInputFormat = new HoodieParquetInputFormat
+  HoodieInputFormat.setConf(conf)
+  private val fileStatus = HoodieInputFormat.listStatus(new JobConf(conf))
+  private val fileGroup = 
HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, 
util.Arrays.stream(fileStatus)).asScala
+
+  // Split the file group to: parquet file without a matching log file, 
parquet file need to merge with log files
+  private val parquetWithoutLogPaths: List[String] = fileGroup.filter(p => 
p._2.size() == 0).keys.toList
+  private val fileWithLogMap: Map[String, String] = fileGroup.filter(p => 
p._2.size() > 0).map{ case(k, v) => (k, v.asScala.toList.mkString(","))}.toMap
+
+  if (log.isDebugEnabled) {
+    log.debug("All parquet files" + fileStatus.map(s => 
s.getPath.toString).mkString(","))
+    log.debug("ParquetWithoutLogPaths" + parquetWithoutLogPaths.mkString(","))

Review comment:
       Can we have the variable names and comments talk more generically about 
base and log files?

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -48,7 +49,7 @@ object DataSourceReadOptions {
   val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot"
   val QUERY_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized"
   val QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental"
-  val DEFAULT_QUERY_TYPE_OPT_VAL: String = QUERY_TYPE_SNAPSHOT_OPT_VAL
+  val DEFAULT_QUERY_TYPE_OPT_VAL: String = QUERY_TYPE_READ_OPTIMIZED_OPT_VAL

Review comment:
       For cow, snapshot is the default anyway.
   Why would globbing like that error out? 

##########
File path: 
hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetRecordReaderIterator.scala
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, 
LogReaderUtils}
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit
+import org.apache.parquet.hadoop.ParquetRecordReader
+
+import org.apache.avro.Schema
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.types.StructType
+import java.io.Closeable
+import java.util
+
+import scala.util.Try
+
+/**
+ * This class is the iterator for Hudi MOR table.
+ * Log files are scanned on initialization.
+ * This iterator will read the parquet file first and skip the record if it 
present in the log file.
+ * Then read the log file.
+ * Custom payload is not supported yet. This combining logic is matching with 
[OverwriteWithLatestAvroPayload]
+ * @param rowReader HoodieRealtimeParquetRecordReader
+ */
+class HoodieParquetRecordReaderIterator(private[this] var rowReader: 
ParquetRecordReader[UnsafeRow],
+                                        private[this] val split: 
HoodieRealtimeFileSplit,
+                                        private[this] val jobConf: JobConf) 
extends Iterator[UnsafeRow] with Closeable with Logging {
+  private[this] var havePair = false
+  private[this] var finished = false
+  private[this] var parquetFinished = false
+
+  private[this] var deltaRecordMap: 
util.Map[String,org.apache.hudi.common.model
+  .HoodieRecord[_ <: org.apache.hudi.common.model.HoodieRecordPayload[_ <: 
org.apache.hudi.common.model.HoodieRecordPayload[_ <: AnyRef]]]] = _

Review comment:
       Why the fully qualified class names?

##########
File path: 
hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieRealtimeInputFormat.scala
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.{FileSplit, JobConf}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import 
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, 
ParquetRecordReader}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+import java.net.URI
+import scala.collection.JavaConverters._
+
+/**
+ * This class is an extension of ParquetFileFormat from Spark SQL.
+ * The file split, record reader, record reader iterator are customized to 
read Hudi MOR table.
+ */
+class HoodieRealtimeInputFormat extends ParquetFileFormat {

Review comment:
       @garyli1019 true.. but if the logs are small w.r.t parquet base, this 
might be okay actually.. For workloads with high write amplification, people 
tend to have much smaller file sizes anyway.. We should support the row group 
level parallel splitting for sure, otherwise, performance would be an issue.. 
btw Hive achieves this today.. We wrap each file split into a realtime file 
split.. 

##########
File path: 
hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieRealtimeInputFormat.scala
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.{FileSplit, JobConf}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import 
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, 
ParquetRecordReader}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+import java.net.URI
+import scala.collection.JavaConverters._
+
+/**
+ * This class is an extension of ParquetFileFormat from Spark SQL.
+ * The file split, record reader, record reader iterator are customized to 
read Hudi MOR table.
+ */
+class HoodieRealtimeInputFormat extends ParquetFileFormat {
+  //TODO: Better usage of this short name.
+  override def shortName(): String = "hudi.snapshot"

Review comment:
       Let's get rid of this for now? We don't really plan to expose this to 
the users, right?

##########
File path: 
hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieRealtimeInputFormat.scala
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.{FileSplit, JobConf}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import 
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, 
ParquetRecordReader}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+import java.net.URI
+import scala.collection.JavaConverters._
+
+/**
+ * This class is an extension of ParquetFileFormat from Spark SQL.
+ * The file split, record reader, record reader iterator are customized to 
read Hudi MOR table.
+ */
+class HoodieRealtimeInputFormat extends ParquetFileFormat {
+  //TODO: Better usage of this short name.
+  override def shortName(): String = "hudi.snapshot"
+  override def toString(): String = "hudi.snapshot"
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                              dataSchema: StructType,
+                                              partitionSchema: StructType,
+                                              requiredSchema: StructType,
+                                              filters: Seq[Filter], options: 
Map[String, String],
+                                              hadoopConf: Configuration): 
(PartitionedFile) =>
+    Iterator[InternalRow] = {
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[ParquetReadSupport].getName)
+    hadoopConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, 
requiredSchema.json)
+    hadoopConf.set(ParquetWriteSupport.SPARK_ROW_SCHEMA, requiredSchema.json)
+    hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key,
+      sparkSession.sessionState.conf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key,
+      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+
+    ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
+
+    // Sets flags for `ParquetToSparkSchemaConverter`
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_BINARY_AS_STRING.key,
+      sparkSession.sessionState.conf.isParquetBinaryAsString)
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
+
+    // TODO: if you move this into the closure it reverts to the default 
values.
+    // If true, enable using the custom RecordReader for parquet. This only 
works for
+    // a subset of the types (no complex types).
+    //val resultSchema = StructType(partitionSchema.fields ++ 
requiredSchema.fields)
+    val sqlConf = sparkSession.sessionState.conf
+    val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
+    val timestampConversion: Boolean = 
sqlConf.isParquetINT96TimestampConversion
+    val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
+    // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+    //val returningBatch = supportBatch(sparkSession, resultSchema)
+    val pushDownDate = sqlConf.parquetFilterPushDownDate
+    val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
+    val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
+    val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownInFilterThreshold = 
sqlConf.parquetFilterPushDownInFilterThreshold
+    val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+
+    (file: PartitionedFile) => {
+      assert(file.partitionValues.numFields == partitionSchema.size)
+
+      val sharedConf = broadcastedHadoopConf.value.value
+      val fileSplit =
+        new FileSplit(new Path(new URI(file.filePath)), file.start, 
file.length, new Array[String](0))

Review comment:
       this is a single parquet file?

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.{HoodieParquetInputFormat, 
HoodieROTablePathFilter}
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.table.HoodieTable
+
+import org.apache.hadoop.mapred.JobConf
+import org.apache.log4j.LogManager
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import java.util
+import scala.collection.JavaConverters._
+
+/**
+ * This is the Spark DataSourceV1 relation to read Hudi MOR table.

Review comment:
       Actually, this is what we use for both cow and mor — see the comment 
above. It would be best to keep the behavior for cow the same.

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.{HoodieParquetInputFormat, 
HoodieROTablePathFilter}
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.table.HoodieTable
+
+import org.apache.hadoop.mapred.JobConf
+import org.apache.log4j.LogManager
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import java.util
+import scala.collection.JavaConverters._
+
+/**
+ * This is the Spark DataSourceV1 relation to read Hudi MOR table.
+ * @param sqlContext
+ * @param basePath
+ * @param optParams
+ * @param userSchema
+ */
+class SnapshotRelation(val sqlContext: SQLContext,
+                       val basePath: String,
+                       val optParams: Map[String, String],
+                       val userSchema: StructType) extends BaseRelation with 
TableScan {
+
+  private val log = LogManager.getLogger(classOf[SnapshotRelation])
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // Set config for listStatus() in HoodieParquetInputFormat
+  // TODO(garyli): Switch to bootstrap file listing methods

Review comment:
       Assuming this PR will land first, let’s remove the TODO and file a 
follow-on JIRA?

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.{HoodieParquetInputFormat, 
HoodieROTablePathFilter}
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.table.HoodieTable
+
+import org.apache.hadoop.mapred.JobConf
+import org.apache.log4j.LogManager
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import java.util
+import scala.collection.JavaConverters._
+
+/**
+ * This is the Spark DataSourceV1 relation to read Hudi MOR table.
+ * @param sqlContext
+ * @param basePath
+ * @param optParams
+ * @param userSchema
+ */
+class SnapshotRelation(val sqlContext: SQLContext,
+                       val basePath: String,
+                       val optParams: Map[String, String],
+                       val userSchema: StructType) extends BaseRelation with 
TableScan {
+
+  private val log = LogManager.getLogger(classOf[SnapshotRelation])
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // Set config for listStatus() in HoodieParquetInputFormat
+  // TODO(garyli): Switch to bootstrap file listing methods
+  conf.setClass(
+    "mapreduce.input.pathFilter.class",
+    classOf[HoodieROTablePathFilter],
+    classOf[org.apache.hadoop.fs.PathFilter])
+  conf.setStrings("mapreduce.input.fileinputformat.inputdir", basePath)
+  conf.setStrings("mapreduce.input.fileinputformat.input.dir.recursive", 
"true")
+
+  private val HoodieInputFormat = new HoodieParquetInputFormat
+  HoodieInputFormat.setConf(conf)
+  private val fileStatus = HoodieInputFormat.listStatus(new JobConf(conf))
+  private val fileGroup = 
HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, 
util.Arrays.stream(fileStatus)).asScala

Review comment:
       Rename: fileGroups

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.{HoodieParquetInputFormat, 
HoodieROTablePathFilter}
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.table.HoodieTable
+
+import org.apache.hadoop.mapred.JobConf
+import org.apache.log4j.LogManager
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import java.util
+import scala.collection.JavaConverters._
+
+/**
+ * This is the Spark DataSourceV1 relation to read Hudi MOR table.
+ * @param sqlContext
+ * @param basePath
+ * @param optParams
+ * @param userSchema
+ */
+class SnapshotRelation(val sqlContext: SQLContext,
+                       val basePath: String,
+                       val optParams: Map[String, String],
+                       val userSchema: StructType) extends BaseRelation with 
TableScan {
+
+  private val log = LogManager.getLogger(classOf[SnapshotRelation])
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // Set config for listStatus() in HoodieParquetInputFormat
+  // TODO(garyli): Switch to bootstrap file listing methods
+  conf.setClass(
+    "mapreduce.input.pathFilter.class",
+    classOf[HoodieROTablePathFilter],
+    classOf[org.apache.hadoop.fs.PathFilter])
+  conf.setStrings("mapreduce.input.fileinputformat.inputdir", basePath)
+  conf.setStrings("mapreduce.input.fileinputformat.input.dir.recursive", 
"true")
+
+  private val HoodieInputFormat = new HoodieParquetInputFormat

Review comment:
       Naming: Scala `val`s should be lowerCamelCase, e.g. `hoodieInputFormat`?

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.{HoodieParquetInputFormat, 
HoodieROTablePathFilter}
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.table.HoodieTable
+
+import org.apache.hadoop.mapred.JobConf
+import org.apache.log4j.LogManager
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import java.util
+import scala.collection.JavaConverters._
+
+/**
+ * This is the Spark DataSourceV1 relation to read Hudi MOR table.
+ * @param sqlContext
+ * @param basePath
+ * @param optParams
+ * @param userSchema
+ */
+class SnapshotRelation(val sqlContext: SQLContext,
+                       val basePath: String,
+                       val optParams: Map[String, String],
+                       val userSchema: StructType) extends BaseRelation with 
TableScan {
+
+  private val log = LogManager.getLogger(classOf[SnapshotRelation])
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // Set config for listStatus() in HoodieParquetInputFormat
+  // TODO(garyli): Switch to bootstrap file listing methods
+  conf.setClass(
+    "mapreduce.input.pathFilter.class",
+    classOf[HoodieROTablePathFilter],
+    classOf[org.apache.hadoop.fs.PathFilter])
+  conf.setStrings("mapreduce.input.fileinputformat.inputdir", basePath)
+  conf.setStrings("mapreduce.input.fileinputformat.input.dir.recursive", 
"true")
+
+  private val HoodieInputFormat = new HoodieParquetInputFormat
+  HoodieInputFormat.setConf(conf)
+  private val fileStatus = HoodieInputFormat.listStatus(new JobConf(conf))
+  private val fileGroup = 
HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, 
util.Arrays.stream(fileStatus)).asScala
+
+  // Split the file group to: parquet file without a matching log file, 
parquet file need to merge with log files
+  private val parquetWithoutLogPaths: List[String] = fileGroup.filter(p => 
p._2.size() == 0).keys.toList
+  private val fileWithLogMap: Map[String, String] = fileGroup.filter(p => 
p._2.size() > 0).map{ case(k, v) => (k, v.asScala.toList.mkString(","))}.toMap
+
+  if (log.isDebugEnabled) {
+    log.debug("All parquet files" + fileStatus.map(s => 
s.getPath.toString).mkString(","))
+    log.debug("ParquetWithoutLogPaths" + parquetWithoutLogPaths.mkString(","))
+    log.debug("ParquetWithLogPaths" + fileWithLogMap.map(m => 
s"${m._1}:${m._2}").mkString(","))
+  }
+
+  // Add log file map to options
+  private val finalOps = optParams ++ fileWithLogMap
+
+  // Load Hudi metadata
+  val metaClient = new HoodieTableMetaClient(conf, basePath, true)
+  private val hoodieTable = HoodieTable.create(metaClient, 
HoodieWriteConfig.newBuilder().withPath(basePath).build(), conf)
+
+  private val commitTimeline = 
hoodieTable.getMetaClient.getCommitsAndCompactionTimeline
+  if (commitTimeline.empty()) {

Review comment:
       Should these checks happen before we list above? Also, should we return
an empty data set as opposed to erroring out if there are no commits?

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.{HoodieParquetInputFormat, 
HoodieROTablePathFilter}
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.table.HoodieTable
+
+import org.apache.hadoop.mapred.JobConf
+import org.apache.log4j.LogManager
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import java.util
+import scala.collection.JavaConverters._
+
+/**
+ * This is the Spark DataSourceV1 relation to read Hudi MOR table.
+ * @param sqlContext
+ * @param basePath
+ * @param optParams
+ * @param userSchema
+ */
+class SnapshotRelation(val sqlContext: SQLContext,
+                       val basePath: String,
+                       val optParams: Map[String, String],
+                       val userSchema: StructType) extends BaseRelation with 
TableScan {
+
+  private val log = LogManager.getLogger(classOf[SnapshotRelation])
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // Set config for listStatus() in HoodieParquetInputFormat
+  conf.setClass(
+    "mapreduce.input.pathFilter.class",
+    classOf[HoodieROTablePathFilter],
+    classOf[org.apache.hadoop.fs.PathFilter])
+  conf.setStrings("mapreduce.input.fileinputformat.inputdir", basePath)
+  conf.setStrings("mapreduce.input.fileinputformat.input.dir.recursive", 
"true")
+
+  private val HoodieInputFormat = new HoodieParquetInputFormat
+  HoodieInputFormat.setConf(conf)
+  private val fileStatus = HoodieInputFormat.listStatus(new JobConf(conf))
+  private val fileGroup = 
HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, 
util.Arrays.stream(fileStatus)).asScala
+
+  // Split the file group to: parquet file without a matching log file, 
parquet file need to merge with log files
+  private val parquetWithoutLogPaths: List[String] = fileGroup.filter(p => 
p._2.size() == 0).keys.toList
+  private val fileWithLogMap: Map[String, String] = fileGroup.filter(p => 
p._2.size() > 0).map{ case(k, v) => (k, v.asScala.toList.mkString(","))}.toMap
+
+  if (log.isDebugEnabled) {
+    log.debug("All parquet files" + fileStatus.map(s => 
s.getPath.toString).mkString(","))
+    log.debug("ParquetWithoutLogPaths" + parquetWithoutLogPaths.mkString(","))
+    log.debug("ParquetWithLogPaths" + fileWithLogMap.map(m => 
s"${m._1}:${m._2}").mkString(","))
+  }
+
+  // Add log file map to options
+  private val finalOps = optParams ++ fileWithLogMap

Review comment:
       We should refrain from listing other executors. @umehrot2  do you have 
any suggestions here? 
   FWIW the amount of data would be proportional to the number of partitions
globbed, which should not be as bad?

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.{HoodieParquetInputFormat, 
HoodieROTablePathFilter}
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.table.HoodieTable
+
+import org.apache.hadoop.mapred.JobConf
+import org.apache.log4j.LogManager
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import java.util
+import scala.collection.JavaConverters._
+
+/**
+ * This is the Spark DataSourceV1 relation to read Hudi MOR table.
+ * @param sqlContext
+ * @param basePath
+ * @param optParams
+ * @param userSchema
+ */
+class SnapshotRelation(val sqlContext: SQLContext,
+                       val basePath: String,
+                       val optParams: Map[String, String],
+                       val userSchema: StructType) extends BaseRelation with 
TableScan {
+
+  private val log = LogManager.getLogger(classOf[SnapshotRelation])
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // Set config for listStatus() in HoodieParquetInputFormat
+  // TODO(garyli): Switch to bootstrap file listing methods
+  conf.setClass(
+    "mapreduce.input.pathFilter.class",
+    classOf[HoodieROTablePathFilter],
+    classOf[org.apache.hadoop.fs.PathFilter])
+  conf.setStrings("mapreduce.input.fileinputformat.inputdir", basePath)
+  conf.setStrings("mapreduce.input.fileinputformat.input.dir.recursive", 
"true")
+
+  private val HoodieInputFormat = new HoodieParquetInputFormat
+  HoodieInputFormat.setConf(conf)
+  private val fileStatus = HoodieInputFormat.listStatus(new JobConf(conf))
+  private val fileGroup = 
HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, 
util.Arrays.stream(fileStatus)).asScala
+
+  // Split the file group to: parquet file without a matching log file, 
parquet file need to merge with log files
+  private val parquetWithoutLogPaths: List[String] = fileGroup.filter(p => 
p._2.size() == 0).keys.toList
+  private val fileWithLogMap: Map[String, String] = fileGroup.filter(p => 
p._2.size() > 0).map{ case(k, v) => (k, v.asScala.toList.mkString(","))}.toMap
+
+  if (log.isDebugEnabled) {
+    log.debug("All parquet files" + fileStatus.map(s => 
s.getPath.toString).mkString(","))
+    log.debug("ParquetWithoutLogPaths" + parquetWithoutLogPaths.mkString(","))
+    log.debug("ParquetWithLogPaths" + fileWithLogMap.map(m => 
s"${m._1}:${m._2}").mkString(","))
+  }
+
+  // Add log file map to options
+  private val finalOps = optParams ++ fileWithLogMap
+
+  // Load Hudi metadata
+  val metaClient = new HoodieTableMetaClient(conf, basePath, true)
+  private val hoodieTable = HoodieTable.create(metaClient, 
HoodieWriteConfig.newBuilder().withPath(basePath).build(), conf)
+
+  private val commitTimeline = 
hoodieTable.getMetaClient.getCommitsAndCompactionTimeline
+  if (commitTimeline.empty()) {
+    throw new HoodieException("No Valid Hudi timeline exists")
+  }
+  private val completedCommitTimeline = 
hoodieTable.getMetaClient.getCommitsTimeline.filterCompletedInstants()
+  private val lastInstant = completedCommitTimeline.lastInstant().get()
+  conf.setStrings("hoodie.realtime.last.commit", lastInstant.getTimestamp)
+
+  // use schema from latest metadata, if not present, read schema from the 
data file
+  private val latestSchema = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields);
+    AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+  }
+
+  override def schema: StructType = latestSchema
+
+  override def buildScan(): RDD[Row] = {
+    if (fileWithLogMap.isEmpty) {
+      sqlContext
+        .read
+        .options(finalOps)
+        .schema(schema)
+        .format("parquet")
+        .load(parquetWithoutLogPaths:_*)
+        .toDF()
+        .rdd

Review comment:
       Need to understand this better — do you see `toDF().rdd` causing
additional overhead compared to regular `spark.read.parquet()`?

##########
File path: 
hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieRealtimeInputFormat.scala
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.{FileSplit, JobConf}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import 
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, 
ParquetRecordReader}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+import java.net.URI
+import scala.collection.JavaConverters._
+
+/**
+ * This class is an extension of ParquetFileFormat from Spark SQL.
+ * The file split, record reader, record reader iterator are customized to 
read Hudi MOR table.
+ */
+class HoodieRealtimeInputFormat extends ParquetFileFormat {

Review comment:
       Rename: `HoodieParquetRealtimeFileFormat`, denoting that this
specifically handles parquet base files

##########
File path: 
hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetRecordReaderIterator.scala
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.avro.Schema
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+import org.apache.hudi.realtime.HoodieRealtimeParquetRecordReader
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.types.StructType
+import java.io.Closeable
+import java.util
+
+/**
+ * This class is the iterator for Hudi MOR table.
+ * This iterator will read the parquet file first and skip the record if it 
present in the log file.
+ * Then read the log file.
+ * Custom payload is not supported yet. This combining logic is matching with 
[OverwriteWithLatestAvroPayload]

Review comment:
       sg. We can also add new methods to the Payload interface to avoid the
conversion and merge rows directly. Let's file a follow-on.

##########
File path: 
hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetRecordReaderIterator.scala
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, 
LogReaderUtils}
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit
+import org.apache.parquet.hadoop.ParquetRecordReader
+
+import org.apache.avro.Schema
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.types.StructType
+import java.io.Closeable
+import java.util
+
+import scala.util.Try
+
+/**
+ * This class is the iterator for Hudi MOR table.
+ * Log files are scanned on initialization.
+ * This iterator will read the parquet file first and skip the record if it 
present in the log file.
+ * Then read the log file.
+ * Custom payload is not supported yet. This combining logic is matching with 
[OverwriteWithLatestAvroPayload]
+ * @param rowReader HoodieRealtimeParquetRecordReader
+ */
+class HoodieParquetRecordReaderIterator(private[this] var rowReader: 
ParquetRecordReader[UnsafeRow],
+                                        private[this] val split: 
HoodieRealtimeFileSplit,
+                                        private[this] val jobConf: JobConf) 
extends Iterator[UnsafeRow] with Closeable with Logging {
+  private[this] var havePair = false
+  private[this] var finished = false
+  private[this] var parquetFinished = false
+
+  private[this] var deltaRecordMap: 
util.Map[String,org.apache.hudi.common.model
+  .HoodieRecord[_ <: org.apache.hudi.common.model.HoodieRecordPayload[_ <: 
org.apache.hudi.common.model.HoodieRecordPayload[_ <: AnyRef]]]] = _
+  private[this] var deltaRecordKeys: util.Set[String] = _
+  private[this] var deltaIter: util.Iterator[String] = _
+  private[this] var avroSchema: Schema = _
+  private[this] var sparkTypes: StructType = _
+  private[this] var converter: AvroDeserializer = _
+
+  // SPARK-23457 Register a task completion lister before `initialization`.

Review comment:
       does this comment make sense in this context?

##########
File path: 
hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetRecordReaderIterator.scala
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet

Review comment:
       why does this have to be under this package? so we can subclass 
`ParquetFileFormat`? 

##########
File path: 
hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetRecordReaderIterator.scala
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, 
LogReaderUtils}
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit
+import org.apache.parquet.hadoop.ParquetRecordReader
+
+import org.apache.avro.Schema
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.types.StructType
+import java.io.Closeable
+import java.util
+
+import scala.util.Try
+
+/**
+ * This class is the iterator for Hudi MOR table.
+ * Log files are scanned on initialization.
+ * This iterator will read the parquet file first and skip the record if it 
present in the log file.
+ * Then read the log file.
+ * Custom payload is not supported yet. This combining logic is matching with 
[OverwriteWithLatestAvroPayload]
+ * @param rowReader HoodieRealtimeParquetRecordReader
+ */
+class HoodieParquetRecordReaderIterator(private[this] var rowReader: 
ParquetRecordReader[UnsafeRow],

Review comment:
       rename to `HoodieMergedParquetRowIterator` ?

##########
File path: 
hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetRecordReaderIterator.scala
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.avro.Schema
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+import org.apache.hudi.realtime.HoodieRealtimeParquetRecordReader
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.types.StructType
+import java.io.Closeable
+import java.util
+
+/**
+ * This class is the iterator for Hudi MOR table.
+ * This iterator will read the parquet file first and skip the record if it 
present in the log file.
+ * Then read the log file.
+ * Custom payload is not supported yet. This combining logic is matching with 
[OverwriteWithLatestAvroPayload]

Review comment:
       Also, we have re-implemented the merge by hand again in this class. I
was trying to explore whether we can reuse the existing HoodieRecordReader
classes by templatizing for `Row` instead of `ArrayWritable`. That's at least
the longer-term option to pursue.

##########
File path: 
hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieRealtimeInputFormat.scala
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.{FileSplit, JobConf}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import 
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, 
ParquetRecordReader}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+import java.net.URI
+import scala.collection.JavaConverters._
+
+/**
+ * This class is an extension of ParquetFileFormat from Spark SQL.
+ * The file split, record reader, record reader iterator are customized to 
read Hudi MOR table.
+ */
+class HoodieRealtimeInputFormat extends ParquetFileFormat {
+  //TODO: Better usage of this short name.
+  override def shortName(): String = "hudi.snapshot"
+  override def toString(): String = "hudi.snapshot"
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                              dataSchema: StructType,
+                                              partitionSchema: StructType,
+                                              requiredSchema: StructType,
+                                              filters: Seq[Filter], options: 
Map[String, String],
+                                              hadoopConf: Configuration): 
(PartitionedFile) =>
+    Iterator[InternalRow] = {
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[ParquetReadSupport].getName)
+    hadoopConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, 
requiredSchema.json)
+    hadoopConf.set(ParquetWriteSupport.SPARK_ROW_SCHEMA, requiredSchema.json)
+    hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key,
+      sparkSession.sessionState.conf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key,
+      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+
+    ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
+
+    // Sets flags for `ParquetToSparkSchemaConverter`
+    hadoopConf.setBoolean(

Review comment:
       Can we avoid all this code copied from ParquetFileFormat by wrapping or 
overriding it? That should be doable.
   
   I am concerned about the amount of borrowed code in this file..




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to