[ 
https://issues.apache.org/jira/browse/SPARK-2521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14067824#comment-14067824
 ] 

Apache Spark commented on SPARK-2521:
-------------------------------------

User 'rxin' has created a pull request for this issue:
https://github.com/apache/spark/pull/1498

> Broadcast RDD object once per TaskSet (instead of sending it for every task)
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-2521
>                 URL: https://issues.apache.org/jira/browse/SPARK-2521
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>            Reporter: Reynold Xin
>            Assignee: Reynold Xin
>
> Currently (as of Spark 1.0.1), Spark sends the RDD object (which contains
> closures) to the executors using Akka, along with the task itself. This is
> inefficient because all tasks in the same stage use the same RDD object, yet
> the RDD object is sent to the executors once per task. This is
> especially bad when a closure references a very large variable.
> The current design has led to users having to explicitly broadcast large
> variables.
> The patch uses broadcast to send RDD objects and the closures to executors,
> and uses Akka to send only a reference to the broadcast RDD/closure along with
> the partition-specific information for the task. For those of you who know
> more about the internals, Spark already relies on broadcast to send the
> Hadoop JobConf every time it uses Hadoop input, because the JobConf is
> large.
> The user-facing impacts of the change include:
> - Users won't need to decide what to broadcast anymore, unless they want
>   to use a large object multiple times in different operations.
> - Task size will get smaller, resulting in faster scheduling and higher task
>   dispatch throughput.
> In addition, the change will simplify some internals of Spark, eliminating 
> the need to maintain task caches and the complex logic to broadcast JobConf 
> (which also led to a deadlock recently).
> A simple way to test this:
> {code}
> // 1 MB array referenced by both closures below.
> val a = new Array[Byte](1000 * 1000); scala.util.Random.nextBytes(a)
> // 1000 partitions give 1000 tasks per stage, each of which needs `a`.
> sc.parallelize(1 to 1000, 1000).map { x => a; x }.groupBy { x => a; x }.count
> {code}
> Numbers on 3 r3.8xlarge instances on EC2:
> master branch: 5.648436068 s, 4.715361895 s, 5.360161877 s
> with this change: 3.416348793 s, 1.477846558 s, 1.553432156 s
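
To make the size argument in the description concrete, here is a rough, self-contained sketch that does not use Spark at all. It compares the bytes serialized when a large payload travels inside every task with the bytes serialized when the payload is written once and each task carries only a small reference. `FatTask`, `SlimTask`, `broadcastId` and the helper names are hypothetical stand-ins invented for this illustration, not Spark classes, and plain Java serialization is only an assumed approximation of what Spark actually puts on the wire.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object TaskSizeSketch {
  // Hypothetical stand-ins for illustration only; these are not Spark classes.
  final case class FatTask(partition: Int, payload: Array[Byte])  // payload travels with every task
  final case class SlimTask(partition: Int, broadcastId: Long)    // only a reference travels with the task

  // Size of an object under plain Java serialization.
  def serializedSize(obj: AnyRef): Long = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.size().toLong
  }

  // Total bytes when the payload is serialized into each of the tasks.
  def perTaskBytes(payload: Array[Byte], numTasks: Int): Long =
    (0 until numTasks).map(p => serializedSize(FatTask(p, payload))).sum

  // Total bytes when the payload is serialized once and every task carries a reference.
  // This counts a single copy of the payload; a real broadcast would still be
  // fetched by each executor that needs it.
  def broadcastBytes(payload: Array[Byte], numTasks: Int): Long =
    serializedSize(payload) +
      (0 until numTasks).map(p => serializedSize(SlimTask(p, 0L))).sum

  def main(args: Array[String]): Unit = {
    val a = new Array[Byte](1000 * 1000)
    scala.util.Random.nextBytes(a)
    println(s"payload in every task: ${perTaskBytes(a, 1000)} bytes")
    println(s"payload sent once:     ${broadcastBytes(a, 1000)} bytes")
  }
}
```

With 1000 tasks and a 1 MB array, the first total grows with the number of tasks while the second stays close to the size of a single copy. This is only a model of why task size shrinks, not a measurement of Spark itself.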



--
This message was sent by Atlassian JIRA
(v6.2#6252)
