umehrot2 commented on a change in pull request #1702:
URL: https://github.com/apache/hudi/pull/1702#discussion_r447563017



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
##########
@@ -92,36 +102,69 @@ class IncrementalRelation(val sqlContext: SQLContext,
   override def schema: StructType = latestSchema
 
   override def buildScan(): RDD[Row] = {
-    val fileIdToFullPath = mutable.HashMap[String, String]()
+    val regularFileIdToFullPath = mutable.HashMap[String, String]()
+    var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
+
     for (commit <- commitsToReturn) {
       val metadata: HoodieCommitMetadata = 
HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
         .get, classOf[HoodieCommitMetadata])
-      fileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
+
+      if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) 
{
+        metaBootstrapFileIdToFullPath ++= 
metadata.getFileIdAndFullPaths(basePath).toMap
+      } else {
+        regularFileIdToFullPath ++= 
metadata.getFileIdAndFullPaths(basePath).toMap
+      }
+    }
+
+    if (metaBootstrapFileIdToFullPath.nonEmpty) {
+      // filer out meta bootstrap files that have had more commits since 
metadata bootstrap
+      metaBootstrapFileIdToFullPath = metaBootstrapFileIdToFullPath
+        .filterNot(fileIdFullPath => 
regularFileIdToFullPath.contains(fileIdFullPath._1))
     }
+
     val pathGlobPattern = optParams.getOrElse(
       DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY,
       DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)
-    val filteredFullPath = 
if(!pathGlobPattern.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL))
 {
-      val globMatcher = new GlobPattern("*" + pathGlobPattern)
-      fileIdToFullPath.filter(p => globMatcher.matches(p._2))
-    } else {
-      fileIdToFullPath
+    val (filteredRegularFullPaths, filteredMetaBootstrapFullPaths) = {
+      
if(!pathGlobPattern.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL))
 {
+        val globMatcher = new GlobPattern("*" + pathGlobPattern)
+        (regularFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values,
+          metaBootstrapFileIdToFullPath.filter(p => 
globMatcher.matches(p._2)).values)
+      } else {
+        (regularFileIdToFullPath.values, metaBootstrapFileIdToFullPath.values)
+      }
     }
     // unset the path filter, otherwise if end_instant_time is not the latest 
instant, path filter set for RO view
     // will filter out all the files incorrectly.
     
sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
     val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
-    if (filteredFullPath.isEmpty) {
+    if (filteredRegularFullPaths.isEmpty && 
filteredMetaBootstrapFullPaths.isEmpty) {
       sqlContext.sparkContext.emptyRDD[Row]
     } else {
       log.info("Additional Filters to be applied to incremental source are :" 
+ filters)
-      filters.foldLeft(sqlContext.read.options(sOpts)
-        .schema(latestSchema)
-        .parquet(filteredFullPath.values.toList: _*)
-        .filter(String.format("%s >= '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp))
-        .filter(String.format("%s <= '%s'",
-          HoodieRecord.COMMIT_TIME_METADATA_FIELD, 
commitsToReturn.last.getTimestamp)))((e, f) => e.filter(f))
-        .toDF().rdd
+
+      var df: DataFrame = 
sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], latestSchema)

Review comment:
       We do need re-assignment to `df`. First we get `bootstrapped files` data 
in `df` and then union to add regular parquet data to it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to