Fixes for the new BlockId naming convention.

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a8d09818
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a8d09818
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a8d09818

Branch: refs/heads/master
Commit: a8d0981832ba71415a35c16cdc2bedb98bbfcdb9
Parents: feb45d3
Author: Mosharaf Chowdhury <mosha...@cs.berkeley.edu>
Authored: Wed Oct 16 13:37:58 2013 -0700
Committer: Mosharaf Chowdhury <mosha...@cs.berkeley.edu>
Committed: Wed Oct 16 21:33:33 2013 -0700

----------------------------------------------------------------------
 .../org/apache/spark/broadcast/TorrentBroadcast.scala   | 12 ++++++------
 .../main/scala/org/apache/spark/storage/BlockId.scala   |  9 ++++++++-
 2 files changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a8d09818/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index c174804..3341401 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -23,7 +23,7 @@ import scala.math
 import scala.util.Random
 
 import org.apache.spark._
-import org.apache.spark.storage.{BlockManager, StorageLevel}
+import org.apache.spark.storage.{BroadcastBlockId, BroadcastHelperBlockId, StorageLevel}
 import org.apache.spark.util.Utils
 
 
@@ -32,7 +32,7 @@ extends Broadcast[T](id) with Logging with Serializable {
 
   def value = value_
 
-  def broadcastId = BlockManager.toBroadcastId(id)
+  def broadcastId = BroadcastBlockId(id)
 
   TorrentBroadcast.synchronized {
     SparkEnv.get.blockManager.putSingle(broadcastId, value_, StorageLevel.MEMORY_AND_DISK, false)
@@ -55,7 +55,7 @@ extends Broadcast[T](id) with Logging with Serializable {
     hasBlocks = tInfo.totalBlocks
 
     // Store meta-info
-    val metaId = broadcastId + "_meta"
+    val metaId = BroadcastHelperBlockId(broadcastId, "meta")
     val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
     TorrentBroadcast.synchronized {
       SparkEnv.get.blockManager.putSingle(
@@ -64,7 +64,7 @@ extends Broadcast[T](id) with Logging with Serializable {
 
     // Store individual pieces
     for (i <- 0 until totalBlocks) {
-      val pieceId = broadcastId + "_piece_" + i
+      val pieceId = BroadcastHelperBlockId(broadcastId, "piece" + i)
       TorrentBroadcast.synchronized {
         SparkEnv.get.blockManager.putSingle(
           pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, true)
@@ -117,7 +117,7 @@ extends Broadcast[T](id) with Logging with Serializable {
 
   def receiveBroadcast(variableID: Long): Boolean = {
     // Receive meta-info
-    val metaId = broadcastId + "_meta"
+    val metaId = BroadcastHelperBlockId(broadcastId, "meta")
     var attemptId = 10
     while (attemptId > 0 && totalBlocks == -1) {
       TorrentBroadcast.synchronized {
@@ -141,7 +141,7 @@ extends Broadcast[T](id) with Logging with Serializable {
     // Receive actual blocks
     val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(_ + 1).toList)
     for (pid <- recvOrder) {
-      val pieceId = broadcastId + "_piece_" + pid
+      val pieceId = BroadcastHelperBlockId(broadcastId, "piece" + pid)
       TorrentBroadcast.synchronized {
         SparkEnv.get.blockManager.getSingle(pieceId) match {
           case Some(x) => 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a8d09818/core/src/main/scala/org/apache/spark/storage/BlockId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index c7efc67..7156d85 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -32,7 +32,7 @@ private[spark] sealed abstract class BlockId {
   def asRDDId = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
   def isRDD = isInstanceOf[RDDBlockId]
   def isShuffle = isInstanceOf[ShuffleBlockId]
-  def isBroadcast = isInstanceOf[BroadcastBlockId]
+  def isBroadcast = isInstanceOf[BroadcastBlockId] || isInstanceOf[BroadcastHelperBlockId]
 
   override def toString = name
   override def hashCode = name.hashCode
@@ -55,6 +55,10 @@ private[spark] case class BroadcastBlockId(broadcastId: Long) extends BlockId {
   def name = "broadcast_" + broadcastId
 }
 
+private[spark] case class BroadcastHelperBlockId(broadcastId: BroadcastBlockId, hType: String) extends BlockId {
+  def name = broadcastId.name + "_" + hType
+}
+
 private[spark] case class TaskResultBlockId(taskId: Long) extends BlockId {
   def name = "taskresult_" + taskId
 }
@@ -72,6 +76,7 @@ private[spark] object BlockId {
   val RDD = "rdd_([0-9]+)_([0-9]+)".r
   val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
   val BROADCAST = "broadcast_([0-9]+)".r
+  val BROADCAST_HELPER = "broadcast_([0-9]+)_([A-Za-z0-9]+)".r
   val TASKRESULT = "taskresult_([0-9]+)".r
   val STREAM = "input-([0-9]+)-([0-9]+)".r
   val TEST = "test_(.*)".r
@@ -84,6 +89,8 @@ private[spark] object BlockId {
       ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
     case BROADCAST(broadcastId) =>
       BroadcastBlockId(broadcastId.toLong)
+    case BROADCAST_HELPER(broadcastId, hType) =>
+      BroadcastHelperBlockId(BroadcastBlockId(broadcastId.toLong), hType)
     case TASKRESULT(taskId) =>
       TaskResultBlockId(taskId.toLong)
     case STREAM(streamId, uniqueId) =>

Reply via email to