Repository: spark
Updated Branches:
  refs/heads/branch-2.0 24539223b -> 9adba414c


[SPARK-15247][SQL] Set the default number of partitions for reading parquet schemas

## What changes were proposed in this pull request?
This PR sets the default number of partitions used when reading Parquet schemas. `SQLContext#read#parquet` currently yields at least n_executors * n_cores tasks even if the Parquet data consists of a single small file. This can increase the latency of small jobs.
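The fix below clamps the schema-read parallelism to the number of files to touch, bounded below by 1 and above by the cluster's default parallelism. A minimal standalone sketch of that clamping logic (the helper name `clampParallelism` is hypothetical; the patch inlines the expression):

```scala
object ParallelismSketch {
  // Use roughly one task per file: at least 1, and never more than the
  // cluster's default parallelism (n_executors * n_cores by default).
  def clampParallelism(numFiles: Int, defaultParallelism: Int): Int =
    Math.min(Math.max(numFiles, 1), defaultParallelism)

  def main(args: Array[String]): Unit = {
    println(clampParallelism(1, 8))    // single file: 1 task instead of 8
    println(clampParallelism(0, 8))    // no files: still at least 1
    println(clampParallelism(100, 8))  // many files: capped at 8
  }
}
```

With this bound, a read of one small Parquet file launches a single schema-merging task rather than one per available core.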

## How was this patch tested?
Manually tested and checked.

Author: Takeshi YAMAMURO <[email protected]>

Closes #13137 from maropu/SPARK-15247.

(cherry picked from commit dae4d5db21368faaa46fa8d1a256c27428694c2c)
Signed-off-by: Yin Huai <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9adba414
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9adba414
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9adba414

Branch: refs/heads/branch-2.0
Commit: 9adba414cb69b1f2dd38f75c81ec3ab0549a353a
Parents: 2453922
Author: Takeshi YAMAMURO <[email protected]>
Authored: Tue Jun 14 13:05:56 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Tue Jun 14 13:06:11 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9adba414/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 71c1600..6b25e36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -794,11 +794,16 @@ private[sql] object ParquetFileFormat extends Logging {
     // side, and resemble fake `FileStatus`es there.
     val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))
 
+    // Set the number of partitions to prevent following schema reads from generating many tasks
+    // in case of a small number of parquet files.
+    val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
+      sparkSession.sparkContext.defaultParallelism)
+
     // Issues a Spark job to read Parquet schema in parallel.
     val partiallyMergedSchemas =
       sparkSession
         .sparkContext
-        .parallelize(partialFileStatusInfo)
+        .parallelize(partialFileStatusInfo, numParallelism)
         .mapPartitions { iterator =>
          // Resembles fake `FileStatus`es with serialized path and length information.
           val fakeFileStatuses = iterator.map { case (path, length) =>

