Repository: spark
Updated Branches:
  refs/heads/master bd39ffe35 -> dae4d5db2

[SPARK-15247][SQL] Set the default number of partitions for reading parquet schemas

## What changes were proposed in this pull request?
This PR sets the default number of partitions when reading parquet schemas.
SQLContext#read#parquet currently yields at least n_executors * n_cores tasks even if the parquet data consists of a single small file.
This issue could increase the latency for small jobs.

## How was this patch tested?
Manually tested and checked.

Author: Takeshi YAMAMURO <[email protected]>

Closes #13137 from maropu/SPARK-15247.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dae4d5db
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dae4d5db
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dae4d5db

Branch: refs/heads/master
Commit: dae4d5db21368faaa46fa8d1a256c27428694c2c
Parents: bd39ffe
Author: Takeshi YAMAMURO <[email protected]>
Authored: Tue Jun 14 13:05:56 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Tue Jun 14 13:05:56 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dae4d5db/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 71c1600..6b25e36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -794,11 +794,16 @@ private[sql] object ParquetFileFormat extends Logging {
     // side, and resemble fake `FileStatus`es there.
     val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))
 
+    // Set the number of partitions to prevent following schema reads from generating many tasks
+    // in case of a small number of parquet files.
+    val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
+      sparkSession.sparkContext.defaultParallelism)
+
     // Issues a Spark job to read Parquet schema in parallel.
     val partiallyMergedSchemas =
       sparkSession
         .sparkContext
-        .parallelize(partialFileStatusInfo)
+        .parallelize(partialFileStatusInfo, numParallelism)
         .mapPartitions { iterator =>
           // Resembles fake `FileStatus`es with serialized path and length information.
           val fakeFileStatuses = iterator.map { case (path, length) =>
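
The clamping expression added in the patch can be tried in isolation. The sketch below is not part of the commit: the object name `SchemaReadParallelism`, the helper `numParallelism(numFiles, defaultParallelism)`, and the value 32 (standing in for `sparkContext.defaultParallelism` on an assumed cluster of 8 executors with 4 cores each) are hypothetical and chosen only for illustration. It shows that the partition count follows the number of files until it reaches the default parallelism, and that the inner `Math.max(..., 1)` presumably keeps the count positive when the file list is empty.

```scala
// Illustrative sketch only; names and the sample parallelism are assumptions,
// not taken from the Spark code base.
object SchemaReadParallelism {

  // Same arithmetic as the patch: at least 1 partition, at most defaultParallelism.
  def numParallelism(numFiles: Int, defaultParallelism: Int): Int =
    math.min(math.max(numFiles, 1), defaultParallelism)

  def main(args: Array[String]): Unit = {
    // Assumed stand-in for sparkSession.sparkContext.defaultParallelism.
    val defaultParallelism = 32

    for (numFiles <- Seq(0, 1, 5, 100)) {
      val partitions = numParallelism(numFiles, defaultParallelism)
      println(s"files=$numFiles -> partitions=$partitions")
    }
    // Prints:
    // files=0 -> partitions=1
    // files=1 -> partitions=1
    // files=5 -> partitions=5
    // files=100 -> partitions=32
  }
}
```

With a single small parquet file, the schema-reading job would therefore run one task instead of one task per available core, which is the latency concern the commit message describes.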
