Repository: spark
Updated Branches:
  refs/heads/branch-1.6 017b73e69 -> 6ef823544


[SPARK-12598][CORE] bug in setMinPartitions

There is a bug in the calculation of `maxSplitSize`: `totalLen` should be divided
by `minPartitions`, not by `files.size`.

Author: Darek Blasiak <[email protected]>

Closes #10546 from datafarmer/setminpartitionsbug.

(cherry picked from commit 8346518357f4a3565ae41e9a5ccd7e2c3ed6c468)
Signed-off-by: Sean Owen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ef82354
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ef82354
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ef82354

Branch: refs/heads/branch-1.6
Commit: 6ef823544dfbc8c9843bdedccfda06147a1a74fe
Parents: 017b73e
Author: Darek Blasiak <[email protected]>
Authored: Thu Jan 7 21:15:40 2016 +0000
Committer: Sean Owen <[email protected]>
Committed: Thu Jan 7 21:17:48 2016 +0000

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/input/PortableDataStream.scala  | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6ef82354/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index 280e7a5..8e97584 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -43,9 +43,8 @@ private[spark] abstract class StreamFileInputFormat[T]
    * which is set through setMaxSplitSize
    */
   def setMinPartitions(context: JobContext, minPartitions: Int) {
-    val files = listStatus(context).asScala
-    val totalLen = files.map(file => if (file.isDir) 0L else file.getLen).sum
-    val maxSplitSize = Math.ceil(totalLen * 1.0 / files.size).toLong
+    val totalLen = listStatus(context).asScala.filterNot(_.isDir).map(_.getLen).sum
+    val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
     super.setMaxSplitSize(maxSplitSize)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to