This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d9b1bf9429d [MINOR] Parallelized the check for existence of files in 
IncrementalRelation. (#10480)
d9b1bf9429d is described below

commit d9b1bf9429d88d3d2989b0a5fc4efb39e0af7b6c
Author: Prashant Wason <[email protected]>
AuthorDate: Thu Jan 11 17:06:50 2024 -0800

    [MINOR] Parallelized the check for existence of files in 
IncrementalRelation. (#10480)
    
    This speedups the check for large datasets when a very large number of 
files need to be checked for existence.
---
 .../src/main/scala/org/apache/hudi/IncrementalRelation.scala  | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 227b585c9ef..6566c450477 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -24,6 +24,7 @@ import 
org.apache.hudi.HoodieBaseRelation.isSchemaEvolutionEnabledOnRead
 import org.apache.hudi.HoodieSparkConfUtils.getHollowCommitHandling
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
+import org.apache.hudi.common.config.SerializableConfiguration
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieFileFormat, 
HoodieRecord, HoodieReplaceCommitMetadata}
 import 
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME
@@ -239,11 +240,17 @@ class IncrementalRelation(val sqlContext: SQLContext,
           var doFullTableScan = false
 
           if (fallbackToFullTableScan) {
-            val fs = 
basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
+            // val fs = 
basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
             val timer = HoodieTimer.start
 
             val allFilesToCheck = filteredMetaBootstrapFullPaths ++ 
filteredRegularFullPaths
-            val firstNotFoundPath = allFilesToCheck.find(path => 
!fs.exists(new Path(path)))
+            val serializedConf = new 
SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)
+            val localBasePathStr = basePath.toString
+            val firstNotFoundPath = 
sqlContext.sparkContext.parallelize(allFilesToCheck.toSeq, allFilesToCheck.size)
+              .map(path => {
+                val fs = new 
Path(localBasePathStr).getFileSystem(serializedConf.get)
+                fs.exists(new Path(path))
+              }).collect().find(v => !v)
             val timeTaken = timer.endTimer()
             log.info("Checking if paths exists took " + timeTaken + "ms")
 

Reply via email to