umehrot2 commented on a change in pull request #1702:
URL: https://github.com/apache/hudi/pull/1702#discussion_r447563017
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
##########
@@ -92,36 +102,69 @@ class IncrementalRelation(val sqlContext: SQLContext,
override def schema: StructType = latestSchema
override def buildScan(): RDD[Row] = {
- val fileIdToFullPath = mutable.HashMap[String, String]()
+ val regularFileIdToFullPath = mutable.HashMap[String, String]()
+ var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
+
for (commit <- commitsToReturn) {
val metadata: HoodieCommitMetadata =
HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
.get, classOf[HoodieCommitMetadata])
- fileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
+
+ if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp)
{
+ metaBootstrapFileIdToFullPath ++=
metadata.getFileIdAndFullPaths(basePath).toMap
+ } else {
+ regularFileIdToFullPath ++=
metadata.getFileIdAndFullPaths(basePath).toMap
+ }
+ }
+
+ if (metaBootstrapFileIdToFullPath.nonEmpty) {
+ // filter out meta bootstrap files that have had more commits since
metadata bootstrap
+ metaBootstrapFileIdToFullPath = metaBootstrapFileIdToFullPath
+ .filterNot(fileIdFullPath =>
regularFileIdToFullPath.contains(fileIdFullPath._1))
}
+
val pathGlobPattern = optParams.getOrElse(
DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY,
DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)
- val filteredFullPath =
if(!pathGlobPattern.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL))
{
- val globMatcher = new GlobPattern("*" + pathGlobPattern)
- fileIdToFullPath.filter(p => globMatcher.matches(p._2))
- } else {
- fileIdToFullPath
+ val (filteredRegularFullPaths, filteredMetaBootstrapFullPaths) = {
+
if(!pathGlobPattern.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL))
{
+ val globMatcher = new GlobPattern("*" + pathGlobPattern)
+ (regularFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values,
+ metaBootstrapFileIdToFullPath.filter(p =>
globMatcher.matches(p._2)).values)
+ } else {
+ (regularFileIdToFullPath.values, metaBootstrapFileIdToFullPath.values)
+ }
}
// unset the path filter, otherwise if end_instant_time is not the latest
instant, path filter set for RO view
// will filter out all the files incorrectly.
sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
- if (filteredFullPath.isEmpty) {
+ if (filteredRegularFullPaths.isEmpty &&
filteredMetaBootstrapFullPaths.isEmpty) {
sqlContext.sparkContext.emptyRDD[Row]
} else {
log.info("Additional Filters to be applied to incremental source are :"
+ filters)
- filters.foldLeft(sqlContext.read.options(sOpts)
- .schema(latestSchema)
- .parquet(filteredFullPath.values.toList: _*)
- .filter(String.format("%s >= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp))
- .filter(String.format("%s <= '%s'",
- HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.last.getTimestamp)))((e, f) => e.filter(f))
- .toDF().rdd
+
+ var df: DataFrame =
sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], latestSchema)
Review comment:
We do need re-assignment to `df`. First we get `bootstrapped files` data
in `df` and then union to add regular parquet data to it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]