Repository: spark Updated Branches: refs/heads/branch-2.1 63857c8d3 -> 517f39833
[SPARK-18379][SQL] Make the parallelism of parallelPartitionDiscovery configurable. ## What changes were proposed in this pull request? The largest parallelism in PartitioningAwareFileIndex #listLeafFilesInParallel() is hard-coded to 10000. We may need to make this number configurable. In this PR, I reduce it to 100 (note: the default in the diff below is still 10000). ## How was this patch tested? Existing unit tests. Author: genmao.ygm <[email protected]> Author: dylon <[email protected]> Closes #15829 from uncleGen/SPARK-18379. (cherry picked from commit 745ab8bc50da89c42b297de9dcb833e5f2074481) Signed-off-by: Sean Owen <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/517f3983 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/517f3983 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/517f3983 Branch: refs/heads/branch-2.1 Commit: 517f39833cf789b536defe5ba4b010828d24831f Parents: 63857c8 Author: genmao.ygm <[email protected]> Authored: Tue Nov 15 10:32:43 2016 -0800 Committer: Sean Owen <[email protected]> Committed: Mon Jan 2 15:26:09 2017 +0000 ---------------------------------------------------------------------- .../datasources/PartitioningAwareFileIndex.scala | 4 +++- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 11 +++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/517f3983/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index f22b55b..825a0f7 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -309,10 +309,12 @@ object PartitioningAwareFileIndex extends Logging { val sparkContext = sparkSession.sparkContext val serializableConfiguration = new SerializableConfiguration(hadoopConf) val serializedPaths = paths.map(_.toString) + val parallelPartitionDiscoveryParallelism = + sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism // Set the number of parallelism to prevent following file listing from generating many tasks // in case of large #defaultParallelism. - val numParallelism = Math.min(paths.size, 10000) + val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism) val statusMap = sparkContext .parallelize(serializedPaths, numParallelism) http://git-wip-us.apache.org/repos/asf/spark/blob/517f3983/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5454be4..8fbad60 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -391,6 +391,14 @@ object SQLConf { .intConf .createWithDefault(32) + val PARALLEL_PARTITION_DISCOVERY_PARALLELISM = + SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.parallelism") + .doc("The number of parallelism to list a collection of path recursively, Set the " + + "number to prevent file listing from generating too many tasks.") + .internal() + .intConf + .createWithDefault(10000) + // Whether to automatically resolve ambiguity in join conditions for self-joins. // See SPARK-6231. 
val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY = @@ -769,6 +777,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def parallelPartitionDiscoveryThreshold: Int = getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD) + def parallelPartitionDiscoveryParallelism: Int = + getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM) + def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED) def dataFrameSelfJoinAutoResolveAmbiguity: Boolean = --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
