vinothchandar commented on a change in pull request #1722:
URL: https://github.com/apache/hudi/pull/1722#discussion_r438078737



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/RealtimeRelation.scala
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.{HoodieParquetInputFormat, 
HoodieROTablePathFilter}
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.table.HoodieTable
+import org.apache.hadoop.mapred.JobConf
+import org.apache.log4j.LogManager
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import java.util
+import scala.collection.JavaConverters._
+
+/**
+ * This is the Spark DataSourceV1 relation to read Hudi MOR table.
+ * @param sqlContext
+ * @param basePath
+ * @param optParams
+ * @param userSchema
+ */
+class RealtimeRelation(val sqlContext: SQLContext,

Review comment:
       IIUC COW/Snapshot also comes here.. So let's rename this to 
`SnapshotRelation`. 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/RealtimeRelation.scala
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.{HoodieParquetInputFormat, 
HoodieROTablePathFilter}
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.table.HoodieTable
+import org.apache.hadoop.mapred.JobConf
+import org.apache.log4j.LogManager
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import java.util
+import scala.collection.JavaConverters._
+
+/**
+ * This is the Spark DataSourceV1 relation to read Hudi MOR table.
+ * @param sqlContext
+ * @param basePath
+ * @param optParams
+ * @param userSchema
+ */
+class RealtimeRelation(val sqlContext: SQLContext,
+                       val basePath: String,
+                       val optParams: Map[String, String],
+                       val userSchema: StructType) extends BaseRelation with 
TableScan {
+
+  private val log = LogManager.getLogger(classOf[RealtimeRelation])
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // Set config for listStatus() in HoodieParquetInputFormat
+  conf.setClass(
+    "mapreduce.input.pathFilter.class",
+    classOf[HoodieROTablePathFilter],
+    classOf[org.apache.hadoop.fs.PathFilter])
+  conf.setStrings("mapreduce.input.fileinputformat.inputdir", basePath)
+  conf.setStrings("mapreduce.input.fileinputformat.input.dir.recursive", 
"true")
+
+  private val HoodieInputFormat = new HoodieParquetInputFormat
+  HoodieInputFormat.setConf(conf)
+  private val fileStatus = HoodieInputFormat.listStatus(new JobConf(conf))
+  log.debug("All parquet files" + fileStatus.map(s => 
s.getPath.toString).mkString(","))

Review comment:
       fence with with a `if log.isDebugEnabled()` check?

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -48,7 +49,7 @@ object DataSourceReadOptions {
   val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot"
   val QUERY_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized"
   val QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental"
-  val DEFAULT_QUERY_TYPE_OPT_VAL: String = QUERY_TYPE_SNAPSHOT_OPT_VAL
+  val DEFAULT_QUERY_TYPE_OPT_VAL: String = QUERY_TYPE_READ_OPTIMIZED_OPT_VAL

Review comment:
       default has to be SNAPSHOT.. why are we changing this..

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/RealtimeRelation.scala
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.{HoodieParquetInputFormat, 
HoodieROTablePathFilter}
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.table.HoodieTable
+import org.apache.hadoop.mapred.JobConf
+import org.apache.log4j.LogManager
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import java.util
+import scala.collection.JavaConverters._
+
+/**
+ * This is the Spark DataSourceV1 relation to read Hudi MOR table.
+ * @param sqlContext
+ * @param basePath
+ * @param optParams
+ * @param userSchema
+ */
+class RealtimeRelation(val sqlContext: SQLContext,
+                       val basePath: String,
+                       val optParams: Map[String, String],
+                       val userSchema: StructType) extends BaseRelation with 
TableScan {
+
+  private val log = LogManager.getLogger(classOf[RealtimeRelation])
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // Set config for listStatus() in HoodieParquetInputFormat
+  conf.setClass(
+    "mapreduce.input.pathFilter.class",
+    classOf[HoodieROTablePathFilter],
+    classOf[org.apache.hadoop.fs.PathFilter])
+  conf.setStrings("mapreduce.input.fileinputformat.inputdir", basePath)
+  conf.setStrings("mapreduce.input.fileinputformat.input.dir.recursive", 
"true")
+
+  private val HoodieInputFormat = new HoodieParquetInputFormat
+  HoodieInputFormat.setConf(conf)
+  private val fileStatus = HoodieInputFormat.listStatus(new JobConf(conf))
+  log.debug("All parquet files" + fileStatus.map(s => 
s.getPath.toString).mkString(","))
+  private val fileGroup = 
HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, 
util.Arrays.stream(fileStatus)).asScala
+
+  // Split the file group to: parquet file without a matching log file, 
parquet file need to merge with log files
+  private val parquetWithoutLogPaths: List[String] = fileGroup.filter(p => 
p._2.size() == 0).keys.toList
+  private val fileWithLogMap: Map[String, String] = fileGroup.filter(p => 
p._2.size() > 0).map{ case(k, v) => (k, v.asScala.toList.mkString(","))}.toMap
+  log.debug("ParquetWithoutLogPaths" + parquetWithoutLogPaths.mkString(","))
+  log.debug("ParquetWithLogPaths" + fileWithLogMap.map(m => 
s"${m._1}:${m._2}").mkString(","))
+
+  // Add log file map to options
+  private val finalOps = optParams ++ fileWithLogMap
+
+  // Load Hudi metadata
+  val metaClient = new HoodieTableMetaClient(conf, basePath, true)
+  private val hoodieTable = HoodieTable.create(metaClient, 
HoodieWriteConfig.newBuilder().withPath(basePath).build(), conf)
+
+  private val commitTimeline = 
hoodieTable.getMetaClient.getCommitsAndCompactionTimeline
+  if (commitTimeline.empty()) {
+    throw new HoodieException("No Valid Hudi timeline exists")
+  }
+  private val completedCommitTimeline = 
hoodieTable.getMetaClient.getCommitsTimeline.filterCompletedInstants()
+  private val lastInstant = completedCommitTimeline.lastInstant().get()
+  conf.setStrings("hoodie.realtime.last.commit", lastInstant.getTimestamp)
+
+  // use schema from latest metadata, if not present, read schema from the 
data file
+  private val latestSchema = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields);
+    AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+  }
+
+  override def schema: StructType = latestSchema
+
+  override def buildScan(): RDD[Row] = {
+    // Read parquet file doesn't have matching log file to merge as normal 
parquet
+    val regularParquet = sqlContext
+        .read
+        .options(finalOps)
+        .schema(schema)
+        .format("parquet")
+        .load(parquetWithoutLogPaths:_*)
+        .toDF()
+    // Hudi parquet files needed to merge with log file
+    sqlContext

Review comment:
       can we short circuit in cases where `fileWithLogMap` is empty? we will 
avoid the planning phases for the second datasource.

##########
File path: 
hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieRealtimeInputFormat.scala
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit
+import org.apache.hudi.realtime.HoodieRealtimeParquetRecordReader
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.{FileSplit, JobConf}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import 
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+import java.net.URI
+import scala.collection.JavaConverters._
+
+/**
+ * This class is an extension of ParquetFileFormat from Spark SQL.
+ * The file split, record reader, record reader iterator are customized to 
read Hudi MOR table.
+ */
+class HoodieRealtimeInputFormat extends ParquetFileFormat {
+  //TODO: Better usage of this short name.
+  override def shortName(): String = "hudi.realtime"
+  override def toString(): String = "hudi.realtime"
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                               dataSchema: StructType,
+                                               partitionSchema: StructType,
+                                               requiredSchema: StructType,
+                                               filters: Seq[Filter],
+                                               options: Map[String, String],
+                                               hadoopConf: Configuration): 
(PartitionedFile) => Iterator[InternalRow] = {
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[ParquetReadSupport].getName)

Review comment:
       can we get away by delegating this to be super class? 

##########
File path: 
hudi-spark/src/main/java/org/apache/hudi/realtime/HoodieRealtimeParquetRecordReader.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.realtime;
+
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.parquet.CorruptDeltaByteArrays;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetRecordReader;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.parquet.hadoop.ParquetInputFormat.SPLIT_FILES;
+
+/**
+ * Custom implementation of org.apache.parquet.hadoop.ParquetRecordReader.
+ * This class is a wrapper class. The real reader is the internalReader.
+ *
+ * @see ParquetRecordReader
+ *
+ * @param <T> type of the materialized records
+ */
+public class HoodieRealtimeParquetRecordReader<T> extends RecordReader<Void, 
T> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieRealtimeParquetRecordReader.class);
+  public final CompactedRealtimeParquetReader<T> internalReader;
+
+  /**
+   * @param readSupport Object which helps reads files of the given type, e.g. 
Thrift, Avro.
+   */
+  public HoodieRealtimeParquetRecordReader(ReadSupport<T> readSupport, 
HoodieRealtimeFileSplit split, JobConf job)
+      throws IOException {
+    this(readSupport, FilterCompat.NOOP, split, job);
+  }
+
+  /**
+   * @param readSupport Object which helps reads files of the given type, e.g. 
Thrift, Avro.
+   * @param filter for filtering individual records
+   */
+  public HoodieRealtimeParquetRecordReader(ReadSupport<T> readSupport, Filter 
filter, HoodieRealtimeFileSplit split, JobConf job)
+      throws IOException {
+    this.internalReader = new CompactedRealtimeParquetReader<T>(readSupport, 
filter, split, job);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void close() throws IOException {
+    internalReader.close();
+  }
+
+  /**
+   * always returns null.
+   */
+  @Override
+  public Void getCurrentKey() throws IOException, InterruptedException {
+    return null;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public T getCurrentValue() throws IOException,
+      InterruptedException {
+    return internalReader.getCurrentValue();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return internalReader.getProgress();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+
+    if (ContextUtil.hasCounterMethod(context)) {
+      BenchmarkCounter.initCounterFromContext(context);
+    } else {
+      LOG.error(
+          String.format("Can not initialize counter because the class '%s' 
does not have a '.getCounterMethod'",
+              context.getClass().getCanonicalName()));
+    }
+
+    initializeInternalReader((HoodieRealtimeFileSplit) inputSplit, 
ContextUtil.getConfiguration(context));
+  }
+
+  public void initialize(InputSplit inputSplit, Configuration configuration, 
Reporter reporter)
+      throws IOException, InterruptedException {
+    BenchmarkCounter.initCounterFromReporter(reporter,configuration);
+    initializeInternalReader((HoodieRealtimeFileSplit) inputSplit, 
configuration);
+  }
+
+  private void initializeInternalReader(HoodieRealtimeFileSplit split, 
Configuration configuration) throws IOException {
+    Path path = split.getPath();
+    ParquetReadOptions.Builder optionsBuilder = 
HadoopReadOptions.builder(configuration);
+    optionsBuilder.withRange(split.getStart(), split.getStart() + 
split.getLength());
+
+    // open a reader with the metadata filter
+    ParquetFileReader reader = ParquetFileReader.open(
+        HadoopInputFile.fromPath(path, configuration), optionsBuilder.build());
+    if (!reader.getRowGroups().isEmpty()) {
+      checkDeltaByteArrayProblem(
+          reader.getFooter().getFileMetaData(), configuration,
+          reader.getRowGroups().get(0));
+    }
+
+    internalReader.initialize(reader, configuration);
+  }
+
+  private void checkDeltaByteArrayProblem(FileMetaData meta, Configuration 
conf, BlockMetaData block) {
+    if (conf.getBoolean(SPLIT_FILES, true)) {

Review comment:
       is this adapted and reused from somewhere else ?

##########
File path: 
hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieRealtimeInputFormat.scala
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit
+import org.apache.hudi.realtime.HoodieRealtimeParquetRecordReader
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.{FileSplit, JobConf}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import 
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+import java.net.URI
+import scala.collection.JavaConverters._
+
+/**
+ * This class is an extension of ParquetFileFormat from Spark SQL.
+ * The file split, record reader, record reader iterator are customized to 
read Hudi MOR table.
+ */
+class HoodieRealtimeInputFormat extends ParquetFileFormat {
+  //TODO: Better usage of this short name.
+  override def shortName(): String = "hudi.realtime"
+  override def toString(): String = "hudi.realtime"
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                               dataSchema: StructType,

Review comment:
       nit: indentation

##########
File path: 
hudi-spark/src/main/java/org/apache/hudi/realtime/HoodieRealtimeParquetRecordReader.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.realtime;
+
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.parquet.CorruptDeltaByteArrays;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetRecordReader;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.parquet.hadoop.ParquetInputFormat.SPLIT_FILES;
+
+/**
+ * Custom implementation of org.apache.parquet.hadoop.ParquetRecordReader.
+ * This class is a wrapper class. The real reader is the internalReader.
+ *
+ * @see ParquetRecordReader
+ *
+ * @param <T> type of the materialized records
+ */
+public class HoodieRealtimeParquetRecordReader<T> extends RecordReader<Void, 
T> {

Review comment:
       so the main difference here, is the use of `mapred.RecordReader` right.. 
Did you check hadoop-mr could instead be updated with `mapreduce.RecordReader`

##########
File path: 
hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetRecordReaderIterator.scala
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.avro.Schema
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+import org.apache.hudi.realtime.HoodieRealtimeParquetRecordReader
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.types.StructType
+import java.io.Closeable
+import java.util
+
+/**
+ * This class is the iterator for Hudi MOR table.
+ * This iterator will read the parquet file first and skip the record if it 
present in the log file.
+ * Then read the log file.
+ * Custom payload is not supported yet. This combining logic is matching with 
[OverwriteWithLatestAvroPayload]

Review comment:
       custom payload is kind of an important thing, to support out of box.. 
any complications in supporting that?

##########
File path: 
hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieRealtimeInputFormat.scala
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit
+import org.apache.hudi.realtime.HoodieRealtimeParquetRecordReader
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.{FileSplit, JobConf}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import 
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+import java.net.URI
+import scala.collection.JavaConverters._
+
+/**
+ * This class is an extension of ParquetFileFormat from Spark SQL.
+ * The file split, record reader, record reader iterator are customized to 
read Hudi MOR table.
+ */
+class HoodieRealtimeInputFormat extends ParquetFileFormat {

Review comment:
       and I guess you have it here in a differnt package to be able to extend 
ParquetFileFormat? 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/RealtimeRelation.scala
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.{HoodieParquetInputFormat, 
HoodieROTablePathFilter}
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.table.HoodieTable
+import org.apache.hadoop.mapred.JobConf
+import org.apache.log4j.LogManager
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import java.util
+import scala.collection.JavaConverters._
+
+/**
+ * This is the Spark DataSourceV1 relation to read Hudi MOR table.
+ * @param sqlContext
+ * @param basePath
+ * @param optParams
+ * @param userSchema
+ */
+class RealtimeRelation(val sqlContext: SQLContext,
+                       val basePath: String,
+                       val optParams: Map[String, String],
+                       val userSchema: StructType) extends BaseRelation with 
TableScan {
+
+  private val log = LogManager.getLogger(classOf[RealtimeRelation])
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // Set config for listStatus() in HoodieParquetInputFormat
+  conf.setClass(
+    "mapreduce.input.pathFilter.class",
+    classOf[HoodieROTablePathFilter],
+    classOf[org.apache.hadoop.fs.PathFilter])
+  conf.setStrings("mapreduce.input.fileinputformat.inputdir", basePath)
+  conf.setStrings("mapreduce.input.fileinputformat.input.dir.recursive", 
"true")
+
+  private val HoodieInputFormat = new HoodieParquetInputFormat
+  HoodieInputFormat.setConf(conf)
+  private val fileStatus = HoodieInputFormat.listStatus(new JobConf(conf))
+  log.debug("All parquet files" + fileStatus.map(s => 
s.getPath.toString).mkString(","))
+  private val fileGroup = 
HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, 
util.Arrays.stream(fileStatus)).asScala
+
+  // Split the file group to: parquet file without a matching log file, 
parquet file need to merge with log files
+  private val parquetWithoutLogPaths: List[String] = fileGroup.filter(p => 
p._2.size() == 0).keys.toList
+  private val fileWithLogMap: Map[String, String] = fileGroup.filter(p => 
p._2.size() > 0).map{ case(k, v) => (k, v.asScala.toList.mkString(","))}.toMap
+  log.debug("ParquetWithoutLogPaths" + parquetWithoutLogPaths.mkString(","))
+  log.debug("ParquetWithLogPaths" + fileWithLogMap.map(m => 
s"${m._1}:${m._2}").mkString(","))
+
+  // Add log file map to options
+  private val finalOps = optParams ++ fileWithLogMap
+
+  // Load Hudi metadata
+  val metaClient = new HoodieTableMetaClient(conf, basePath, true)
+  private val hoodieTable = HoodieTable.create(metaClient, 
HoodieWriteConfig.newBuilder().withPath(basePath).build(), conf)
+
+  private val commitTimeline = 
hoodieTable.getMetaClient.getCommitsAndCompactionTimeline
+  if (commitTimeline.empty()) {
+    throw new HoodieException("No Valid Hudi timeline exists")
+  }
+  private val completedCommitTimeline = 
hoodieTable.getMetaClient.getCommitsTimeline.filterCompletedInstants()
+  private val lastInstant = completedCommitTimeline.lastInstant().get()
+  conf.setStrings("hoodie.realtime.last.commit", lastInstant.getTimestamp)
+
+  // use schema from latest metadata, if not present, read schema from the 
data file
+  private val latestSchema = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields);
+    AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+  }
+
+  override def schema: StructType = latestSchema
+
+  override def buildScan(): RDD[Row] = {
+    // Read parquet file doesn't have matching log file to merge as normal 
parquet
+    val regularParquet = sqlContext
+        .read
+        .options(finalOps)
+        .schema(schema)
+        .format("parquet")
+        .load(parquetWithoutLogPaths:_*)
+        .toDF()
+    // Hudi parquet files needed to merge with log file
+    sqlContext
+      .read
+      .options(finalOps)
+      .schema(schema)
+      
.format("org.apache.spark.sql.execution.datasources.parquet.HoodieRealtimeInputFormat")
+      .load(fileWithLogMap.keys.toList:_*)
+      .toDF()
+      .union(regularParquet)
+      .rdd

Review comment:
       this can be hurting performance? the .rdd conversion.. 

##########
File path: 
hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieRealtimeInputFormat.scala
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit
+import org.apache.hudi.realtime.HoodieRealtimeParquetRecordReader
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.{FileSplit, JobConf}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import 
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+import java.net.URI
+import scala.collection.JavaConverters._
+
+/**
+ * This class is an extension of ParquetFileFormat from Spark SQL.
+ * The file split, record reader, record reader iterator are customized to 
read Hudi MOR table.
+ */
+class HoodieRealtimeInputFormat extends ParquetFileFormat {
+  //TODO: Better usage of this short name.
+  override def shortName(): String = "hudi.realtime"
+  override def toString(): String = "hudi.realtime"
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                               dataSchema: StructType,
+                                               partitionSchema: StructType,
+                                               requiredSchema: StructType,
+                                               filters: Seq[Filter],
+                                               options: Map[String, String],
+                                               hadoopConf: Configuration): 
(PartitionedFile) => Iterator[InternalRow] = {
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[ParquetReadSupport].getName)
+    hadoopConf.set(
+      ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      ParquetWriteSupport.SPARK_ROW_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      SQLConf.SESSION_LOCAL_TIMEZONE.key,
+      sparkSession.sessionState.conf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(
+      SQLConf.CASE_SENSITIVE.key,
+      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+
+    ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
+
+    // Sets flags for `ParquetToSparkSchemaConverter`
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_BINARY_AS_STRING.key,
+      sparkSession.sessionState.conf.isParquetBinaryAsString)
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
+
+    // TODO: if you move this into the closure it reverts to the default 
values.
+    // If true, enable using the custom RecordReader for parquet. This only 
works for
+    // a subset of the types (no complex types).
+    val resultSchema = StructType(partitionSchema.fields ++ 
requiredSchema.fields)
+    val sqlConf = sparkSession.sessionState.conf
+    val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
+    val timestampConversion: Boolean = 
sqlConf.isParquetINT96TimestampConversion
+    val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
+    // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+    val returningBatch = supportBatch(sparkSession, resultSchema)
+    val pushDownDate = sqlConf.parquetFilterPushDownDate
+    val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
+    val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
+    val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownInFilterThreshold = 
sqlConf.parquetFilterPushDownInFilterThreshold
+    val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+
+    (file: PartitionedFile) => {
+      assert(file.partitionValues.numFields == partitionSchema.size)
+
+      val sharedConf = broadcastedHadoopConf.value.value
+      val fileSplit =
+        new FileSplit(new Path(new URI(file.filePath)), file.start, 
file.length, new Array[String](0))
+      val filePath = fileSplit.getPath
+
+      val basePath = sharedConf.get("mapreduce.input.fileinputformat.inputdir")
+      val maxCommitTime = sharedConf.get("hoodie.realtime.last.commit")
+      // Read the log file path from the option
+      val logPathStr = options.getOrElse(fileSplit.getPath.toString, 
"").split(",")
+      log.debug(s"fileSplit.getPath in HoodieRealtimeInputFormat: 
${fileSplit.getPath} and ${fileSplit.getPath.getName}")
+      log.debug(s"logPath in HoodieRealtimeInputFormat: 
${logPathStr.toString}")
+      val hoodieRealtimeFileSplit = new HoodieRealtimeFileSplit(fileSplit, 
basePath, logPathStr.toList.asJava, maxCommitTime)
+
+      lazy val footerFileMetaData =
+        ParquetFileReader.readFooter(sharedConf, filePath, 
SKIP_ROW_GROUPS).getFileMetaData
+      // Try to push down filters when filter push-down is enabled.
+      val pushed = if (enableParquetFilterPushDown) {
+        val parquetSchema = footerFileMetaData.getSchema
+        val parquetFilters = new ParquetFilters(pushDownDate, 
pushDownTimestamp, pushDownDecimal,
+          pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+        filters
+          // Collects all converted Parquet filter predicates. Notice that not 
all predicates can be
+          // converted (`ParquetFilters.createFilter` returns an `Option`). 
That's why a `flatMap`
+          // is used here.
+          .flatMap(parquetFilters.createFilter(parquetSchema, _))
+          .reduceOption(FilterApi.and)
+      } else {
+        None
+      }
+
+      // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions 
to int96 timestamps'
+      // *only* if the file was created by something other than "parquet-mr", 
so check the actual
+      // writer here for this file.  We have to do this per-file, as each file 
in the table may
+      // have different writers.
+      // Define isCreatedByParquetMr as function to avoid unnecessary parquet 
footer reads.
+      def isCreatedByParquetMr: Boolean =
+        footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+      val convertTz =
+        if (timestampConversion && !isCreatedByParquetMr) {
+          
Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+        } else {
+          None
+        }
+
+      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 
0), 0)
+      val hadoopAttemptContext =
+        new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, 
attemptId)
+
+      // Try to push down filters when filter push-down is enabled.
+      // Notice: This push-down is RowGroups level, not individual records.
+      if (pushed.isDefined) {
+        
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, 
pushed.get)
+      }
+      val taskContext = Option(TaskContext.get())
+      //TODO: Support the vectorized reader.
+      logDebug(s"Falling back to parquet-mr")
+      // ParquetRecordReader returns UnsafeRow
+      val reader = if (pushed.isDefined && enableRecordFilter) {
+        val parquetFilter = FilterCompat.get(pushed.get, null)
+        new HoodieRealtimeParquetRecordReader[UnsafeRow](new 
ParquetReadSupport(convertTz), parquetFilter, hoodieRealtimeFileSplit, new 
JobConf(sharedConf))
+      } else {
+        new HoodieRealtimeParquetRecordReader[UnsafeRow](new 
ParquetReadSupport(convertTz), hoodieRealtimeFileSplit, new JobConf(sharedConf))
+      }
+      val iter = new HoodieParquetRecordReaderIterator(reader)
+      // SPARK-23457 Register a task completion lister before `initialization`.
+      taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+      reader.initialize(hoodieRealtimeFileSplit, hadoopAttemptContext)
+      iter.init()
+
+      val fullSchema = requiredSchema.toAttributes ++ 
partitionSchema.toAttributes
+      val joinedRow = new JoinedRow()
+      val appendPartitionColumns = 
GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+      // This is a horrible erasure hack...  if we type the iterator above, 
then it actually check

Review comment:
       is this codde re-used from some place? 

##########
File path: 
hudi-spark/src/main/java/org/apache/hudi/realtime/AbstractRealtimeParquetReader.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.realtime;
+
+import org.apache.hudi.common.table.log.LogReaderUtils;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.UnmaterializableRecordCounter;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+import org.mortbay.log.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+import static 
org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
+import static 
org.apache.parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING;
+
+/**
+ * This class is customized from 
org.apache.parquet.hadoop.InternalParquetRecordReader combining with 
AbstractRealtimeRecordReader.

Review comment:
       This will become a maintenance issue.. is there a way to wrap this..
   
   (Will review this more deeply)

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/RealtimeRelation.scala
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.{HoodieParquetInputFormat, 
HoodieROTablePathFilter}
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.table.HoodieTable
+import org.apache.hadoop.mapred.JobConf
+import org.apache.log4j.LogManager
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import java.util
+import scala.collection.JavaConverters._
+
+/**
+ * This is the Spark DataSourceV1 relation to read Hudi MOR table.
+ * @param sqlContext
+ * @param basePath
+ * @param optParams
+ * @param userSchema
+ */
+class RealtimeRelation(val sqlContext: SQLContext,
+                       val basePath: String,
+                       val optParams: Map[String, String],
+                       val userSchema: StructType) extends BaseRelation with 
TableScan {
+
+  private val log = LogManager.getLogger(classOf[RealtimeRelation])
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // Set config for listStatus() in HoodieParquetInputFormat
+  conf.setClass(
+    "mapreduce.input.pathFilter.class",
+    classOf[HoodieROTablePathFilter],
+    classOf[org.apache.hadoop.fs.PathFilter])
+  conf.setStrings("mapreduce.input.fileinputformat.inputdir", basePath)
+  conf.setStrings("mapreduce.input.fileinputformat.input.dir.recursive", 
"true")
+
+  private val HoodieInputFormat = new HoodieParquetInputFormat
+  HoodieInputFormat.setConf(conf)
+  private val fileStatus = HoodieInputFormat.listStatus(new JobConf(conf))
+  log.debug("All parquet files" + fileStatus.map(s => 
s.getPath.toString).mkString(","))
+  private val fileGroup = 
HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, 
util.Arrays.stream(fileStatus)).asScala
+
+  // Split the file group to: parquet file without a matching log file, 
parquet file need to merge with log files
+  private val parquetWithoutLogPaths: List[String] = fileGroup.filter(p => 
p._2.size() == 0).keys.toList
+  private val fileWithLogMap: Map[String, String] = fileGroup.filter(p => 
p._2.size() > 0).map{ case(k, v) => (k, v.asScala.toList.mkString(","))}.toMap
+  log.debug("ParquetWithoutLogPaths" + parquetWithoutLogPaths.mkString(","))
+  log.debug("ParquetWithLogPaths" + fileWithLogMap.map(m => 
s"${m._1}:${m._2}").mkString(","))
+
+  // Add log file map to options
+  private val finalOps = optParams ++ fileWithLogMap
+
+  // Load Hudi metadata
+  val metaClient = new HoodieTableMetaClient(conf, basePath, true)
+  private val hoodieTable = HoodieTable.create(metaClient, 
HoodieWriteConfig.newBuilder().withPath(basePath).build(), conf)
+
+  private val commitTimeline = 
hoodieTable.getMetaClient.getCommitsAndCompactionTimeline
+  if (commitTimeline.empty()) {
+    throw new HoodieException("No Valid Hudi timeline exists")
+  }
+  private val completedCommitTimeline = 
hoodieTable.getMetaClient.getCommitsTimeline.filterCompletedInstants()
+  private val lastInstant = completedCommitTimeline.lastInstant().get()
+  conf.setStrings("hoodie.realtime.last.commit", lastInstant.getTimestamp)
+
+  // use schema from latest metadata, if not present, read schema from the 
data file
+  private val latestSchema = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields);
+    AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+  }
+
+  override def schema: StructType = latestSchema
+
+  override def buildScan(): RDD[Row] = {
+    // Read parquet file doesn't have matching log file to merge as normal 
parquet

Review comment:
       one thing to ensure is COW/Snapshot will use spark's native parquet 
reader.. I think we push the same path filter above and reuse the spark source 
here. so should work.. But good to test once with few queries and ensure there 
is no change in perf




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to