Removed unnecessary code and added a comment about the memory-latency tradeoff.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/6e5a60fa Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/6e5a60fa Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/6e5a60fa Branch: refs/heads/master Commit: 6e5a60fab46daf5525749d9e71f1ccfaffddd34d Parents: 4602e2b Author: Mosharaf Chowdhury <mosha...@cs.berkeley.edu> Authored: Mon Oct 14 09:40:51 2013 -0700 Committer: Mosharaf Chowdhury <mosha...@cs.berkeley.edu> Committed: Wed Oct 16 21:33:33 2013 -0700 ---------------------------------------------------------------------- .../org/apache/spark/broadcast/TorrentBroadcast.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6e5a60fa/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index ad1d29a..29e0dd2 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -89,7 +89,12 @@ extends Broadcast[T](id) with Logging with Serializable { if (receiveBroadcast(id)) { value_ = TorrentBroadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks) - SparkEnv.get.blockManager.putSingle(broadcastId, value_, StorageLevel.MEMORY_AND_DISK, false) + + // Store the merged copy in cache so that the next worker doesn't need to rebuild it. + // This creates a tradeoff between memory usage and latency. + // Storing copy doubles the memory footprint; not storing doubles deserialization cost. 
+ SparkEnv.get.blockManager.putSingle( + broadcastId, value_, StorageLevel.MEMORY_AND_DISK, false) // Remove arrayOfBlocks from memory once value_ is on local cache resetWorkerVariables() @@ -111,9 +116,6 @@ extends Broadcast[T](id) with Logging with Serializable { } def receiveBroadcast(variableID: Long): Boolean = { - if (totalBlocks > 0 && totalBlocks == hasBlocks) - return true - // Receive meta-info val metaId = broadcastId + "_meta" var attemptId = 10