This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d9b1bf9429d [MINOR] Parallelized the check for existence of files in
IncrementalRelation. (#10480)
d9b1bf9429d is described below
commit d9b1bf9429d88d3d2989b0a5fc4efb39e0af7b6c
Author: Prashant Wason <[email protected]>
AuthorDate: Thu Jan 11 17:06:50 2024 -0800
[MINOR] Parallelized the check for existence of files in
IncrementalRelation. (#10480)
This speeds up the check for large datasets when a very large number of files need to be checked for existence.
---
.../src/main/scala/org/apache/hudi/IncrementalRelation.scala | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 227b585c9ef..6566c450477 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.HoodieBaseRelation.isSchemaEvolutionEnabledOnRead
 import org.apache.hudi.HoodieSparkConfUtils.getHollowCommitHandling
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
+import org.apache.hudi.common.config.SerializableConfiguration
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieFileFormat, HoodieRecord, HoodieReplaceCommitMetadata}
 import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME
@@ -239,11 +240,17 @@ class IncrementalRelation(val sqlContext: SQLContext,
var doFullTableScan = false
if (fallbackToFullTableScan) {
-        val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
+        // val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
         val timer = HoodieTimer.start
         val allFilesToCheck = filteredMetaBootstrapFullPaths ++ filteredRegularFullPaths
-        val firstNotFoundPath = allFilesToCheck.find(path => !fs.exists(new Path(path)))
+        val serializedConf = new SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)
+        val localBasePathStr = basePath.toString
+        val firstNotFoundPath = sqlContext.sparkContext.parallelize(allFilesToCheck.toSeq, allFilesToCheck.size)
+          .map(path => {
+            val fs = new Path(localBasePathStr).getFileSystem(serializedConf.get)
+            fs.exists(new Path(path))
+          }).collect().find(v => !v)
val timeTaken = timer.endTimer()
log.info("Checking if paths exists took " + timeTaken + "ms")