nsivabalan commented on a change in pull request #3946:
URL: https://github.com/apache/hudi/pull/3946#discussion_r763652433



##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
##########
@@ -155,27 +155,60 @@ class IncrementalRelation(val sqlContext: SQLContext,
     if (filteredRegularFullPaths.isEmpty && 
filteredMetaBootstrapFullPaths.isEmpty) {
       sqlContext.sparkContext.emptyRDD[Row]
     } else {
-      log.info("Additional Filters to be applied to incremental source are :" 
+ filters)
+      log.info("Additional Filters to be applied to incremental source are :" 
+ filters.mkString("Array(", ", ", ")"))
 
       var df: DataFrame = 
sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
 
-      if (metaBootstrapFileIdToFullPath.nonEmpty) {
-        df = sqlContext.sparkSession.read
-               .format("hudi")
-               .schema(usedSchema)
-               .option(DataSourceReadOptions.READ_PATHS.key, 
filteredMetaBootstrapFullPaths.mkString(","))
-               .load()
+      val fullTableScanFallback = 
optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
+        
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
+
+      var doFullTableScan = false
+
+      if (fullTableScanFallback) {
+        val fs = new 
Path(basePath).getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
+        val timer = new HoodieTimer().startTimer();
+
+        val allFilesToCheck = filteredMetaBootstrapFullPaths ++ 
filteredRegularFullPaths
+        val firstNotFoundPath = allFilesToCheck.find(path => !fs.exists(new 
Path(path)))
+        val timeTaken = timer.endTimer()
+        log.info("Checking if paths exists took " + timeTaken + "ms")
+
+        val optStartTs = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
+        val isInstantArchived = 
optStartTs.compareTo(commitTimeline.firstInstant().get().getTimestamp) < 0 // 
True if optStartTs < activeTimeline.first

Review comment:
       From a user standpoint, I would expect the query to fall back to the first 
valid commit in the active timeline that the cleaner has not yet cleaned up. But 
I guess, from an implementation standpoint, we can't find that commit easily. Is 
that the rationale for falling back to a snapshot (full table scan) query? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to