Repository: spark
Updated Branches:
  refs/heads/master 3b7fb84cf -> 5ad4e32d4


[SPARK-15530][SQL] Set #parallelism for file listing in listLeafFilesInParallel

## What changes were proposed in this pull request?
This pr is to set the number of parallelism to prevent file listing in 
`listLeafFilesInParallel` from generating many tasks in case of large 
#defaultParallelism.

## How was this patch tested?
Manually checked

Author: Takeshi YAMAMURO <[email protected]>

Closes #13444 from maropu/SPARK-15530.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ad4e32d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ad4e32d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ad4e32d

Branch: refs/heads/master
Commit: 5ad4e32d46599ae1b8626f08aa97345d078c28d7
Parents: 3b7fb84
Author: Takeshi YAMAMURO <[email protected]>
Authored: Mon Jun 13 13:41:26 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Mon Jun 13 13:41:26 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/ListingFileCatalog.scala |  2 +-
 .../execution/datasources/fileSourceInterfaces.scala   | 13 +++++++++++--
 2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5ad4e32d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index dd3c96a..7d2854a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -75,7 +75,7 @@ class ListingFileCatalog(
 
   protected def listLeafFiles(paths: Seq[Path]): 
mutable.LinkedHashSet[FileStatus] = {
     if (paths.length >= 
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-      HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, 
sparkSession.sparkContext)
+      HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
     } else {
       // Dummy jobconf to get to the pathFilter defined in configuration
       val jobConf = new JobConf(hadoopConf, this.getClass)

http://git-wip-us.apache.org/repos/asf/spark/blob/5ad4e32d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 890e64d..9c18989 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -448,13 +448,22 @@ private[sql] object HadoopFsRelation extends Logging {
   def listLeafFilesInParallel(
       paths: Seq[Path],
       hadoopConf: Configuration,
-      sparkContext: SparkContext): mutable.LinkedHashSet[FileStatus] = {
+      sparkSession: SparkSession): mutable.LinkedHashSet[FileStatus] = {
+    assert(paths.size >= 
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
     logInfo(s"Listing leaf files and directories in parallel under: 
${paths.mkString(", ")}")
 
+    val sparkContext = sparkSession.sparkContext
+    val sqlConf = sparkSession.sessionState.conf
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
     val serializedPaths = paths.map(_.toString)
 
-    val fakeStatuses = sparkContext.parallelize(serializedPaths).mapPartitions 
{ paths =>
+    // Set the number of parallelism to prevent following file listing from 
generating many tasks
+    // in case of large #defaultParallelism.
+    val numParallelism = Math.min(paths.size, 10000)
+
+    val fakeStatuses = sparkContext
+        .parallelize(serializedPaths, numParallelism)
+        .mapPartitions { paths =>
       // Dummy jobconf to get to the pathFilter defined in configuration
       // It's very expensive to create a JobConf(ClassUtil.findContainingJar() 
is slow)
       val jobConf = new JobConf(serializableConfiguration.value, this.getClass)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to