Repository: spark
Updated Branches:
  refs/heads/branch-2.0 24539223b -> 9adba414c

[SPARK-15247][SQL] Set the default number of partitions for reading parquet schemas

## What changes were proposed in this pull request?
This PR sets the default number of partitions when reading parquet schemas.
SQLContext#read#parquet currently yields at least n_executors * n_cores tasks even if the parquet data consists of a single small file.
This can increase the latency of small jobs.

## How was this patch tested?
Manually tested and checked.

Author: Takeshi YAMAMURO <[email protected]>

Closes #13137 from maropu/SPARK-15247.

(cherry picked from commit dae4d5db21368faaa46fa8d1a256c27428694c2c)
Signed-off-by: Yin Huai <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9adba414
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9adba414
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9adba414

Branch: refs/heads/branch-2.0
Commit: 9adba414cb69b1f2dd38f75c81ec3ab0549a353a
Parents: 2453922
Author: Takeshi YAMAMURO <[email protected]>
Authored: Tue Jun 14 13:05:56 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Tue Jun 14 13:06:11 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9adba414/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 71c1600..6b25e36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -794,11 +794,16 @@ private[sql] object ParquetFileFormat extends Logging {
     // side, and resemble fake `FileStatus`es there.
     val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))
 
+    // Set the number of partitions to prevent following schema reads from generating many tasks
+    // in case of a small number of parquet files.
+    val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
+      sparkSession.sparkContext.defaultParallelism)
+
     // Issues a Spark job to read Parquet schema in parallel.
     val partiallyMergedSchemas =
       sparkSession
         .sparkContext
-        .parallelize(partialFileStatusInfo)
+        .parallelize(partialFileStatusInfo, numParallelism)
         .mapPartitions { iterator =>
           // Resembles fake `FileStatus`es with serialized path and length information.
           val fakeFileStatuses = iterator.map { case (path, length) =>
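
Note on the partition count: the expression added in this patch clamps the number of schema-read partitions to the range from 1 to `defaultParallelism`, so a handful of files no longer fans out into one task per core. The sketch below isolates that arithmetic without a Spark dependency. The object name `SchemaReadParallelism`, the helper signature, and the value 16 for `defaultParallelism` are assumptions made for illustration and are not part of the commit.

```scala
// Minimal sketch of the clamping logic from the patch, with no Spark dependency.
// `SchemaReadParallelism` and the helper below are hypothetical names.
object SchemaReadParallelism {

  // Mirrors Math.min(Math.max(partialFileStatusInfo.size, 1), defaultParallelism):
  // at least one partition, at most `defaultParallelism` partitions.
  def numParallelism(numFiles: Int, defaultParallelism: Int): Int =
    math.min(math.max(numFiles, 1), defaultParallelism)

  def main(args: Array[String]): Unit = {
    // Assumed cluster-wide default, e.g. 4 executors with 4 cores each.
    val defaultParallelism = 16

    for (numFiles <- Seq(0, 1, 4, 1000)) {
      val partitions = numParallelism(numFiles, defaultParallelism)
      println(s"$numFiles file(s) -> $partitions partition(s)")
    }
  }
}
```

With the assumed default of 16, a single small file is read in one partition instead of 16, while 1000 files are still capped at 16. The lower bound of 1 keeps the partition count positive when the file list is empty.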
