This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 79ec7b4 [HUDI-920] Support Incremental query for MOR table (#1938)
79ec7b4 is described below
commit 79ec7b4894b997183a6e10fdc19d34f5ab4ea437
Author: Gary Li <[email protected]>
AuthorDate: Sun Jan 10 00:02:08 2021 +0800
[HUDI-920] Support Incremental query for MOR table (#1938)
---
.../hudi/hadoop/utils/HoodieInputFormatUtils.java | 41 ++++
.../main/scala/org/apache/hudi/DefaultSource.scala | 7 +-
.../org/apache/hudi/HoodieMergeOnReadRDD.scala | 60 +++++-
.../org/apache/hudi/IncrementalRelation.scala | 11 +-
.../hudi/MergeOnReadIncrementalRelation.scala | 218 +++++++++++++++++++++
.../apache/hudi/MergeOnReadSnapshotRelation.scala | 6 +-
.../apache/hudi/functional/TestMORDataSource.scala | 165 +++++++++++++---
7 files changed, 463 insertions(+), 45 deletions(-)
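
This change routes incremental queries on merge-on-read tables to the new
MergeOnReadIncrementalRelation, so they can be issued through the Spark datasource just like
copy-on-write incremental queries. A minimal usage sketch, assuming an active SparkSession
`spark`; the table path and instant times below are placeholders, not values from this commit:

    // Scala: incremental read of a MOR table between two completed instants
    import org.apache.hudi.DataSourceReadOptions
    val incDF = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20210101000000") // placeholder begin instant
      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, "20210110000000")   // optional, defaults to the latest instant
      .load("/path/to/hudi/table")                                               // placeholder base path
    incDF.show(1)

By default records are merged with the payload-combine semantics; passing
DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL via REALTIME_MERGE_OPT_KEY returns base file
and log records unmerged, as exercised in TestMORDataSource below.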
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index cf7da54..019b558 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -470,4 +471,44 @@ public class HoodieInputFormatUtils {
}
}
+  /**
+   * Iterate through a list of commits in ascending order, and extract the file status of
+   * all affected files from the commit metadata, grouped by partition path. If a file has
+   * been touched multiple times in the given commits, the return value keeps the one
+   * from the latest commit.
+   * @param basePath
+   * @param commitsToCheck
+   * @param timeline
+   * @return HashMap<partitionPath, HashMap<fileName, FileStatus>>
+   * @throws IOException
+   */
+  public static HashMap<String, HashMap<String, FileStatus>> listAffectedFilesForCommits(
+      Path basePath, List<HoodieInstant> commitsToCheck, HoodieTimeline timeline) throws IOException {
+    // TODO: Use HoodieMetaTable to extract affected file directly.
+    HashMap<String, HashMap<String, FileStatus>> partitionToFileStatusesMap = new HashMap<>();
+    List<HoodieInstant> sortedCommitsToCheck = new ArrayList<>(commitsToCheck);
+    sortedCommitsToCheck.sort(HoodieInstant::compareTo);
+    // Iterate through the given commits.
+    for (HoodieInstant commit: sortedCommitsToCheck) {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+          HoodieCommitMetadata.class);
+      // Iterate through all the affected partitions of a commit.
+      for (Map.Entry<String, List<HoodieWriteStat>> entry: commitMetadata.getPartitionToWriteStats().entrySet()) {
+        if (!partitionToFileStatusesMap.containsKey(entry.getKey())) {
+          partitionToFileStatusesMap.put(entry.getKey(), new HashMap<>());
+        }
+        // Iterate through all the written files of this partition.
+        for (HoodieWriteStat stat : entry.getValue()) {
+          String relativeFilePath = stat.getPath();
+          Path fullPath = relativeFilePath != null ? FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
+          if (fullPath != null) {
+            FileStatus fs = new FileStatus(stat.getFileSizeInBytes(), false, 0, 0,
+                0, fullPath);
+            partitionToFileStatusesMap.get(entry.getKey()).put(fullPath.getName(), fs);
+          }
+        }
+      }
+    }
+    return partitionToFileStatusesMap;
+  }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index 0e322e2..d26390d 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -94,7 +94,12 @@ class DefaultSource extends RelationProvider
    } else if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) {
      getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient)
    } else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
-      new IncrementalRelation(sqlContext, tablePath, optParams, schema)
+      val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
+      if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
+        new MergeOnReadIncrementalRelation(sqlContext, optParams, schema, metaClient)
+      } else {
+        new IncrementalRelation(sqlContext, optParams, schema, metaClient)
+      }
    } else {
      throw new HoodieException("Invalid query type :" + parameters(QUERY_TYPE_OPT_KEY))
    }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index e8caa63..e20c33c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -50,30 +50,32 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
private val confBroadcast = sc.broadcast(new SerializableWritable(config))
  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
-    val mergeParquetPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
-    mergeParquetPartition.split match {
+    val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
+    mergeOnReadPartition.split match {
      case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
-        read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader)
+        read(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader)
+      case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
+        logFileIterator(logFileOnlySplit, getConfig)
      case skipMergeSplit if skipMergeSplit.mergeType
        .equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
        skipMergeFileIterator(
          skipMergeSplit,
-          read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader),
+          read(skipMergeSplit.dataFile.get, requiredSchemaFileReader),
          getConfig
        )
      case payloadCombineSplit if payloadCombineSplit.mergeType
        .equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
        payloadCombineFileIterator(
          payloadCombineSplit,
-          read(mergeParquetPartition.split.dataFile, fullSchemaFileReader),
+          read(payloadCombineSplit.dataFile.get, fullSchemaFileReader),
          getConfig
        )
      case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " +
-        s"file path: ${mergeParquetPartition.split.dataFile.filePath}" +
-        s"log paths: ${mergeParquetPartition.split.logPaths.toString}" +
-        s"hoodie table path: ${mergeParquetPartition.split.tablePath}" +
-        s"spark partition Index: ${mergeParquetPartition.index}" +
-        s"merge type: ${mergeParquetPartition.split.mergeType}")
+        s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" +
+        s"log paths: ${mergeOnReadPartition.split.logPaths.toString}" +
+        s"hoodie table path: ${mergeOnReadPartition.split.tablePath}" +
+        s"spark partition Index: ${mergeOnReadPartition.index}" +
+        s"merge type: ${mergeOnReadPartition.split.mergeType}")
    }
  }
@@ -101,6 +103,44 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
rows
}
+  private def logFileIterator(split: HoodieMergeOnReadFileSplit,
+                              config: Configuration): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
+      private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
+      private val requiredFieldPosition =
+        tableState.requiredStructSchema
+          .map(f => tableAvroSchema.getField(f.name).pos()).toList
+      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
+      private val deserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
+      private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
+      private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
+      private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
+
+      private var recordToLoad: InternalRow = _
+      override def hasNext: Boolean = {
+        if (logRecordsKeyIterator.hasNext) {
+          val curAvrokey = logRecordsKeyIterator.next()
+          val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema)
+          if (!curAvroRecord.isPresent) {
+            // delete record found, skipping
+            this.hasNext
+          } else {
+            val requiredAvroRecord = AvroConversionUtils
+              .buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema, requiredFieldPosition, recordBuilder)
+            recordToLoad = unsafeProjection(deserializer.deserialize(requiredAvroRecord).asInstanceOf[InternalRow])
+            true
+          }
+        } else {
+          false
+        }
+      }
+
+      override def next(): InternalRow = {
+        recordToLoad
+      }
+    }
+
  private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
                                    baseFileIterator: Iterator[InternalRow],
                                    config: Configuration): Iterator[InternalRow] =
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 9cd562c..5c20656 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -42,19 +42,14 @@ import scala.collection.mutable
*
*/
class IncrementalRelation(val sqlContext: SQLContext,
-                          val basePath: String,
                           val optParams: Map[String, String],
-                          val userSchema: StructType) extends BaseRelation with TableScan {
+                          val userSchema: StructType,
+                          val metaClient: HoodieTableMetaClient) extends BaseRelation with TableScan {
  private val log = LogManager.getLogger(classOf[IncrementalRelation])
  val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema
-  private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
-
-  // MOR tables not supported yet
-  if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
-    throw new HoodieException("Incremental view not implemented yet, for merge-on-read tables")
-  }
+  private val basePath = metaClient.getBasePath
  // TODO : Figure out a valid HoodieWriteConfig
  private val hoodieTable = HoodieSparkTable.create(HoodieWriteConfig.newBuilder().withPath(basePath).build(),
    new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)),
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
new file mode 100644
index 0000000..d7b8cff
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobPattern, Path}
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForCommits
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+import org.apache.log4j.LogManager
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Row, SQLContext}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Experimental.
+  * Relation that implements the Hoodie incremental view for a Merge On Read table.
+  *
+  */
+class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
+                                     val optParams: Map[String, String],
+                                     val userSchema: StructType,
+                                     val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with PrunedFilteredScan {
+
+  private val log = LogManager.getLogger(classOf[MergeOnReadIncrementalRelation])
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+  private val jobConf = new JobConf(conf)
+  private val fs = FSUtils.getFs(metaClient.getBasePath, conf)
+  private val commitTimeline = metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants()
+  if (commitTimeline.empty()) {
+    throw new HoodieException("No instants to incrementally pull")
+  }
+  if (!optParams.contains(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY)) {
+    throw new HoodieException(s"Specify the begin instant time to pull from using " +
+      s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY}")
+  }
+
+  private val lastInstant = commitTimeline.lastInstant().get()
+  private val mergeType = optParams.getOrElse(
+    DataSourceReadOptions.REALTIME_MERGE_OPT_KEY,
+    DataSourceReadOptions.DEFAULT_REALTIME_MERGE_OPT_VAL)
+
+  private val commitsTimelineToReturn = commitTimeline.findInstantsInRange(
+    optParams(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY),
+    optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, lastInstant.getTimestamp))
+  log.debug(s"${commitsTimelineToReturn.getInstants.iterator().toList.map(f => f.toString).mkString(",")}")
+  private val commitsToReturn = commitsTimelineToReturn.getInstants.iterator().toList
+  private val schemaUtil = new TableSchemaResolver(metaClient)
+  private val tableAvroSchema = schemaUtil.getTableAvroSchema
+  private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
+  private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
+  private val fileIndex = buildFileIndex()
+
+ override def schema: StructType = tableStructSchema
+
+ override def needConversion: Boolean = false
+
+  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
+    val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
+    val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp)
+    val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)
+    filters :+ isNotNullFilter :+ largerThanFilter :+ lessThanFilter
+  }
+
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    log.debug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}")
+    log.debug(s"buildScan filters = ${filters.mkString(",")}")
+    // config to ensure the push down filter for parquet will be applied.
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
+    val pushDownFilter = {
+      val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
+      val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp)
+      val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)
+      filters :+ isNotNullFilter :+ largerThanFilter :+ lessThanFilter
+    }
+    var requiredStructSchema = StructType(Seq())
+    requiredColumns.foreach(col => {
+      val field = tableStructSchema.find(_.name == col)
+      if (field.isDefined) {
+        requiredStructSchema = requiredStructSchema.add(field.get)
+      }
+    })
+    val requiredAvroSchema = AvroConversionUtils
+      .convertStructTypeToAvroSchema(requiredStructSchema, tableAvroSchema.getName, tableAvroSchema.getNamespace)
+    val hoodieTableState = HoodieMergeOnReadTableState(
+      tableStructSchema,
+      requiredStructSchema,
+      tableAvroSchema.toString,
+      requiredAvroSchema.toString,
+      fileIndex
+    )
+    val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
+      sparkSession = sqlContext.sparkSession,
+      dataSchema = tableStructSchema,
+      partitionSchema = StructType(Nil),
+      requiredSchema = tableStructSchema,
+      filters = pushDownFilter,
+      options = optParams,
+      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+    )
+    val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
+      sparkSession = sqlContext.sparkSession,
+      dataSchema = tableStructSchema,
+      partitionSchema = StructType(Nil),
+      requiredSchema = requiredStructSchema,
+      filters = pushDownFilter,
+      options = optParams,
+      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+    )
+
+    // Follow the implementation of Spark internal HadoopRDD to handle the broadcast configuration.
+    FileSystem.getLocal(jobConf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    val rdd = new HoodieMergeOnReadRDD(
+      sqlContext.sparkContext,
+      jobConf,
+      fullSchemaParquetReader,
+      requiredSchemaParquetReader,
+      hoodieTableState
+    )
+    rdd.asInstanceOf[RDD[Row]]
+  }
+
+  def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = {
+    val partitionsWithFileStatus = listAffectedFilesForCommits(new Path(metaClient.getBasePath),
+      commitsToReturn, commitsTimelineToReturn)
+    val affectedFileStatus = new ListBuffer[FileStatus]
+    partitionsWithFileStatus.iterator.foreach(p =>
+      p._2.iterator.foreach(status => affectedFileStatus += status._2))
+    val fsView = new HoodieTableFileSystemView(metaClient,
+      commitsTimelineToReturn, affectedFileStatus.toArray)
+
+    // Iterate partitions to create splits
+    val fileGroup = partitionsWithFileStatus.keySet().flatMap(partitionPath =>
+      fsView.getAllFileGroups(partitionPath).iterator()
+    ).toList
+    val latestCommit = fsView.getLastInstant.get().getTimestamp
+    if (log.isDebugEnabled) {
+      fileGroup.foreach(f => log.debug(s"current file group id: " +
+        s"${f.getFileGroupId} and file slices ${f.getLatestFileSlice.get().toString}"))
+    }
+
+    // Filter files based on user defined glob pattern
+    val pathGlobPattern = optParams.getOrElse(
+      DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY,
+      DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)
+    val filteredFileGroup = if(!pathGlobPattern
+      .equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)) {
+      val globMatcher = new GlobPattern("*" + pathGlobPattern)
+      fileGroup.filter(f => {
+        if (f.getLatestFileSlice.get().getBaseFile.isPresent) {
+          globMatcher.matches(f.getLatestFileSlice.get().getBaseFile.get.getPath)
+        } else {
+          globMatcher.matches(f.getLatestFileSlice.get().getLatestLogFile.get().getPath.toString)
+        }
+      })
+    } else {
+      fileGroup
+    }
+
+    // Build HoodieMergeOnReadFileSplit.
+    filteredFileGroup.map(f => {
+      // Make sure we get the base file when there is a pending compaction, which means the base file
+      // won't be in the latest file slice.
+      val baseFiles = f.getAllFileSlices.iterator().filter(slice => slice.getBaseFile.isPresent).toList
+      val partitionedFile = if (baseFiles.nonEmpty) {
+        val baseFile = baseFiles.head.getBaseFile
+        Option(PartitionedFile(InternalRow.empty, baseFile.get.getPath, 0, baseFile.get.getFileLen))
+      }
+      else {
+        Option.empty
+      }
+
+      val logPath = if (f.getLatestFileSlice.isPresent) {
+        // If the log path doesn't exist, we still include an empty path to avoid using
+        // the default parquet reader, to ensure the push down filter will be applied.
+        Option(f.getLatestFileSlice.get().getLogFiles.iterator().toList
+          .map(logfile => logfile.getPath.toString))
+      }
+      else {
+        Option.empty
+      }
+
+      HoodieMergeOnReadFileSplit(partitionedFile, logPath,
+        latestCommit, metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
+    })
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 0b81fa7..328e3c3 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._
-case class HoodieMergeOnReadFileSplit(dataFile: PartitionedFile,
+case class HoodieMergeOnReadFileSplit(dataFile: Option[PartitionedFile],
logPaths: Option[List[String]],
latestCommit: String,
tablePath: String,
@@ -99,7 +99,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
dataSchema = tableStructSchema,
partitionSchema = StructType(Nil),
requiredSchema = tableStructSchema,
- filters = Seq(),
+ filters = filters,
options = optParams,
hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
)
@@ -140,7 +140,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
      val baseFile = kv._1
      val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList)
      val partitionedFile = PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen)
-      HoodieMergeOnReadFileSplit(partitionedFile, logPaths, latestCommit,
+      HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, latestCommit,
metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
}).toList
fileSplits
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 7c73669..121957e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -19,7 +19,7 @@ package org.apache.hudi.functional
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.testutils.HoodieClientTestBase
@@ -29,6 +29,7 @@ import org.apache.spark.sql.functions._
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
import scala.collection.JavaConversions._
/**
@@ -157,6 +158,39 @@ class TestMORDataSource extends HoodieClientTestBase {
assertTrue(commit2Time > commit1Time)
    assertEquals(100, hudiSnapshotDF2.join(hudiSnapshotDF1, Seq("_hoodie_record_key"), "left").count())
+    // incremental view
+    // base file only
+    val hudiIncDF1 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
+      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commit1Time)
+      .load(basePath)
+    assertEquals(100, hudiIncDF1.count())
+    assertEquals(1, hudiIncDF1.select("_hoodie_commit_time").distinct().count())
+    assertEquals(commit1Time, hudiIncDF1.select("_hoodie_commit_time").head().get(0).toString)
+    hudiIncDF1.show(1)
+    // log file only
+    val hudiIncDF2 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commit1Time)
+      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commit2Time)
+      .load(basePath)
+    assertEquals(100, hudiIncDF2.count())
+    assertEquals(1, hudiIncDF2.select("_hoodie_commit_time").distinct().count())
+    assertEquals(commit2Time, hudiIncDF2.select("_hoodie_commit_time").head().get(0).toString)
+    hudiIncDF2.show(1)
+
+    // base file + log file
+    val hudiIncDF3 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
+      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commit2Time)
+      .load(basePath)
+    assertEquals(100, hudiIncDF3.count())
+    // log file being loaded
+    assertEquals(1, hudiIncDF3.select("_hoodie_commit_time").distinct().count())
+    assertEquals(commit2Time, hudiIncDF3.select("_hoodie_commit_time").head().get(0).toString)
+
// Unmerge
val hudiSnapshotSkipMergeDF2 = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
@@ -193,6 +227,22 @@ class TestMORDataSource extends HoodieClientTestBase {
    assertEquals(50,
      hudiSnapshotDF3.join(hudiSnapshotDF2, Seq("_hoodie_record_key", "_hoodie_commit_time"), "inner").count())
+    // incremental query from commit2Time
+    val hudiIncDF4 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commit2Time)
+      .load(basePath)
+    assertEquals(50, hudiIncDF4.count())
+
+    // skip merge incremental view
+    // including commit 2 and commit 3
+    val hudiIncDF4SkipMerge = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
+      .option(DataSourceReadOptions.REALTIME_MERGE_OPT_KEY, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
+      .load(basePath)
+    assertEquals(200, hudiIncDF4SkipMerge.count())
+
// Fourth Operation:
// Insert records to a new partition. Produced a new parquet file.
    // SNAPSHOT view should read the latest log files from the default partition and parquet from the new partition.
@@ -213,21 +263,51 @@ class TestMORDataSource extends HoodieClientTestBase {
    assertEquals(100,
      hudiSnapshotDF1.join(hudiSnapshotDF4, Seq("_hoodie_record_key"), "inner").count())
+    // Incremental query, 50 from log file, 100 from base file of the new partition.
+    val hudiIncDF5 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commit2Time)
+      .load(basePath)
+    assertEquals(150, hudiIncDF5.count())
+
// Fifth Operation:
    // Upsert records to the new partition. Produced a newer version of parquet file.
    // SNAPSHOT view should read the latest log files from the default partition
    // and the latest parquet from the new partition.
-    val records5 = recordsToStrings(newDataGen.generateUpdates("005", 100)).toList
+    val records5 = recordsToStrings(newDataGen.generateUniqueUpdates("005", 50)).toList
    val inputDF5: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records5, 2))
    inputDF5.write.format("org.apache.hudi")
      .options(commonOpts)
-      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option("hoodie.compact.inline", "true")
      .mode(SaveMode.Append)
      .save(basePath)
+    val commit5Time = HoodieDataSourceHelpers.latestCommit(fs, basePath)
    val hudiSnapshotDF5 = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.load(basePath + "/*/*/*/*")
assertEquals(200, hudiSnapshotDF5.count())
+
+    // Sixth Operation:
+    // Insert 2 records and trigger compaction.
+    val records6 = recordsToStrings(newDataGen.generateInserts("006", 2)).toList
+    val inputDF6: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records6, 2))
+    inputDF6.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.compact.inline", "true")
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val commit6Time = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+    val hudiSnapshotDF6 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath + "/2020/01/10/*")
+    assertEquals(102, hudiSnapshotDF6.count())
+    val hudiIncDF6 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commit5Time)
+      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commit6Time)
+      .load(basePath)
+    // compaction updated 150 rows + inserted 2 new rows
+    assertEquals(152, hudiIncDF6.count())
}
@Test
@@ -276,6 +356,13 @@ class TestMORDataSource extends HoodieClientTestBase {
.load(basePath + "/*/*/*/*")
assertEquals(100, hudiSnapshotDF2Unmerge.count())
+ // incremental query, read 50 delete records from log file and get 0 count.
+    val hudiIncDF1 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commit2Time)
+      .load(basePath)
+    assertEquals(0, hudiIncDF1.count())
+
// Third Operation:
    // Upsert 50 delete records to delete the rest
    // Snapshot view should read 0 records
@@ -308,6 +395,8 @@ class TestMORDataSource extends HoodieClientTestBase {
val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.load(basePath + "/*/*/*/*")
+    val commit1Time = hudiSnapshotDF1.select("_hoodie_commit_time").head().get(0).toString
+
assertEquals(100, hudiSnapshotDF1.count())
// select nested columns with order different from the actual schema
assertEquals("amount,currency,tip_history,_hoodie_commit_seqno",
@@ -329,34 +418,43 @@ class TestMORDataSource extends HoodieClientTestBase {
val hudiSnapshotDF2 = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.load(basePath + "/*/*/*/*")
-
-    val commit1Time = hudiSnapshotDF1.select("_hoodie_commit_time").head().get(0).toString
+    val hudiIncDF1 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
+      .load(basePath)
+    val hudiIncDF1Skipmerge = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.REALTIME_MERGE_OPT_KEY, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
+      .load(basePath)
+    val hudiIncDF2 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commit1Time)
+      .load(basePath)
// filter first commit and only read log records
    assertEquals(50, hudiSnapshotDF2.select("_hoodie_commit_seqno", "fare.amount", "fare.currency", "tip_history")
      .filter(col("_hoodie_commit_time") > commit1Time).count())
+    assertEquals(50, hudiIncDF1.select("_hoodie_commit_seqno", "fare.amount", "fare.currency", "tip_history")
+      .filter(col("_hoodie_commit_time") > commit1Time).count())
+    assertEquals(50, hudiIncDF2
+      .select("_hoodie_commit_seqno", "fare.amount", "fare.currency", "tip_history").count())
+    assertEquals(150, hudiIncDF1Skipmerge
+      .select("_hoodie_commit_seqno", "fare.amount", "fare.currency", "tip_history").count())
// select nested columns with order different from the actual schema
- assertEquals("amount,currency,tip_history,_hoodie_commit_seqno",
- hudiSnapshotDF2
- .select("fare.amount", "fare.currency", "tip_history",
"_hoodie_commit_seqno")
- .orderBy(desc("_hoodie_commit_seqno"))
- .columns.mkString(","))
-
- // Correctly loading type
- val sampleRow = hudiSnapshotDF2
- .select("begin_lat", "current_date", "fare.currency", "tip_history",
"nation")
- .orderBy(desc("_hoodie_commit_time"))
- .head()
- assertEquals(sampleRow.getDouble(0), sampleRow.get(0))
- assertEquals(sampleRow.getLong(1), sampleRow.get(1))
- assertEquals(sampleRow.getString(2), sampleRow.get(2))
- assertEquals(sampleRow.getSeq(3), sampleRow.get(3))
- assertEquals(sampleRow.getStruct(4), sampleRow.get(4))
+ verifySchemaAndTypes(hudiSnapshotDF1)
+ verifySchemaAndTypes(hudiSnapshotDF2)
+ verifySchemaAndTypes(hudiIncDF1)
+ verifySchemaAndTypes(hudiIncDF2)
+ verifySchemaAndTypes(hudiIncDF1Skipmerge)
// make sure show() work
- hudiSnapshotDF1.show(1)
- hudiSnapshotDF2.show(1)
+ verifyShow(hudiSnapshotDF1)
+ verifyShow(hudiSnapshotDF2)
+ verifyShow(hudiIncDF1)
+ verifyShow(hudiIncDF2)
+ verifyShow(hudiIncDF1Skipmerge)
}
@Test
@@ -404,4 +502,25 @@ class TestMORDataSource extends HoodieClientTestBase {
hudiSnapshotDF1.show(1)
hudiSnapshotDF2.show(1)
}
+
+  def verifySchemaAndTypes(df: DataFrame): Unit = {
+    assertEquals("amount,currency,tip_history,_hoodie_commit_seqno",
+      df.select("fare.amount", "fare.currency", "tip_history", "_hoodie_commit_seqno")
+        .orderBy(desc("_hoodie_commit_seqno"))
+        .columns.mkString(","))
+    val sampleRow = df
+      .select("begin_lat", "current_date", "fare.currency", "tip_history", "nation")
+      .orderBy(desc("_hoodie_commit_time"))
+      .head()
+    assertEquals(sampleRow.getDouble(0), sampleRow.get(0))
+    assertEquals(sampleRow.getLong(1), sampleRow.get(1))
+    assertEquals(sampleRow.getString(2), sampleRow.get(2))
+    assertEquals(sampleRow.getSeq(3), sampleRow.get(3))
+    assertEquals(sampleRow.getStruct(4), sampleRow.get(4))
+  }
+
+  def verifyShow(df: DataFrame): Unit = {
+    df.show(1)
+    df.select("_hoodie_commit_seqno", "fare.amount", "fare.currency", "tip_history").show(1)
+  }
}