Repository: spark Updated Branches: refs/heads/branch-2.0 8c4050a5a -> d9db8a9c8
[SPARK-15530][SQL] Set #parallelism for file listing in listLeafFilesInParallel ## What changes were proposed in this pull request? This PR explicitly sets the parallelism (number of partitions) used for file listing in `listLeafFilesInParallel`, so that the listing job does not generate an excessive number of tasks when #defaultParallelism is large. ## How was this patch tested? Manually checked. Author: Takeshi YAMAMURO <[email protected]> Closes #13444 from maropu/SPARK-15530. (cherry picked from commit 5ad4e32d46599ae1b8626f08aa97345d078c28d7) Signed-off-by: Yin Huai <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9db8a9c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9db8a9c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9db8a9c Branch: refs/heads/branch-2.0 Commit: d9db8a9c85163c25ae3ec84cb9ac88b5615db82e Parents: 8c4050a Author: Takeshi YAMAMURO <[email protected]> Authored: Mon Jun 13 13:41:26 2016 -0700 Committer: Yin Huai <[email protected]> Committed: Mon Jun 13 13:41:46 2016 -0700 ---------------------------------------------------------------------- .../sql/execution/datasources/ListingFileCatalog.scala | 2 +- .../execution/datasources/fileSourceInterfaces.scala | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d9db8a9c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala index dd3c96a..7d2854a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala @@ -75,7 +75,7 @@ class ListingFileCatalog( protected def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession.sparkContext) + HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession) } else { // Dummy jobconf to get to the pathFilter defined in configuration val jobConf = new JobConf(hadoopConf, this.getClass) http://git-wip-us.apache.org/repos/asf/spark/blob/d9db8a9c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala index 890e64d..9c18989 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala @@ -448,13 +448,22 @@ private[sql] object HadoopFsRelation extends Logging { def listLeafFilesInParallel( paths: Seq[Path], hadoopConf: Configuration, - sparkContext: SparkContext): mutable.LinkedHashSet[FileStatus] = { + sparkSession: SparkSession): mutable.LinkedHashSet[FileStatus] = { + assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}") + val sparkContext = sparkSession.sparkContext + val sqlConf = sparkSession.sessionState.conf val serializableConfiguration = new SerializableConfiguration(hadoopConf) val serializedPaths = paths.map(_.toString) - val fakeStatuses = 
sparkContext.parallelize(serializedPaths).mapPartitions { paths => + // Set the number of parallelism to prevent following file listing from generating many tasks + // in case of large #defaultParallelism. + val numParallelism = Math.min(paths.size, 10000) + + val fakeStatuses = sparkContext + .parallelize(serializedPaths, numParallelism) + .mapPartitions { paths => // Dummy jobconf to get to the pathFilter defined in configuration // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow) val jobConf = new JobConf(serializableConfiguration.value, this.getClass) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
