Repository: spark
Updated Branches:
  refs/heads/master bd39ffe35 -> dae4d5db2


[SPARK-15247][SQL] Set the default number of partitions for reading parquet schemas

## What changes were proposed in this pull request?
This PR sets the default number of partitions when reading Parquet schemas.
SQLContext#read#parquet currently yields at least n_executors * n_cores tasks
even if the Parquet data consists of a single small file. This can increase
the latency of small jobs.
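
To make the effect concrete, here is a minimal sketch that runs without Spark. `numParallelism` applies the same clamp as the patch. The `slice` helper is an illustrative stand-in for how a parallelized collection is split into contiguous slices; it is an assumption for demonstration, not Spark's actual code. The file name and the parallelism value of 8 are hypothetical.

```scala
object SchemaReadParallelism {
  // Same clamp as the patch: at least 1 partition, at most defaultParallelism.
  def numParallelism(numFiles: Int, defaultParallelism: Int): Int =
    math.min(math.max(numFiles, 1), defaultParallelism)

  // Illustrative stand-in (assumption): split `items` into `numSlices`
  // contiguous, roughly even slices. Slices may be empty when there are
  // fewer items than slices.
  def slice[T](items: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    require(numSlices >= 1, "numSlices must be positive")
    (0 until numSlices).map { i =>
      val start = ((i.toLong * items.length) / numSlices).toInt
      val end = (((i + 1).toLong * items.length) / numSlices).toInt
      items.slice(start, end)
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical input: one small Parquet file, 2 executors * 4 cores.
    val files = Seq(("part-00000.parquet", 1024L))
    val defaultParallelism = 8

    // Before the patch: one slice (task) per unit of default parallelism.
    val before = slice(files, defaultParallelism)
    // After the patch: the slice count is capped by the number of files.
    val after = slice(files, numParallelism(files.size, defaultParallelism))

    println(s"tasks before: ${before.size}, non-empty: ${before.count(_.nonEmpty)}")
    println(s"tasks after: ${after.size}, non-empty: ${after.count(_.nonEmpty)}")
  }
}
```

Under these assumptions, the single file produces eight slices before the change, seven of them empty, and exactly one slice after it. With more files than `defaultParallelism`, the clamp leaves the task count at `defaultParallelism`, so large reads are unaffected.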

## How was this patch tested?
Manually tested and checked.

Author: Takeshi YAMAMURO <[email protected]>

Closes #13137 from maropu/SPARK-15247.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dae4d5db
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dae4d5db
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dae4d5db

Branch: refs/heads/master
Commit: dae4d5db21368faaa46fa8d1a256c27428694c2c
Parents: bd39ffe
Author: Takeshi YAMAMURO <[email protected]>
Authored: Tue Jun 14 13:05:56 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Tue Jun 14 13:05:56 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dae4d5db/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 71c1600..6b25e36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -794,11 +794,16 @@ private[sql] object ParquetFileFormat extends Logging {
     // side, and resemble fake `FileStatus`es there.
     val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))
 
+    // Set the number of partitions to prevent following schema reads from generating many tasks
+    // in case of a small number of parquet files.
+    val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
+      sparkSession.sparkContext.defaultParallelism)
+
     // Issues a Spark job to read Parquet schema in parallel.
     val partiallyMergedSchemas =
       sparkSession
         .sparkContext
-        .parallelize(partialFileStatusInfo)
+        .parallelize(partialFileStatusInfo, numParallelism)
         .mapPartitions { iterator =>
           // Resembles fake `FileStatus`es with serialized path and length information.
           val fakeFileStatuses = iterator.map { case (path, length) =>

