nsivabalan commented on a change in pull request #3946:
URL: https://github.com/apache/hudi/pull/3946#discussion_r773495356



##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
##########
@@ -155,27 +155,60 @@ class IncrementalRelation(val sqlContext: SQLContext,
     if (filteredRegularFullPaths.isEmpty && 
filteredMetaBootstrapFullPaths.isEmpty) {
       sqlContext.sparkContext.emptyRDD[Row]
     } else {
-      log.info("Additional Filters to be applied to incremental source are :" 
+ filters)
+      log.info("Additional Filters to be applied to incremental source are :" 
+ filters.mkString("Array(", ", ", ")"))
 
       var df: DataFrame = 
sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
 
-      if (metaBootstrapFileIdToFullPath.nonEmpty) {
-        df = sqlContext.sparkSession.read
-               .format("hudi")
-               .schema(usedSchema)
-               .option(DataSourceReadOptions.READ_PATHS.key, 
filteredMetaBootstrapFullPaths.mkString(","))
-               .load()
+      val fullTableScanFallback = 
optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
+        
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
+
+      var doFullTableScan = false
+
+      if (fullTableScanFallback) {
+        val fs = new 
Path(basePath).getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
+        val timer = new HoodieTimer().startTimer();
+
+        val allFilesToCheck = filteredMetaBootstrapFullPaths ++ 
filteredRegularFullPaths
+        val firstNotFoundPath = allFilesToCheck.find(path => !fs.exists(new 
Path(path)))
+        val timeTaken = timer.endTimer()
+        log.info("Checking if paths exists took " + timeTaken + "ms")
+
+        val optStartTs = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
+        val isInstantArchived = 
optStartTs.compareTo(commitTimeline.firstInstant().get().getTimestamp) < 0 // 
True if optStartTs < activeTimeline.first

Review comment:
       I revisited this patch. I get it now. 
   So, we are fixing two things. 
   1: a commit is valid in active timeline, but corresponding data files are 
cleaned up.
   2: begin commit is archived. 
   Makes sense to me. 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
##########
@@ -155,27 +155,60 @@ class IncrementalRelation(val sqlContext: SQLContext,
     if (filteredRegularFullPaths.isEmpty && 
filteredMetaBootstrapFullPaths.isEmpty) {
       sqlContext.sparkContext.emptyRDD[Row]
     } else {
-      log.info("Additional Filters to be applied to incremental source are :" 
+ filters)
+      log.info("Additional Filters to be applied to incremental source are :" 
+ filters.mkString("Array(", ", ", ")"))
 
       var df: DataFrame = 
sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
 
-      if (metaBootstrapFileIdToFullPath.nonEmpty) {
-        df = sqlContext.sparkSession.read
-               .format("hudi")
-               .schema(usedSchema)
-               .option(DataSourceReadOptions.READ_PATHS.key, 
filteredMetaBootstrapFullPaths.mkString(","))
-               .load()
+      val fullTableScanFallback = 
optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
+        
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
+
+      var doFullTableScan = false
+
+      if (fullTableScanFallback) {
+        val fs = new 
Path(basePath).getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
+        val timer = new HoodieTimer().startTimer();
+
+        val allFilesToCheck = filteredMetaBootstrapFullPaths ++ 
filteredRegularFullPaths
+        val firstNotFoundPath = allFilesToCheck.find(path => !fs.exists(new 
Path(path)))

Review comment:
       `fs.exists()` should be routed to the metadata table. 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
##########
@@ -155,27 +155,60 @@ class IncrementalRelation(val sqlContext: SQLContext,
     if (filteredRegularFullPaths.isEmpty && 
filteredMetaBootstrapFullPaths.isEmpty) {
       sqlContext.sparkContext.emptyRDD[Row]
     } else {
-      log.info("Additional Filters to be applied to incremental source are :" 
+ filters)
+      log.info("Additional Filters to be applied to incremental source are :" 
+ filters.mkString("Array(", ", ", ")"))
 
       var df: DataFrame = 
sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
 
-      if (metaBootstrapFileIdToFullPath.nonEmpty) {
-        df = sqlContext.sparkSession.read
-               .format("hudi")
-               .schema(usedSchema)
-               .option(DataSourceReadOptions.READ_PATHS.key, 
filteredMetaBootstrapFullPaths.mkString(","))
-               .load()
+      val fullTableScanFallback = 
optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
+        
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
+
+      var doFullTableScan = false
+
+      if (fullTableScanFallback) {
+        val fs = new 
Path(basePath).getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
+        val timer = new HoodieTimer().startTimer();
+
+        val allFilesToCheck = filteredMetaBootstrapFullPaths ++ 
filteredRegularFullPaths
+        val firstNotFoundPath = allFilesToCheck.find(path => !fs.exists(new 
Path(path)))
+        val timeTaken = timer.endTimer()
+        log.info("Checking if paths exists took " + timeTaken + "ms")
+
+        val optStartTs = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
+        val isInstantArchived = 
optStartTs.compareTo(commitTimeline.firstInstant().get().getTimestamp) < 0 // 
True if optStartTs < activeTimeline.first
+
+        if (isInstantArchived || firstNotFoundPath.isDefined) {
+          doFullTableScan = true
+          log.info("Falling back to full table scan")
+        }
       }
 
-      if (regularFileIdToFullPath.nonEmpty)
-      {
-        df = df.union(sqlContext.read.options(sOpts)
-                        .schema(usedSchema)
-                        .parquet(filteredRegularFullPaths.toList: _*)
-                        .filter(String.format("%s >= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-                          commitsToReturn.head.getTimestamp))
-                        .filter(String.format("%s <= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-                          commitsToReturn.last.getTimestamp)))
+      if (doFullTableScan) {
+        df = df.union(sqlContext.read
+          .schema(usedSchema)
+          .format("hudi")
+          .load(basePath)
+          .filter(String.format("%s > '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD, //Notice the > in place of >= because 
we are working with optParam instead of first commit > optParam

Review comment:
       Can you help me understand how does this work? 
   Lets take the example in the tests added. 
   C0   C1   C2  C3 |    C4   C5   |   C6    C7   C8   C9
   
   C0 to C3 is archived. 
   C4 and C5 are cleaned. 
   Active timeline: C4 to C9.
   
   If someone runs an incremental query with C4 and C5 as the begin and end instants, 
   do we do a full scan of the table for records with commit time > C4 and <= C5? 
   What's the checkpoint returned at the end? Is it C5, so that next time the 
caller will make an incremental query with begin time C5? 
   So, in this case, if the records pertaining to C4 and C5 have been updated by 
future commits, we may return an empty df — is that right?
   

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
##########
@@ -155,27 +155,60 @@ class IncrementalRelation(val sqlContext: SQLContext,
     if (filteredRegularFullPaths.isEmpty && 
filteredMetaBootstrapFullPaths.isEmpty) {
       sqlContext.sparkContext.emptyRDD[Row]
     } else {
-      log.info("Additional Filters to be applied to incremental source are :" 
+ filters)
+      log.info("Additional Filters to be applied to incremental source are :" 
+ filters.mkString("Array(", ", ", ")"))
 
       var df: DataFrame = 
sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
 
-      if (metaBootstrapFileIdToFullPath.nonEmpty) {
-        df = sqlContext.sparkSession.read
-               .format("hudi")
-               .schema(usedSchema)
-               .option(DataSourceReadOptions.READ_PATHS.key, 
filteredMetaBootstrapFullPaths.mkString(","))
-               .load()
+      val fullTableScanFallback = 
optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
+        
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
+
+      var doFullTableScan = false
+
+      if (fullTableScanFallback) {
+        val fs = new 
Path(basePath).getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
+        val timer = new HoodieTimer().startTimer();
+
+        val allFilesToCheck = filteredMetaBootstrapFullPaths ++ 
filteredRegularFullPaths
+        val firstNotFoundPath = allFilesToCheck.find(path => !fs.exists(new 
Path(path)))
+        val timeTaken = timer.endTimer()
+        log.info("Checking if paths exists took " + timeTaken + "ms")
+
+        val optStartTs = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
+        val isInstantArchived = 
optStartTs.compareTo(commitTimeline.firstInstant().get().getTimestamp) < 0 // 
True if optStartTs < activeTimeline.first
+
+        if (isInstantArchived || firstNotFoundPath.isDefined) {
+          doFullTableScan = true
+          log.info("Falling back to full table scan")
+        }
       }
 
-      if (regularFileIdToFullPath.nonEmpty)
-      {
-        df = df.union(sqlContext.read.options(sOpts)
-                        .schema(usedSchema)
-                        .parquet(filteredRegularFullPaths.toList: _*)
-                        .filter(String.format("%s >= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-                          commitsToReturn.head.getTimestamp))
-                        .filter(String.format("%s <= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-                          commitsToReturn.last.getTimestamp)))
+      if (doFullTableScan) {
+        df = df.union(sqlContext.read
+          .schema(usedSchema)
+          .format("hudi")
+          .load(basePath)
+          .filter(String.format("%s > '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD, //Notice the > in place of >= because 
we are working with optParam instead of first commit > optParam

Review comment:
       I guess my question about the incremental query checkpoint may not make sense. If 
the consumer is a DeltaStreamer, it will keep track of the commits consumed and will 
send back C5 for the next round. The query itself may not return any explicit 
checkpoint. Correct me if my understanding is wrong. 
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to