Repository: spark
Updated Branches:
  refs/heads/branch-2.1 63857c8d3 -> 517f39833


[SPARK-18379][SQL] Make the parallelism of parallelPartitionDiscovery 
configurable.

## What changes were proposed in this pull request?

The largest parallelism in PartitioningAwareFileIndex 
#listLeafFilesInParallel() is 10000 in hard code. We may need to make this 
number configurable. And in PR, I reduce it to 100.

## How was this patch tested?

Existing ut.

Author: genmao.ygm <[email protected]>
Author: dylon <[email protected]>

Closes #15829 from uncleGen/SPARK-18379.

(cherry picked from commit 745ab8bc50da89c42b297de9dcb833e5f2074481)
Signed-off-by: Sean Owen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/517f3983
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/517f3983
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/517f3983

Branch: refs/heads/branch-2.1
Commit: 517f39833cf789b536defe5ba4b010828d24831f
Parents: 63857c8
Author: genmao.ygm <[email protected]>
Authored: Tue Nov 15 10:32:43 2016 -0800
Committer: Sean Owen <[email protected]>
Committed: Mon Jan 2 15:26:09 2017 +0000

----------------------------------------------------------------------
 .../datasources/PartitioningAwareFileIndex.scala         |  4 +++-
 .../scala/org/apache/spark/sql/internal/SQLConf.scala    | 11 +++++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/517f3983/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index f22b55b..825a0f7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -309,10 +309,12 @@ object PartitioningAwareFileIndex extends Logging {
     val sparkContext = sparkSession.sparkContext
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
     val serializedPaths = paths.map(_.toString)
+    val parallelPartitionDiscoveryParallelism =
+      sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
 
     // Set the number of parallelism to prevent following file listing from 
generating many tasks
     // in case of large #defaultParallelism.
-    val numParallelism = Math.min(paths.size, 10000)
+    val numParallelism = Math.min(paths.size, 
parallelPartitionDiscoveryParallelism)
 
     val statusMap = sparkContext
       .parallelize(serializedPaths, numParallelism)

http://git-wip-us.apache.org/repos/asf/spark/blob/517f3983/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5454be4..8fbad60 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -391,6 +391,14 @@ object SQLConf {
       .intConf
       .createWithDefault(32)
 
+  val PARALLEL_PARTITION_DISCOVERY_PARALLELISM =
+    
SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.parallelism")
+      .doc("The number of parallelism to list a collection of path 
recursively, Set the " +
+        "number to prevent file listing from generating too many tasks.")
+      .internal()
+      .intConf
+      .createWithDefault(10000)
+
   // Whether to automatically resolve ambiguity in join conditions for 
self-joins.
   // See SPARK-6231.
   val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
@@ -769,6 +777,9 @@ private[sql] class SQLConf extends Serializable with 
CatalystConf with Logging {
   def parallelPartitionDiscoveryThreshold: Int =
     getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)
 
+  def parallelPartitionDiscoveryParallelism: Int =
+    getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM)
+
   def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)
 
   def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to