boneanxs commented on code in PR #6141:
URL: https://github.com/apache/hudi/pull/6141#discussion_r932028988
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala:
##########
@@ -188,71 +196,90 @@ class IncrementalRelation(val sqlContext: SQLContext,
case HoodieFileFormat.ORC => "orc"
}
sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
+
+ // Fallback to full table scan if any of the following conditions
matches:
+ // 1. the start commit is archived
+ // 2. the end commit is archived
+ // 3. there are files in metadata be deleted
+ val fallbackToFullTableScan =
optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
+
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
+
val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
- if (filteredRegularFullPaths.isEmpty &&
filteredMetaBootstrapFullPaths.isEmpty) {
- sqlContext.sparkContext.emptyRDD[Row]
- } else {
- log.info("Additional Filters to be applied to incremental source are
:" + filters.mkString("Array(", ", ", ")"))
- var df: DataFrame =
sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
+ val startInstantTime =
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
+ val startInstantArchived =
startInstantTime.compareTo(commitTimeline.firstInstant().get().getTimestamp) <
0 // True if startInstantTime < activeTimeline.first
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]