Removed unnecessary code, and added a comment explaining the memory–latency tradeoff.

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/6e5a60fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/6e5a60fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/6e5a60fa

Branch: refs/heads/master
Commit: 6e5a60fab46daf5525749d9e71f1ccfaffddd34d
Parents: 4602e2b
Author: Mosharaf Chowdhury <mosha...@cs.berkeley.edu>
Authored: Mon Oct 14 09:40:51 2013 -0700
Committer: Mosharaf Chowdhury <mosha...@cs.berkeley.edu>
Committed: Wed Oct 16 21:33:33 2013 -0700

----------------------------------------------------------------------
 .../org/apache/spark/broadcast/TorrentBroadcast.scala     | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6e5a60fa/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala 
b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index ad1d29a..29e0dd2 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -89,7 +89,12 @@ extends Broadcast[T](id) with Logging with Serializable {
 
           if (receiveBroadcast(id)) {
             value_ = TorrentBroadcast.unBlockifyObject[T](arrayOfBlocks, 
totalBytes, totalBlocks)
-            SparkEnv.get.blockManager.putSingle(broadcastId, value_, 
StorageLevel.MEMORY_AND_DISK, false)
+            
+            // Store the merged copy in cache so that the next worker doesn't 
need to rebuild it.
+            // This creates a tradeoff between memory usage and latency.
+            // Storing copy doubles the memory footprint; not storing doubles 
deserialization cost.
+            SparkEnv.get.blockManager.putSingle(
+              broadcastId, value_, StorageLevel.MEMORY_AND_DISK, false)
 
             // Remove arrayOfBlocks from memory once value_ is on local cache
             resetWorkerVariables()
@@ -111,9 +116,6 @@ extends Broadcast[T](id) with Logging with Serializable {
   }
 
   def receiveBroadcast(variableID: Long): Boolean = {
-    if (totalBlocks > 0 && totalBlocks == hasBlocks)
-      return true
-
     // Receive meta-info
     val metaId = broadcastId + "_meta"
     var attemptId = 10

Reply via email to