[ https://issues.apache.org/jira/browse/SPARK-2521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Reynold Xin resolved SPARK-2521.
--------------------------------
    Resolution: Fixed
 Fix Version/s: 1.1.0

> Broadcast RDD object once per TaskSet (instead of sending it for every task)
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-2521
>                 URL: https://issues.apache.org/jira/browse/SPARK-2521
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>            Reporter: Reynold Xin
>            Assignee: Reynold Xin
>             Fix For: 1.1.0
>
>
> Currently (as of Spark 1.0.1), Spark sends the RDD object (which contains
> closures) over Akka along with the task itself to the executors. This is
> inefficient because all tasks in the same stage use the same RDD object, yet
> we send that RDD object to the executors once per task. This is especially
> bad when a closure references a very large variable. The current design has
> forced users to explicitly broadcast large variables themselves.
>
> The patch uses broadcast to send RDD objects and closures to executors, and
> uses Akka to send only a reference to the broadcast RDD/closure along with
> the partition-specific information for each task. For those of you who know
> more about the internals: Spark already relies on broadcast to send the
> Hadoop JobConf every time it uses a Hadoop input, because the JobConf is
> large.
>
> The user-facing impact of the change includes:
> - Users won't need to decide what to broadcast anymore, unless they want to
> use a large object multiple times in different operations.
> - Task size will get smaller, resulting in faster scheduling and higher task
> dispatch throughput.
>
> In addition, the change simplifies some internals of Spark, eliminating the
> need to maintain task caches and the complex logic to broadcast the JobConf
> (which also led to a deadlock recently).
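For context, the explicit-broadcast workaround that the description says users previously needed looks roughly like the sketch below. This is illustrative only (not part of the patch): it assumes a live SparkContext `sc`, and the variable names are made up. `sc.broadcast` and `.value` are the standard Spark broadcast-variable API.

{code}
// Before this change: to avoid shipping a large closure-captured value
// with every task, users wrapped it in a broadcast variable by hand.
val lookup = new Array[Byte](10 * 1000 * 1000)
val bcLookup = sc.broadcast(lookup)
sc.parallelize(1 to 1000, 1000)
  .map { x => bcLookup.value.length; x }   // tasks read the broadcast copy
  .count()

// After this change, the RDD object and its closures are broadcast once
// per stage automatically, so a plain capture is no longer resent per task:
sc.parallelize(1 to 1000, 1000)
  .map { x => lookup.length; x }
  .count()
{code}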
> A simple way to test this:
> {code}
> val a = new Array[Byte](1000*1000); scala.util.Random.nextBytes(a);
> sc.parallelize(1 to 1000, 1000).map { x => a; x }.groupBy { x => a; x }.count
>
> Numbers on 3 r3.8xlarge instances on EC2
> master branch: 5.648436068 s, 4.715361895 s, 5.360161877 s
> with this change: 3.416348793 s, 1.477846558 s, 1.553432156 s
> {code}

--
This message was sent by Atlassian JIRA
(v6.2#6252)