This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 553e280c6d [HUDI-3189] Fallback to full table scan with incremental query when files are cleaned up or archived for MOR table (#6141)
553e280c6d is described below
commit 553e280c6d7f3cc349bf7718104a66f7d2866a9b
Author: RexAn <[email protected]>
AuthorDate: Mon Aug 15 14:30:10 2022 +0800
[HUDI-3189] Fallback to full table scan with incremental query when files are cleaned up or archived for MOR table (#6141)
* Support reading archived commits in Spark incremental queries on MOR tables
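
As a usage sketch (not part of the patch): the fallback is driven by the option keys below, all of which appear in this change; the base path and instant times are hypothetical placeholders.

    // Hypothetical incremental read that falls back to a full table scan when
    // the requested commits are archived or their data files have been cleaned.
    val incrDF = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), "20220815000000") // hypothetical begin instant
      .option(DataSourceReadOptions.END_INSTANTTIME.key(), "20220815120000")   // hypothetical end instant
      .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), "true")
      .load("/path/to/hudi/table") // hypothetical base path
    incrDF.count()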
---
.../org/apache/hudi/IncrementalRelation.scala | 122 ++++++++------
.../hudi/MergeOnReadIncrementalRelation.scala | 68 ++++++--
.../apache/hudi/functional/TestCOWDataSource.scala | 83 ---------
.../TestIncrementalReadWithFullTableScan.scala | 187 +++++++++++++++++++++
4 files changed, 311 insertions(+), 149 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 81b12dbcb6..db48f224f2 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -71,6 +71,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
throw new HoodieException(s"Specify the begin instant time to pull from
using " +
s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME.key}")
}
+
if (!metaClient.getTableConfig.populateMetaFields()) {
throw new HoodieException("Incremental queries are not supported when meta
fields are disabled")
}
@@ -188,71 +189,90 @@ class IncrementalRelation(val sqlContext: SQLContext,
case HoodieFileFormat.ORC => "orc"
}
sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
+
+    // Fallback to full table scan if any of the following conditions holds:
+    // 1. the start commit is archived
+    // 2. the end commit is archived
+    // 3. files referenced in the metadata have been deleted
+    val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
+      DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
+
val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
-    if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
-      sqlContext.sparkContext.emptyRDD[Row]
-    } else {
-      log.info("Additional Filters to be applied to incremental source are :" + filters.mkString("Array(", ", ", ")"))
-      var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
+    val startInstantTime = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
+    val startInstantArchived = commitTimeline.isBeforeTimelineStarts(startInstantTime)
+    val endInstantTime = optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key(), lastInstant.getTimestamp)
+    val endInstantArchived = commitTimeline.isBeforeTimelineStarts(endInstantTime)
-    val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
-      DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
+    val scanDf = if (fallbackToFullTableScan && (startInstantArchived || endInstantArchived)) {
+      log.info(s"Falling back to full table scan as startInstantArchived: $startInstantArchived, endInstantArchived: $endInstantArchived")
+      fullTableScanDataFrame(startInstantTime, endInstantTime)
+ } else {
+      if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
+        sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
+      } else {
+        log.info("Additional Filters to be applied to incremental source are :" + filters.mkString("Array(", ", ", ")"))
- var doFullTableScan = false
+        var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
- if (fallbackToFullTableScan) {
-      val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
- val timer = new HoodieTimer().startTimer();
+ var doFullTableScan = false
-        val allFilesToCheck = filteredMetaBootstrapFullPaths ++ filteredRegularFullPaths
-        val firstNotFoundPath = allFilesToCheck.find(path => !fs.exists(new Path(path)))
- val timeTaken = timer.endTimer()
- log.info("Checking if paths exists took " + timeTaken + "ms")
+ if (fallbackToFullTableScan) {
+          val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
+ val timer = new HoodieTimer().startTimer();
-        val optStartTs = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
-        val isInstantArchived = optStartTs.compareTo(commitTimeline.firstInstant().get().getTimestamp) < 0 // True if optStartTs < activeTimeline.first
+          val allFilesToCheck = filteredMetaBootstrapFullPaths ++ filteredRegularFullPaths
+          val firstNotFoundPath = allFilesToCheck.find(path => !fs.exists(new Path(path)))
+ val timeTaken = timer.endTimer()
+ log.info("Checking if paths exists took " + timeTaken + "ms")
- if (isInstantArchived || firstNotFoundPath.isDefined) {
- doFullTableScan = true
- log.info("Falling back to full table scan")
+ if (firstNotFoundPath.isDefined) {
+ doFullTableScan = true
+ log.info("Falling back to full table scan as some files cannot
be found.")
+ }
}
- }
- if (doFullTableScan) {
- val hudiDF = sqlContext.read
- .format("hudi_v1")
- .schema(usedSchema)
- .load(basePath.toString)
- .filter(String.format("%s > '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD, //Notice the > in place of >= because
we are working with optParam instead of first commit > optParam
- optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)))
- .filter(String.format("%s <= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
- commitsToReturn.last.getTimestamp))
- // schema enforcement does not happen in above spark.read with hudi.
hence selecting explicitly w/ right column order
- val fieldNames : Array[String] = df.schema.fields.map(field =>
field.name)
- df = df.union(hudiDF.select(fieldNames.head, fieldNames.tail: _*))
- } else {
- if (metaBootstrapFileIdToFullPath.nonEmpty) {
- df = sqlContext.sparkSession.read
- .format("hudi_v1")
- .schema(usedSchema)
-          .option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
- .load()
- }
+ if (doFullTableScan) {
+ fullTableScanDataFrame(startInstantTime, endInstantTime)
+ } else {
+ if (metaBootstrapFileIdToFullPath.nonEmpty) {
+ df = sqlContext.sparkSession.read
+ .format("hudi_v1")
+ .schema(usedSchema)
+            .option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
+ .load()
+ }
- if (regularFileIdToFullPath.nonEmpty) {
- df = df.union(sqlContext.read.options(sOpts)
- .schema(usedSchema).format(formatClassName)
- .load(filteredRegularFullPaths.toList: _*)
- .filter(String.format("%s >= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
- commitsToReturn.head.getTimestamp))
- .filter(String.format("%s <= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
- commitsToReturn.last.getTimestamp)))
+ if (regularFileIdToFullPath.nonEmpty) {
+ df = df.union(sqlContext.read.options(sOpts)
+ .schema(usedSchema).format(formatClassName)
+ .load(filteredRegularFullPaths.toList: _*)
+ .filter(String.format("%s >= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ commitsToReturn.head.getTimestamp))
+ .filter(String.format("%s <= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ commitsToReturn.last.getTimestamp)))
+ }
+ df
}
}
-
- filters.foldLeft(df)((e, f) => e.filter(f)).rdd
}
+
+ filters.foldLeft(scanDf)((e, f) => e.filter(f)).rdd
}
}
+
+  private def fullTableScanDataFrame(startInstantTime: String, endInstantTime: String): DataFrame = {
+ val hudiDF = sqlContext.read
+ .format("hudi_v1")
+ .schema(usedSchema)
+ .load(basePath.toString)
+ .filter(String.format("%s > '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD, //Notice the > in place of >= because
we are working with optParam instead of first commit > optParam
+ startInstantTime))
+ .filter(String.format("%s <= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ endInstantTime))
+
+    // schema enforcement does not happen in above spark.read with hudi. hence selecting explicitly w/ right column order
+ val fieldNames = usedSchema.fieldNames
+ hudiDF.select(fieldNames.head, fieldNames.tail: _*)
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 0fc6ef2f83..446c806b18 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -17,7 +17,7 @@
package org.apache.hudi
-import org.apache.hadoop.fs.{GlobPattern, Path}
+import org.apache.hadoop.fs.{FileStatus, GlobPattern, Path}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -53,9 +53,11 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
}
override protected def timeline: HoodieTimeline = {
- val startTimestamp = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
-    val endTimestamp = optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, super.timeline.lastInstant().get.getTimestamp)
- super.timeline.findInstantsInRange(startTimestamp, endTimestamp)
+ if (fullTableScan) {
+ super.timeline
+ } else {
+ super.timeline.findInstantsInRange(startTimestamp, endTimestamp)
+ }
}
  protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
@@ -87,17 +89,19 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
if (includedCommits.isEmpty) {
List()
} else {
- val latestCommit = includedCommits.last.getTimestamp
-      val commitsMetadata = includedCommits.map(getCommitMetadata(_, timeline)).asJava
+ val fileSlices = if (fullTableScan) {
+ listLatestFileSlices(Seq(), partitionFilters, dataFilters)
+ } else {
+ val latestCommit = includedCommits.last.getTimestamp
-      val modifiedFiles = listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), commitsMetadata)
-      val fsView = new HoodieTableFileSystemView(metaClient, timeline, modifiedFiles)
+        val fsView = new HoodieTableFileSystemView(metaClient, timeline, affectedFilesInCommits)
- val modifiedPartitions = getWritePartitionPaths(commitsMetadata)
+ val modifiedPartitions = getWritePartitionPaths(commitsMetadata)
-      val fileSlices = modifiedPartitions.asScala.flatMap { relativePartitionPath =>
-        fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
- }.toSeq
+ modifiedPartitions.asScala.flatMap { relativePartitionPath =>
+          fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
+ }.toSeq
+ }
buildSplits(filterFileSlices(fileSlices, globPattern))
}
@@ -124,14 +128,48 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
// Validate this Incremental implementation is properly configured
validate()
-  protected lazy val includedCommits: immutable.Seq[HoodieInstant] = timeline.getInstants.iterator().asScala.toList
+  protected def startTimestamp: String = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
+  protected def endTimestamp: String = optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, super.timeline.lastInstant().get.getTimestamp)
+
+  protected def startInstantArchived: Boolean = super.timeline.isBeforeTimelineStarts(startTimestamp)
+  protected def endInstantArchived: Boolean = super.timeline.isBeforeTimelineStarts(endTimestamp)
+
+  // Fallback to full table scan if any of the following conditions holds:
+  // 1. the start commit is archived
+  // 2. the end commit is archived
+  // 3. files referenced in the metadata have been deleted
+  protected lazy val fullTableScan: Boolean = {
+    val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
+      DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
+
+    fallbackToFullTableScan && (startInstantArchived || endInstantArchived ||
+      affectedFilesInCommits.exists(fileStatus => !metaClient.getFs.exists(fileStatus.getPath)))
+  }
+
+ protected lazy val includedCommits: immutable.Seq[HoodieInstant] = {
+ if (!startInstantArchived || !endInstantArchived) {
+      // If the endTimestamp commit is not archived, only keep instants up to endTimestamp.
+      super.timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants.iterator().asScala.toList
+ } else {
+ super.timeline.getInstants.iterator().asScala.toList
+ }
+ }
+
+  protected lazy val commitsMetadata = includedCommits.map(getCommitMetadata(_, super.timeline)).asJava
+
+ protected lazy val affectedFilesInCommits: Array[FileStatus] = {
+    listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), commitsMetadata)
+ }
  // Record filters making sure that only records w/in the requested bounds are being fetched as part of the
// scan collected by this relation
protected lazy val incrementalSpanRecordFilters: Seq[Filter] = {
val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
-    val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, includedCommits.head.getTimestamp)
-    val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, includedCommits.last.getTimestamp)
+
+    val largerThanFilter = GreaterThan(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTimestamp)
+
+    val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+      if (endInstantArchived) endTimestamp else includedCommits.last.getTimestamp)
Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index c7eef5bce4..6697ec1514 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -859,89 +859,6 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
}
- @Test def testFailEarlyForIncrViewQueryForNonExistingFiles(): Unit = {
- // Create 10 commits
- for (i <- 1 to 10) {
-      val records = recordsToStrings(dataGen.generateInserts("%05d".format(i), 100)).toList
- val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
- inputDF.write.format("org.apache.hudi")
- .options(commonOpts)
- .option("hoodie.cleaner.commits.retained", "3")
- .option("hoodie.keep.min.commits", "4")
- .option("hoodie.keep.max.commits", "5")
-        .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
- .mode(SaveMode.Append)
- .save(basePath)
- }
-
-    val hoodieMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build()
- /**
- * State of timeline after 10 commits
- * +------------------+--------------------------------------+
- * | Archived | Active Timeline |
- * +------------------+--------------+-----------------------+
- * | C0 C1 C2 C3 | C4 C5 | C6 C7 C8 C9 |
- * +------------------+--------------+-----------------------+
- * | Data cleaned | Data exists in table |
- * +---------------------------------+-----------------------+
- */
-
-    val completedCommits = hoodieMetaClient.getCommitsTimeline.filterCompletedInstants() // C4 to C9
-    //Anything less than 2 is a valid commit in the sense no cleanup has been done for those commit files
-    var startTs = completedCommits.nthInstant(0).get().getTimestamp //C4
-    var endTs = completedCommits.nthInstant(1).get().getTimestamp //C5
-
- //Calling without the fallback should result in Path does not exist
- var hoodieIncViewDF = spark.read.format("org.apache.hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
- .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
- .load(basePath)
-
- val msg = "Should fail with Path does not exist"
- assertThrows(classOf[AnalysisException], new Executable {
- override def execute(): Unit = {
- hoodieIncViewDF.count()
- }
- }, msg)
-
- //Should work with fallback enabled
- hoodieIncViewDF = spark.read.format("org.apache.hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
- .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
-      .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), "true")
- .load(basePath)
- assertEquals(100, hoodieIncViewDF.count())
-
- //Test out for archived commits
-    val archivedInstants = hoodieMetaClient.getArchivedTimeline.getInstants.distinct().toArray
- startTs = archivedInstants(0).asInstanceOf[HoodieInstant].getTimestamp //C0
- endTs = completedCommits.nthInstant(1).get().getTimestamp //C5
-
- //Calling without the fallback should result in Path does not exist
- hoodieIncViewDF = spark.read.format("org.apache.hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
- .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
- .load(basePath)
-
- assertThrows(classOf[AnalysisException], new Executable {
- override def execute(): Unit = {
- hoodieIncViewDF.count()
- }
- }, msg)
-
- //Should work with fallback enabled
- hoodieIncViewDF = spark.read.format("org.apache.hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
- .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
-      .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), "true")
- .load(basePath)
- assertEquals(500, hoodieIncViewDF.count())
- }
-
@Test
def testWriteSmallPrecisionDecimalTable(): Unit = {
val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
new file mode 100644
index 0000000000..fb0cf5e179
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieInstantTimeGenerator, HoodieTimeline}
+import org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.log4j.LogManager
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.function.Executable
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
+
+import scala.collection.JavaConversions.asScalaBuffer
+
+class TestIncrementalReadWithFullTableScan extends HoodieClientTestBase {
+
+ var spark: SparkSession = null
+  private val log = LogManager.getLogger(classOf[TestIncrementalReadWithFullTableScan])
+
+ private val perBatchSize = 100
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
+ )
+
+
+ val verificationCol: String = "driver"
+ val updatedVerificationVal: String = "driver_update"
+
+ @BeforeEach override def setUp() {
+ setTableName("hoodie_test")
+ initPath()
+ initSparkContexts()
+ spark = sqlContext.sparkSession
+ initTestDataGenerator()
+ initFileSystem()
+ }
+
+ @AfterEach override def tearDown() = {
+ cleanupSparkContexts()
+ cleanupTestDataGenerator()
+ cleanupFileSystem()
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+  def testFailEarlyForIncrViewQueryForNonExistingFiles(tableType: HoodieTableType): Unit = {
+ // Create 10 commits
+ for (i <- 1 to 10) {
+      val records = recordsToStrings(dataGen.generateInserts("%05d".format(i), perBatchSize)).toList
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+ inputDF.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
+ .option("hoodie.cleaner.commits.retained", "3")
+ .option("hoodie.keep.min.commits", "4")
+ .option("hoodie.keep.max.commits", "5")
+        .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ }
+
+    val hoodieMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build()
+ /**
+ * State of timeline after 10 commits
+ * +------------------+--------------------------------------+
+ * | Archived | Active Timeline |
+ * +------------------+--------------+-----------------------+
+ * | C0 C1 C2 C3 | C4 C5 | C6 C7 C8 C9 |
+ * +------------------+--------------+-----------------------+
+ * | Data cleaned | Data exists in table |
+ * +---------------------------------+-----------------------+
+ */
+
+    val completedCommits = hoodieMetaClient.getCommitsTimeline.filterCompletedInstants() // C4 to C9
+    val archivedInstants = hoodieMetaClient.getArchivedTimeline.filterCompletedInstants()
+      .getInstants.distinct().toArray // C0 to C3
+
+    //Anything less than 2 is a valid commit in the sense no cleanup has been done for those commit files
+    val startUnarchivedCommitTs = completedCommits.nthInstant(0).get().getTimestamp //C4
+    val endUnarchivedCommitTs = completedCommits.nthInstant(1).get().getTimestamp //C5
+
+    val startArchivedCommitTs = archivedInstants(0).asInstanceOf[HoodieInstant].getTimestamp //C0
+    val endArchivedCommitTs = archivedInstants(1).asInstanceOf[HoodieInstant].getTimestamp //C1
+
+    val startOutOfRangeCommitTs = HoodieInstantTimeGenerator.createNewInstantTime(0)
+    val endOutOfRangeCommitTs = HoodieInstantTimeGenerator.createNewInstantTime(0)
+
+    assertTrue(HoodieTimeline.compareTimestamps(startOutOfRangeCommitTs, GREATER_THAN, completedCommits.lastInstant().get().getTimestamp))
+    assertTrue(HoodieTimeline.compareTimestamps(endOutOfRangeCommitTs, GREATER_THAN, completedCommits.lastInstant().get().getTimestamp))
+
+    // Test both start and end commits are archived
+    runIncrementalQueryAndCompare(startArchivedCommitTs, endArchivedCommitTs, 1, true)
+
+    // Test start commit is archived, end commit is not archived
+    shouldThrowIfFallbackIsFalse(tableType,
+      () => runIncrementalQueryAndCompare(startArchivedCommitTs, endUnarchivedCommitTs, 5, false))
+    runIncrementalQueryAndCompare(startArchivedCommitTs, endUnarchivedCommitTs, 5, true)
+
+    // Test both start and end commits are not archived but their files got cleaned
+    shouldThrowIfFallbackIsFalse(tableType,
+      () => runIncrementalQueryAndCompare(startUnarchivedCommitTs, endUnarchivedCommitTs, 1, false))
+    runIncrementalQueryAndCompare(startUnarchivedCommitTs, endUnarchivedCommitTs, 1, true)
+
+    // Test start commit is not archived, end commit is out of the timeline
+    runIncrementalQueryAndCompare(startUnarchivedCommitTs, endOutOfRangeCommitTs, 5, true)
+
+    // Test both start and end commits are out of the timeline
+    runIncrementalQueryAndCompare(startOutOfRangeCommitTs, endOutOfRangeCommitTs, 0, false)
+    runIncrementalQueryAndCompare(startOutOfRangeCommitTs, endOutOfRangeCommitTs, 0, true)
+
+    // Test end commit is earlier than the start commit
+    runIncrementalQueryAndCompare(endUnarchivedCommitTs, startUnarchivedCommitTs, 0, false)
+    runIncrementalQueryAndCompare(endUnarchivedCommitTs, startUnarchivedCommitTs, 0, true)
+
+    // Test both start and end commits are neither archived nor cleaned
+    val reversedCommits = completedCommits.getReverseOrderedInstants.toArray
+    val startUncleanedCommitTs = reversedCommits.apply(1).asInstanceOf[HoodieInstant].getTimestamp
+    val endUncleanedCommitTs = reversedCommits.apply(0).asInstanceOf[HoodieInstant].getTimestamp
+    runIncrementalQueryAndCompare(startUncleanedCommitTs, endUncleanedCommitTs, 1, true)
+    runIncrementalQueryAndCompare(startUncleanedCommitTs, endUncleanedCommitTs, 1, false)
+ }
+
+ private def runIncrementalQueryAndCompare(
+ startTs: String,
+ endTs: String,
+ batchNum: Int,
+ fallBackFullTableScan: Boolean): Unit = {
+ val hoodieIncViewDF = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
+      .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
+      .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), fallBackFullTableScan)
+ .load(basePath)
+ assertEquals(perBatchSize * batchNum, hoodieIncViewDF.count())
+ }
+
+  private def shouldThrowIfFallbackIsFalse(tableType: HoodieTableType, fn: () => Unit): Unit = {
+ val msg = "Should fail with Path does not exist"
+ tableType match {
+ case HoodieTableType.COPY_ON_WRITE =>
+ assertThrows(classOf[AnalysisException], new Executable {
+ override def execute(): Unit = {
+ fn()
+ }
+ }, msg)
+ case HoodieTableType.MERGE_ON_READ =>
+ val exp = assertThrows(classOf[SparkException], new Executable {
+ override def execute(): Unit = {
+ fn()
+ }
+ }, msg)
+ assertTrue(exp.getMessage.contains("FileNotFoundException"))
+ }
+ }
+}