This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 553e280c6d [HUDI-3189] Fallback to full table scan with incremental 
query when files are cleaned up or archived for MOR table (#6141)
553e280c6d is described below

commit 553e280c6d7f3cc349bf7718104a66f7d2866a9b
Author: RexAn <[email protected]>
AuthorDate: Mon Aug 15 14:30:10 2022 +0800

    [HUDI-3189] Fallback to full table scan with incremental query when files 
are cleaned up or archived for MOR table (#6141)
    
    * Support reading archived commits in Spark incremental queries on MOR tables
---
 .../org/apache/hudi/IncrementalRelation.scala      | 122 ++++++++------
 .../hudi/MergeOnReadIncrementalRelation.scala      |  68 ++++++--
 .../apache/hudi/functional/TestCOWDataSource.scala |  83 ---------
 .../TestIncrementalReadWithFullTableScan.scala     | 187 +++++++++++++++++++++
 4 files changed, 311 insertions(+), 149 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 81b12dbcb6..db48f224f2 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -71,6 +71,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
     throw new HoodieException(s"Specify the begin instant time to pull from 
using " +
       s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME.key}")
   }
+
   if (!metaClient.getTableConfig.populateMetaFields()) {
     throw new HoodieException("Incremental queries are not supported when meta 
fields are disabled")
   }
@@ -188,71 +189,90 @@ class IncrementalRelation(val sqlContext: SQLContext,
         case HoodieFileFormat.ORC => "orc"
       }
       
sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
+
+      // Fallback to full table scan if any of the following conditions 
matches:
+      //   1. the start commit is archived
+      //   2. the end commit is archived
+      //   3. some files referenced in the metadata have been deleted
+      val fallbackToFullTableScan = 
optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
+        
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
+
       val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
-      if (filteredRegularFullPaths.isEmpty && 
filteredMetaBootstrapFullPaths.isEmpty) {
-        sqlContext.sparkContext.emptyRDD[Row]
-      } else {
-        log.info("Additional Filters to be applied to incremental source are 
:" + filters.mkString("Array(", ", ", ")"))
 
-        var df: DataFrame = 
sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
+      val startInstantTime = 
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
+      val startInstantArchived = 
commitTimeline.isBeforeTimelineStarts(startInstantTime)
+      val endInstantTime = 
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key(), 
lastInstant.getTimestamp)
+      val endInstantArchived = 
commitTimeline.isBeforeTimelineStarts(endInstantTime)
 
-        val fallbackToFullTableScan = 
optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
-          
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
+      val scanDf = if (fallbackToFullTableScan && (startInstantArchived || 
endInstantArchived)) {
+        log.info(s"Falling back to full table scan as startInstantArchived: 
$startInstantArchived, endInstantArchived: $endInstantArchived")
+        fullTableScanDataFrame(startInstantTime, endInstantTime)
+      } else {
+        if (filteredRegularFullPaths.isEmpty && 
filteredMetaBootstrapFullPaths.isEmpty) {
+          sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], 
usedSchema)
+        } else {
+          log.info("Additional Filters to be applied to incremental source are 
:" + filters.mkString("Array(", ", ", ")"))
 
-        var doFullTableScan = false
+          var df: DataFrame = 
sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
 
-        if (fallbackToFullTableScan) {
-          val fs = 
basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
-          val timer = new HoodieTimer().startTimer();
+          var doFullTableScan = false
 
-          val allFilesToCheck = filteredMetaBootstrapFullPaths ++ 
filteredRegularFullPaths
-          val firstNotFoundPath = allFilesToCheck.find(path => !fs.exists(new 
Path(path)))
-          val timeTaken = timer.endTimer()
-          log.info("Checking if paths exists took " + timeTaken + "ms")
+          if (fallbackToFullTableScan) {
+            val fs = 
basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
+            val timer = new HoodieTimer().startTimer();
 
-          val optStartTs = 
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
-          val isInstantArchived = 
optStartTs.compareTo(commitTimeline.firstInstant().get().getTimestamp) < 0 // 
True if optStartTs < activeTimeline.first
+            val allFilesToCheck = filteredMetaBootstrapFullPaths ++ 
filteredRegularFullPaths
+            val firstNotFoundPath = allFilesToCheck.find(path => 
!fs.exists(new Path(path)))
+            val timeTaken = timer.endTimer()
+            log.info("Checking if paths exists took " + timeTaken + "ms")
 
-          if (isInstantArchived || firstNotFoundPath.isDefined) {
-            doFullTableScan = true
-            log.info("Falling back to full table scan")
+            if (firstNotFoundPath.isDefined) {
+              doFullTableScan = true
+              log.info("Falling back to full table scan as some files cannot 
be found.")
+            }
           }
-        }
 
-        if (doFullTableScan) {
-          val hudiDF = sqlContext.read
-            .format("hudi_v1")
-            .schema(usedSchema)
-            .load(basePath.toString)
-            .filter(String.format("%s > '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD, //Notice the > in place of >= because 
we are working with optParam instead of first commit > optParam
-              optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)))
-            .filter(String.format("%s <= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-              commitsToReturn.last.getTimestamp))
-          // schema enforcement does not happen in above spark.read with hudi. 
hence selecting explicitly w/ right column order
-          val fieldNames : Array[String] = df.schema.fields.map(field => 
field.name)
-          df = df.union(hudiDF.select(fieldNames.head, fieldNames.tail: _*))
-        } else {
-          if (metaBootstrapFileIdToFullPath.nonEmpty) {
-            df = sqlContext.sparkSession.read
-              .format("hudi_v1")
-              .schema(usedSchema)
-              .option(DataSourceReadOptions.READ_PATHS.key, 
filteredMetaBootstrapFullPaths.mkString(","))
-              .load()
-          }
+          if (doFullTableScan) {
+            fullTableScanDataFrame(startInstantTime, endInstantTime)
+          } else {
+            if (metaBootstrapFileIdToFullPath.nonEmpty) {
+              df = sqlContext.sparkSession.read
+                .format("hudi_v1")
+                .schema(usedSchema)
+                .option(DataSourceReadOptions.READ_PATHS.key, 
filteredMetaBootstrapFullPaths.mkString(","))
+                .load()
+            }
 
-          if (regularFileIdToFullPath.nonEmpty) {
-            df = df.union(sqlContext.read.options(sOpts)
-              .schema(usedSchema).format(formatClassName)
-              .load(filteredRegularFullPaths.toList: _*)
-              .filter(String.format("%s >= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-                commitsToReturn.head.getTimestamp))
-              .filter(String.format("%s <= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-                commitsToReturn.last.getTimestamp)))
+            if (regularFileIdToFullPath.nonEmpty) {
+              df = df.union(sqlContext.read.options(sOpts)
+                .schema(usedSchema).format(formatClassName)
+                .load(filteredRegularFullPaths.toList: _*)
+                .filter(String.format("%s >= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+                  commitsToReturn.head.getTimestamp))
+                .filter(String.format("%s <= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+                  commitsToReturn.last.getTimestamp)))
+            }
+            df
           }
         }
-
-        filters.foldLeft(df)((e, f) => e.filter(f)).rdd
       }
+
+      filters.foldLeft(scanDf)((e, f) => e.filter(f)).rdd
     }
   }
+
+  private def fullTableScanDataFrame(startInstantTime: String, endInstantTime: 
String): DataFrame = {
+    val hudiDF = sqlContext.read
+      .format("hudi_v1")
+      .schema(usedSchema)
+      .load(basePath.toString)
+      .filter(String.format("%s > '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD, //Notice the > in place of >= because 
we are working with optParam instead of first commit > optParam
+        startInstantTime))
+      .filter(String.format("%s <= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+        endInstantTime))
+
+    // schema enforcement does not happen in above spark.read with hudi. hence 
selecting explicitly w/ right column order
+    val fieldNames = usedSchema.fieldNames
+    hudiDF.select(fieldNames.head, fieldNames.tail: _*)
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 0fc6ef2f83..446c806b18 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -17,7 +17,7 @@
 
 package org.apache.hudi
 
-import org.apache.hadoop.fs.{GlobPattern, Path}
+import org.apache.hadoop.fs.{FileStatus, GlobPattern, Path}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -53,9 +53,11 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
   }
 
   override protected def timeline: HoodieTimeline = {
-    val startTimestamp = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
-    val endTimestamp = 
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, 
super.timeline.lastInstant().get.getTimestamp)
-    super.timeline.findInstantsInRange(startTimestamp, endTimestamp)
+    if (fullTableScan) {
+      super.timeline
+    } else {
+      super.timeline.findInstantsInRange(startTimestamp, endTimestamp)
+    }
   }
 
   protected override def composeRDD(fileSplits: 
Seq[HoodieMergeOnReadFileSplit],
@@ -87,17 +89,19 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
     if (includedCommits.isEmpty) {
       List()
     } else {
-      val latestCommit = includedCommits.last.getTimestamp
-      val commitsMetadata = includedCommits.map(getCommitMetadata(_, 
timeline)).asJava
+      val fileSlices = if (fullTableScan) {
+        listLatestFileSlices(Seq(), partitionFilters, dataFilters)
+      } else {
+        val latestCommit = includedCommits.last.getTimestamp
 
-      val modifiedFiles = listAffectedFilesForCommits(conf, new 
Path(metaClient.getBasePath), commitsMetadata)
-      val fsView = new HoodieTableFileSystemView(metaClient, timeline, 
modifiedFiles)
+        val fsView = new HoodieTableFileSystemView(metaClient, timeline, 
affectedFilesInCommits)
 
-      val modifiedPartitions = getWritePartitionPaths(commitsMetadata)
+        val modifiedPartitions = getWritePartitionPaths(commitsMetadata)
 
-      val fileSlices = modifiedPartitions.asScala.flatMap { 
relativePartitionPath =>
-        fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, 
latestCommit).iterator().asScala
-      }.toSeq
+        modifiedPartitions.asScala.flatMap { relativePartitionPath =>
+          fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, 
latestCommit).iterator().asScala
+        }.toSeq
+      }
 
       buildSplits(filterFileSlices(fileSlices, globPattern))
     }
@@ -124,14 +128,48 @@ trait HoodieIncrementalRelationTrait extends 
HoodieBaseRelation {
   // Validate this Incremental implementation is properly configured
   validate()
 
-  protected lazy val includedCommits: immutable.Seq[HoodieInstant] = 
timeline.getInstants.iterator().asScala.toList
+  protected def startTimestamp: String = 
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
+  protected def endTimestamp: String = 
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, 
super.timeline.lastInstant().get.getTimestamp)
+
+  protected def startInstantArchived: Boolean = 
super.timeline.isBeforeTimelineStarts(startTimestamp)
+  protected def endInstantArchived: Boolean = 
super.timeline.isBeforeTimelineStarts(endTimestamp)
+
+  // Fallback to full table scan if any of the following conditions matches:
+  //   1. the start commit is archived
+  //   2. the end commit is archived
+  //   3. some files referenced in the metadata have been deleted
+  protected lazy val fullTableScan: Boolean = {
+    val fallbackToFullTableScan = 
optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
+      
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
+
+    fallbackToFullTableScan && (startInstantArchived || endInstantArchived || 
affectedFilesInCommits.exists(fileStatus => 
!metaClient.getFs.exists(fileStatus.getPath)))
+  }
+
+  protected lazy val includedCommits: immutable.Seq[HoodieInstant] = {
+    if (!startInstantArchived || !endInstantArchived) {
+      // If endTimestamp commit is not archived, will filter instants
+      // before endTimestamp.
+      super.timeline.findInstantsInRange(startTimestamp, 
endTimestamp).getInstants.iterator().asScala.toList
+    } else {
+      super.timeline.getInstants.iterator().asScala.toList
+    }
+  }
+
+  protected lazy val commitsMetadata = 
includedCommits.map(getCommitMetadata(_, super.timeline)).asJava
+
+  protected lazy val affectedFilesInCommits: Array[FileStatus] = {
+    listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), 
commitsMetadata)
+  }
 
   // Record filters making sure that only records w/in the requested bounds 
are being fetched as part of the
   // scan collected by this relation
   protected lazy val incrementalSpanRecordFilters: Seq[Filter] = {
     val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
-    val largerThanFilter = 
GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, 
includedCommits.head.getTimestamp)
-    val lessThanFilter = 
LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, 
includedCommits.last.getTimestamp)
+
+    val largerThanFilter = 
GreaterThan(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTimestamp)
+
+    val lessThanFilter = 
LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+      if (endInstantArchived) endTimestamp else 
includedCommits.last.getTimestamp)
 
     Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index c7eef5bce4..6697ec1514 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -859,89 +859,6 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
   }
 
-  @Test def testFailEarlyForIncrViewQueryForNonExistingFiles(): Unit = {
-    // Create 10 commits
-    for (i <- 1 to 10) {
-      val records = recordsToStrings(dataGen.generateInserts("%05d".format(i), 
100)).toList
-      val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
-      inputDF.write.format("org.apache.hudi")
-        .options(commonOpts)
-        .option("hoodie.cleaner.commits.retained", "3")
-        .option("hoodie.keep.min.commits", "4")
-        .option("hoodie.keep.max.commits", "5")
-        .option(DataSourceWriteOptions.OPERATION.key(), 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
-        .mode(SaveMode.Append)
-        .save(basePath)
-    }
-
-    val hoodieMetaClient = 
HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build()
-    /**
-      * State of timeline after 10 commits
-      * +------------------+--------------------------------------+
-      * |     Archived     |            Active Timeline           |
-      * +------------------+--------------+-----------------------+
-      * | C0   C1   C2  C3 |    C4   C5   |   C6    C7   C8   C9  |
-      * +------------------+--------------+-----------------------+
-      * |          Data cleaned           |  Data exists in table |
-      * +---------------------------------+-----------------------+
-      */
-
-    val completedCommits = 
hoodieMetaClient.getCommitsTimeline.filterCompletedInstants() // C4 to C9
-    //Anything less than 2 is a valid commit in the sense no cleanup has been 
done for those commit files
-    var startTs = completedCommits.nthInstant(0).get().getTimestamp //C4
-    var endTs = completedCommits.nthInstant(1).get().getTimestamp //C5
-
-    //Calling without the fallback should result in Path does not exist
-    var hoodieIncViewDF = spark.read.format("org.apache.hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE.key(), 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
-      .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
-      .load(basePath)
-
-    val msg = "Should fail with Path does not exist"
-    assertThrows(classOf[AnalysisException], new Executable {
-      override def execute(): Unit = {
-        hoodieIncViewDF.count()
-      }
-    }, msg)
-
-    //Should work with fallback enabled
-    hoodieIncViewDF = spark.read.format("org.apache.hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE.key(), 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
-      .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
-      
.option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(),
 "true")
-      .load(basePath)
-    assertEquals(100, hoodieIncViewDF.count())
-
-    //Test out for archived commits
-    val archivedInstants = 
hoodieMetaClient.getArchivedTimeline.getInstants.distinct().toArray
-    startTs = archivedInstants(0).asInstanceOf[HoodieInstant].getTimestamp //C0
-    endTs = completedCommits.nthInstant(1).get().getTimestamp //C5
-
-    //Calling without the fallback should result in Path does not exist
-    hoodieIncViewDF = spark.read.format("org.apache.hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE.key(), 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
-      .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
-      .load(basePath)
-
-    assertThrows(classOf[AnalysisException], new Executable {
-      override def execute(): Unit = {
-        hoodieIncViewDF.count()
-      }
-    }, msg)
-
-    //Should work with fallback enabled
-    hoodieIncViewDF = spark.read.format("org.apache.hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE.key(), 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
-      .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
-      
.option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(),
 "true")
-      .load(basePath)
-    assertEquals(500, hoodieIncViewDF.count())
-  }
-
   @Test
   def testWriteSmallPrecisionDecimalTable(): Unit = {
     val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
new file mode 100644
index 0000000000..fb0cf5e179
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, 
HoodieInstantTimeGenerator, HoodieTimeline}
+import org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.log4j.LogManager
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.function.Executable
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
+
+import scala.collection.JavaConversions.asScalaBuffer
+
+class TestIncrementalReadWithFullTableScan extends HoodieClientTestBase {
+
+  var spark: SparkSession = null
+  private val log = 
LogManager.getLogger(classOf[TestIncrementalReadWithFullTableScan])
+
+  private val perBatchSize = 100
+
+  val commonOpts = Map(
+    "hoodie.insert.shuffle.parallelism" -> "4",
+    "hoodie.upsert.shuffle.parallelism" -> "4",
+    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+    HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
+  )
+
+
+  val verificationCol: String = "driver"
+  val updatedVerificationVal: String = "driver_update"
+
+  @BeforeEach override def setUp() {
+    setTableName("hoodie_test")
+    initPath()
+    initSparkContexts()
+    spark = sqlContext.sparkSession
+    initTestDataGenerator()
+    initFileSystem()
+  }
+
+  @AfterEach override def tearDown() = {
+    cleanupSparkContexts()
+    cleanupTestDataGenerator()
+    cleanupFileSystem()
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testFailEarlyForIncrViewQueryForNonExistingFiles(tableType: 
HoodieTableType): Unit = {
+    // Create 10 commits
+    for (i <- 1 to 10) {
+      val records = recordsToStrings(dataGen.generateInserts("%05d".format(i), 
perBatchSize)).toList
+      val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+      inputDF.write.format("org.apache.hudi")
+        .options(commonOpts)
+        .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
+        .option("hoodie.cleaner.commits.retained", "3")
+        .option("hoodie.keep.min.commits", "4")
+        .option("hoodie.keep.max.commits", "5")
+        .option(DataSourceWriteOptions.OPERATION.key(), 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+        .mode(SaveMode.Append)
+        .save(basePath)
+    }
+
+    val hoodieMetaClient = 
HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build()
+    /**
+     * State of timeline after 10 commits
+     * +------------------+--------------------------------------+
+     * |     Archived     |            Active Timeline           |
+     * +------------------+--------------+-----------------------+
+     * | C0   C1   C2  C3 |    C4   C5   |   C6    C7   C8   C9  |
+     * +------------------+--------------+-----------------------+
+     * |          Data cleaned           |  Data exists in table |
+     * +---------------------------------+-----------------------+
+     */
+
+    val completedCommits = 
hoodieMetaClient.getCommitsTimeline.filterCompletedInstants() // C4 to C9
+    val archivedInstants = 
hoodieMetaClient.getArchivedTimeline.filterCompletedInstants()
+      .getInstants.distinct().toArray // C0 to C3
+
+    //Anything less than 2 is a valid commit in the sense no cleanup has been 
done for those commit files
+    val startUnarchivedCommitTs = 
completedCommits.nthInstant(0).get().getTimestamp //C4
+    val endUnarchivedCommitTs = 
completedCommits.nthInstant(1).get().getTimestamp //C5
+
+    val startArchivedCommitTs = 
archivedInstants(0).asInstanceOf[HoodieInstant].getTimestamp //C0
+    val endArchivedCommitTs = 
archivedInstants(1).asInstanceOf[HoodieInstant].getTimestamp //C1
+
+    val startOutOfRangeCommitTs = 
HoodieInstantTimeGenerator.createNewInstantTime(0)
+    val endOutOfRangeCommitTs = 
HoodieInstantTimeGenerator.createNewInstantTime(0)
+
+    assertTrue(HoodieTimeline.compareTimestamps(startOutOfRangeCommitTs, 
GREATER_THAN, completedCommits.lastInstant().get().getTimestamp))
+    assertTrue(HoodieTimeline.compareTimestamps(endOutOfRangeCommitTs, 
GREATER_THAN, completedCommits.lastInstant().get().getTimestamp))
+
+    // Test both start and end commits are archived
+    runIncrementalQueryAndCompare(startArchivedCommitTs, endArchivedCommitTs, 
1, true)
+
+    // Test start commit is archived, end commit is not archived
+    shouldThrowIfFallbackIsFalse(tableType,
+      () => runIncrementalQueryAndCompare(startArchivedCommitTs, 
endUnarchivedCommitTs, 5, false))
+    runIncrementalQueryAndCompare(startArchivedCommitTs, 
endUnarchivedCommitTs, 5, true)
+
+    // Test both start commit and end commits are not archived but got cleaned
+    shouldThrowIfFallbackIsFalse(tableType,
+      () => runIncrementalQueryAndCompare(startUnarchivedCommitTs, 
endUnarchivedCommitTs, 1, false))
+    runIncrementalQueryAndCompare(startUnarchivedCommitTs, 
endUnarchivedCommitTs, 1, true)
+
+    // Test start commit is not archived, end commits is out of the timeline
+    runIncrementalQueryAndCompare(startUnarchivedCommitTs, 
endOutOfRangeCommitTs, 5, true)
+
+    // Test both start commit and end commits are out of the timeline
+    runIncrementalQueryAndCompare(startOutOfRangeCommitTs, 
endOutOfRangeCommitTs, 0, false)
+    runIncrementalQueryAndCompare(startOutOfRangeCommitTs, 
endOutOfRangeCommitTs, 0, true)
+
+    // Test end commit is smaller than the start commit
+    runIncrementalQueryAndCompare(endUnarchivedCommitTs, 
startUnarchivedCommitTs, 0, false)
+    runIncrementalQueryAndCompare(endUnarchivedCommitTs, 
startUnarchivedCommitTs, 0, true)
+
+    // Test both start commit and end commits is not archived and not cleaned
+    val reversedCommits = completedCommits.getReverseOrderedInstants.toArray
+    val startUncleanedCommitTs = 
reversedCommits.apply(1).asInstanceOf[HoodieInstant].getTimestamp
+    val endUncleanedCommitTs = 
reversedCommits.apply(0).asInstanceOf[HoodieInstant].getTimestamp
+    runIncrementalQueryAndCompare(startUncleanedCommitTs, 
endUncleanedCommitTs, 1, true)
+    runIncrementalQueryAndCompare(startUncleanedCommitTs, 
endUncleanedCommitTs, 1, false)
+  }
+
+  private def runIncrementalQueryAndCompare(
+      startTs: String,
+      endTs: String,
+      batchNum: Int,
+      fallBackFullTableScan: Boolean): Unit = {
+    val hoodieIncViewDF = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key(), 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
+      .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
+      
.option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(),
 fallBackFullTableScan)
+      .load(basePath)
+    assertEquals(perBatchSize * batchNum, hoodieIncViewDF.count())
+  }
+
+  private def shouldThrowIfFallbackIsFalse(tableType: HoodieTableType, fn: () 
=> Unit): Unit = {
+    val msg = "Should fail with Path does not exist"
+    tableType match {
+      case HoodieTableType.COPY_ON_WRITE =>
+        assertThrows(classOf[AnalysisException], new Executable {
+          override def execute(): Unit = {
+            fn()
+          }
+        }, msg)
+      case HoodieTableType.MERGE_ON_READ =>
+        val exp = assertThrows(classOf[SparkException], new Executable {
+          override def execute(): Unit = {
+            fn()
+          }
+        }, msg)
+        assertTrue(exp.getMessage.contains("FileNotFoundException"))
+    }
+  }
+}

Reply via email to