nsivabalan commented on a change in pull request #3946:
URL: https://github.com/apache/hudi/pull/3946#discussion_r763652433
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
##########
@@ -155,27 +155,60 @@ class IncrementalRelation(val sqlContext: SQLContext,
if (filteredRegularFullPaths.isEmpty &&
filteredMetaBootstrapFullPaths.isEmpty) {
sqlContext.sparkContext.emptyRDD[Row]
} else {
- log.info("Additional Filters to be applied to incremental source are :"
+ filters)
+ log.info("Additional Filters to be applied to incremental source are :"
+ filters.mkString("Array(", ", ", ")"))
var df: DataFrame =
sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
- if (metaBootstrapFileIdToFullPath.nonEmpty) {
- df = sqlContext.sparkSession.read
- .format("hudi")
- .schema(usedSchema)
- .option(DataSourceReadOptions.READ_PATHS.key,
filteredMetaBootstrapFullPaths.mkString(","))
- .load()
+ val fullTableScanFallback =
optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
+
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
+
+ var doFullTableScan = false
+
+ if (fullTableScanFallback) {
+ val fs = new
Path(basePath).getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
+ val timer = new HoodieTimer().startTimer();
+
+ val allFilesToCheck = filteredMetaBootstrapFullPaths ++
filteredRegularFullPaths
+ val firstNotFoundPath = allFilesToCheck.find(path => !fs.exists(new
Path(path)))
+ val timeTaken = timer.endTimer()
+ log.info("Checking if paths exists took " + timeTaken + "ms")
+
+ val optStartTs = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
+ val isInstantArchived =
optStartTs.compareTo(commitTimeline.firstInstant().get().getTimestamp) < 0 //
True if optStartTs < activeTimeline.first
Review comment:
From a user standpoint, I would expect we should fallback to first valid
commit in active timeline which cleaner has not cleaned up. But guess from an
impl standpoint, we can't find this commit that easily. And so is the rational
to fallback to snapshot query?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]