http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/616ea6f3/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 2bb7715..1720007 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -23,20 +23,24 @@ import org.apache.spark.util.Utils
 
 private[spark]
 case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
-  blocks: Map[String, BlockStatus]) {
+  blocks: Map[BlockId, BlockStatus]) {
 
-  def memUsed(blockPrefix: String = "") = {
-    blocks.filterKeys(_.startsWith(blockPrefix)).values.map(_.memSize).
-      reduceOption(_+_).getOrElse(0l)
-  }
+  def memUsed() = blocks.values.map(_.memSize).reduceOption(_+_).getOrElse(0L)
 
-  def diskUsed(blockPrefix: String = "") = {
-    blocks.filterKeys(_.startsWith(blockPrefix)).values.map(_.diskSize).
-      reduceOption(_+_).getOrElse(0l)
-  }
+  def memUsedByRDD(rddId: Int) =
+    rddBlocks.filterKeys(_.rddId == rddId).values.map(_.memSize).reduceOption(_+_).getOrElse(0L)
+
+  def diskUsed() = blocks.values.map(_.diskSize).reduceOption(_+_).getOrElse(0L)
+
+  def diskUsedByRDD(rddId: Int) =
+    rddBlocks.filterKeys(_.rddId == rddId).values.map(_.diskSize).reduceOption(_+_).getOrElse(0L)
 
   def memRemaining : Long = maxMem - memUsed()
 
+  def rddBlocks = blocks.flatMap {
+    case (rdd: RDDBlockId, status) => Some(rdd, status)
+    case _ => None
+  }
 }
 
 case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
@@ -60,7 +64,7 @@ object StorageUtils {
  /* Returns RDD-level information, compiled from a list of StorageStatus objects */
   def rddInfoFromStorageStatus(storageStatusList: Seq[StorageStatus],
     sc: SparkContext) : Array[RDDInfo] = {
-    rddInfoFromBlockStatusList(storageStatusList.flatMap(_.blocks).toMap, sc)
+    rddInfoFromBlockStatusList(storageStatusList.flatMap(_.rddBlocks).toMap[RDDBlockId, BlockStatus], sc)
   }
 
  /* Returns a map of blocks to their locations, compiled from a list of StorageStatus objects */
@@ -71,26 +75,21 @@ object StorageUtils {
   }
 
  /* Given a list of BlockStatus objects, returns information for each RDD */
-  def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus],
+  def rddInfoFromBlockStatusList(infos: Map[RDDBlockId, BlockStatus],
     sc: SparkContext) : Array[RDDInfo] = {
 
     // Group by rddId, ignore the partition name
-    val groupedRddBlocks = infos.filterKeys(_.startsWith("rdd_")).groupBy { case(k, v) =>
-      k.substring(0,k.lastIndexOf('_'))
-    }.mapValues(_.values.toArray)
+    val groupedRddBlocks = infos.groupBy { case(k, v) => k.rddId }.mapValues(_.values.toArray)
 
     // For each RDD, generate an RDDInfo object
-    val rddInfos = groupedRddBlocks.map { case (rddKey, rddBlocks) =>
+    val rddInfos = groupedRddBlocks.map { case (rddId, rddBlocks) =>
       // Add up memory and disk sizes
       val memSize = rddBlocks.map(_.memSize).reduce(_ + _)
       val diskSize = rddBlocks.map(_.diskSize).reduce(_ + _)
 
-      // Find the id of the RDD, e.g. rdd_1 => 1
-      val rddId = rddKey.split("_").last.toInt
-
       // Get the friendly name and storage level for the RDD, if available
       sc.persistentRdds.get(rddId).map { r =>
-        val rddName = Option(r.name).getOrElse(rddKey)
+        val rddName = Option(r.name).getOrElse(rddId.toString)
         val rddStorageLevel = r.getStorageLevel
        RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, r.partitions.size, memSize, diskSize)
       }
@@ -101,16 +100,14 @@ object StorageUtils {
     rddInfos
   }
 
-  /* Removes all BlockStatus object that are not part of a block prefix */
-  def filterStorageStatusByPrefix(storageStatusList: Array[StorageStatus],
-    prefix: String) : Array[StorageStatus] = {
+  /* Filters storage status by a given RDD id. */
+  def filterStorageStatusByRDD(storageStatusList: Array[StorageStatus], rddId: Int)
+    : Array[StorageStatus] = {
 
     storageStatusList.map { status =>
-      val newBlocks = status.blocks.filterKeys(_.startsWith(prefix))
+      val newBlocks = status.rddBlocks.filterKeys(_.rddId == rddId).toMap[BlockId, BlockStatus]
      //val newRemainingMem = status.maxMem - newBlocks.values.map(_.memSize).reduce(_ + _)
       StorageStatus(status.blockManagerId, status.maxMem, newBlocks)
     }
-
   }
-
 }

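The heart of the StorageStatus change above is the new `rddBlocks` helper: a pattern-matching flatMap that narrows the Map[BlockId, BlockStatus] down to only the RDD-typed keys, which is what lets memUsedByRDD and diskUsedByRDD call `_.rddId` with no string parsing or casts. Below is a minimal, self-contained sketch of that narrowing pattern; the BlockId and BlockStatus definitions here are simplified stand-ins for illustration, not Spark's actual classes.

object TypedKeySketch extends App {
  sealed trait BlockId { def name: String }
  case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
    def name = "rdd_" + rddId + "_" + splitIndex
  }
  case class TestBlockId(id: String) extends BlockId { def name = "test_" + id }
  case class BlockStatus(memSize: Long, diskSize: Long)

  val blocks: Map[BlockId, BlockStatus] = Map(
    RDDBlockId(0, 0) -> BlockStatus(400L, 0L),
    RDDBlockId(1, 0) -> BlockStatus(200L, 0L),
    TestBlockId("misc") -> BlockStatus(100L, 0L))

  // The typed pattern in flatMap statically narrows the key type, dropping
  // any entry whose key is not an RDDBlockId.
  val rddBlocks: Map[RDDBlockId, BlockStatus] = blocks.flatMap {
    case (rdd: RDDBlockId, status) => Some(rdd -> status)
    case _ => None
  }

  def memUsedByRDD(rddId: Int): Long =
    rddBlocks.filterKeys(_.rddId == rddId).values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)

  assert(memUsedByRDD(0) == 400L)
  assert(memUsedByRDD(9) == 0L) // reduceOption covers the no-blocks case
}
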
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/616ea6f3/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index f2ae8dd..860e680 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -36,11 +36,11 @@ private[spark] object ThreadingTest {
   val numBlocksPerProducer = 20000
 
  private[spark] class ProducerThread(manager: BlockManager, id: Int) extends Thread {
-    val queue = new ArrayBlockingQueue[(String, Seq[Int])](100)
+    val queue = new ArrayBlockingQueue[(BlockId, Seq[Int])](100)
 
     override def run() {
       for (i <- 1 to numBlocksPerProducer) {
-        val blockId = "b-" + id + "-" + i
+        val blockId = TestBlockId("b-" + id + "-" + i)
         val blockSize = Random.nextInt(1000)
         val block = (1 to blockSize).map(_ => Random.nextInt())
         val level = randomLevel()
@@ -64,7 +64,7 @@ private[spark] object ThreadingTest {
 
   private[spark] class ConsumerThread(
       manager: BlockManager,
-      queue: ArrayBlockingQueue[(String, Seq[Int])]
+      queue: ArrayBlockingQueue[(BlockId, Seq[Int])]
     ) extends Thread {
     var numBlockConsumed = 0
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/616ea6f3/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 43c1257..b83cd54 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
-import org.apache.spark.storage.{StorageStatus, StorageUtils}
+import org.apache.spark.storage.{BlockId, StorageStatus, StorageUtils}
 import org.apache.spark.storage.BlockManagerMasterActor.BlockStatus
 import org.apache.spark.ui.UIUtils._
 import org.apache.spark.ui.Page._
@@ -33,21 +33,20 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
   val sc = parent.sc
 
   def render(request: HttpServletRequest): Seq[Node] = {
-    val id = request.getParameter("id")
-    val prefix = "rdd_" + id.toString
+    val id = request.getParameter("id").toInt
     val storageStatusList = sc.getExecutorStorageStatus
-    val filteredStorageStatusList = StorageUtils.
-      filterStorageStatusByPrefix(storageStatusList, prefix)
+    val filteredStorageStatusList = StorageUtils.filterStorageStatusByRDD(storageStatusList, id)
    val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
 
     val workerHeaders = Seq("Host", "Memory Usage", "Disk Usage")
-    val workers = filteredStorageStatusList.map((prefix, _))
+    val workers = filteredStorageStatusList.map((id, _))
     val workerTable = listingTable(workerHeaders, workerRow, workers)
 
    val blockHeaders = Seq("Block Name", "Storage Level", "Size in Memory", "Size on Disk",
       "Executors")
 
-    val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1)
+    val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).toArray.
+      sortWith(_._1.name < _._1.name)
    val blockLocations = StorageUtils.blockLocationsFromStorageStatus(filteredStorageStatusList)
     val blocks = blockStatuses.map {
      case(id, status) => (id, status, blockLocations.get(id).getOrElse(Seq("UNKNOWN")))
@@ -99,7 +98,7 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
    headerSparkPage(content, parent.sc, "RDD Storage Info for " + rddInfo.name, Storage)
   }
 
-  def blockRow(row: (String, BlockStatus, Seq[String])): Seq[Node] = {
+  def blockRow(row: (BlockId, BlockStatus, Seq[String])): Seq[Node] = {
     val (id, block, locations) = row
     <tr>
       <td>{id}</td>
@@ -118,15 +117,15 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
     </tr>
   }
 
-  def workerRow(worker: (String, StorageStatus)): Seq[Node] = {
-    val (prefix, status) = worker
+  def workerRow(worker: (Int, StorageStatus)): Seq[Node] = {
+    val (rddId, status) = worker
     <tr>
       <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
       <td>
-        {Utils.bytesToString(status.memUsed(prefix))}
+        {Utils.bytesToString(status.memUsedByRDD(rddId))}
         ({Utils.bytesToString(status.memRemaining)} Remaining)
       </td>
-      <td>{Utils.bytesToString(status.diskUsed(prefix))}</td>
+      <td>{Utils.bytesToString(status.diskUsedByRDD(rddId))}</td>
     </tr>
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/616ea6f3/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 3a7171c..ced036c 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -23,7 +23,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.scalatest.mock.EasyMockSugar
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.{BlockManager, StorageLevel}
+import org.apache.spark.storage.{BlockManager, RDDBlockId, StorageLevel}
 
 // TODO: Test the CacheManager's thread-safety aspects
 class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar {
@@ -52,9 +52,9 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
 
   test("get uncached rdd") {
     expecting {
-      blockManager.get("rdd_0_0").andReturn(None)
-      blockManager.put("rdd_0_0", ArrayBuffer[Any](1, 2, 3, 4), 
StorageLevel.MEMORY_ONLY, true).
-        andReturn(0)
+      blockManager.get(RDDBlockId(0, 0)).andReturn(None)
+      blockManager.put(RDDBlockId(0, 0), ArrayBuffer[Any](1, 2, 3, 4), StorageLevel.MEMORY_ONLY,
+        true).andReturn(0)
     }
 
     whenExecuting(blockManager) {
@@ -66,7 +66,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
 
   test("get cached rdd") {
     expecting {
-      blockManager.get("rdd_0_0").andReturn(Some(ArrayBuffer(5, 6, 
7).iterator))
+      blockManager.get(RDDBlockId(0, 0)).andReturn(Some(ArrayBuffer(5, 6, 
7).iterator))
     }
 
     whenExecuting(blockManager) {
@@ -79,7 +79,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
   test("get uncached local rdd") {
     expecting {
      // Local computation should not persist the resulting value, so don't expect a put().
-      blockManager.get("rdd_0_0").andReturn(None)
+      blockManager.get(RDDBlockId(0, 0)).andReturn(None)
     }
 
     whenExecuting(blockManager) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/616ea6f3/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index d9103ae..7ca5f16 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -21,7 +21,7 @@ import org.scalatest.FunSuite
 import java.io.File
 import org.apache.spark.rdd._
 import org.apache.spark.SparkContext._
-import storage.StorageLevel
+import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
 import org.apache.spark.util.Utils
 
 class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
@@ -83,7 +83,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
   }
 
   test("BlockRDD") {
-    val blockId = "id"
+    val blockId = TestBlockId("id")
     val blockManager = SparkEnv.get.blockManager
     blockManager.putSingle(blockId, "test", StorageLevel.MEMORY_ONLY)
     val blockRDD = new BlockRDD[String](sc, Array(blockId))
@@ -191,7 +191,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
   }
 
   test("CheckpointRDD with zero partitions") {
-    val rdd = new BlockRDD[Int](sc, Array[String]())
+    val rdd = new BlockRDD[Int](sc, Array[BlockId]())
     assert(rdd.partitions.size === 0)
     assert(rdd.isCheckpointed === false)
     rdd.checkpoint()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/616ea6f3/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index cd2bf9a..480bac8 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -18,24 +18,14 @@
 package org.apache.spark
 
 import network.ConnectionManagerId
-import org.scalatest.FunSuite
 import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.Timeouts._
+import org.scalatest.FunSuite
 import org.scalatest.matchers.ShouldMatchers
-import org.scalatest.prop.Checkers
 import org.scalatest.time.{Span, Millis}
-import org.scalacheck.Arbitrary._
-import org.scalacheck.Gen
-import org.scalacheck.Prop._
-import org.eclipse.jetty.server.{Server, Request, Handler}
-
-import com.google.common.io.Files
-
-import scala.collection.mutable.ArrayBuffer
 
 import SparkContext._
-import storage.{GetBlock, BlockManagerWorker, StorageLevel}
-import ui.JettyUtils
+import org.apache.spark.storage.{BlockManagerWorker, GetBlock, RDDBlockId, StorageLevel}
 
 
 class NotSerializableClass
@@ -193,7 +183,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
 
    // Get all the locations of the first partition and try to fetch the partitions
     // from those locations.
-    val blockIds = data.partitions.indices.map(index => "rdd_%d_%d".format(data.id, index)).toArray
+    val blockIds = data.partitions.indices.map(index => RDDBlockId(data.id, index)).toArray
     val blockId = blockIds(0)
     val blockManager = SparkEnv.get.blockManager
     blockManager.master.getLocations(blockId).foreach(id => {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/616ea6f3/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 2f93324..3952ee9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.Partition
 import org.apache.spark.TaskContext
 import org.apache.spark.{Dependency, ShuffleDependency, OneToOneDependency}
 import org.apache.spark.{FetchFailed, Success, TaskEndReason}
-import org.apache.spark.storage.{BlockManagerId, BlockManagerMaster}
+import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
 
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 
@@ -75,15 +75,10 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
   val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]]
   // stub out BlockManagerMaster.getLocations to use our cacheLocations
   val blockManagerMaster = new BlockManagerMaster(null) {
-      override def getLocations(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = {
-        blockIds.map { name =>
-          val pieces = name.split("_")
-          if (pieces(0) == "rdd") {
-            val key = pieces(1).toInt -> pieces(2).toInt
-            cacheLocations.getOrElse(key, Seq())
-          } else {
-            Seq()
-          }
+      override def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = {
+        blockIds.map {
+          _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)).
+            getOrElse(Seq())
         }.toSeq
       }
       override def removeExecutor(execId: String) {

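The rewritten getLocations stub above trades the old `name.split("_")` string parsing for an Option pipeline: `asRDDId` yields None for non-RDD blocks, and the map/flatMap/getOrElse chain collapses both "not an RDD block" and "no cached location" into an empty Seq. Here is a self-contained illustration of how that chain behaves, using simplified stand-in types rather than Spark's classes.

object OptionChainSketch extends App {
  sealed trait BlockId { def asRDDId: Option[RDDBlockId] = None }
  case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
    override def asRDDId: Option[RDDBlockId] = Some(this)
  }
  case class TestBlockId(id: String) extends BlockId

  val cacheLocations = Map((1, 0) -> Seq("hostA", "hostB"))

  def locate(blockId: BlockId): Seq[String] =
    blockId.asRDDId.map(id => (id.rddId, id.splitIndex))
      .flatMap(key => cacheLocations.get(key))
      .getOrElse(Seq()) // both failure cases fall through to here

  assert(locate(RDDBlockId(1, 0)) == Seq("hostA", "hostB"))
  assert(locate(RDDBlockId(2, 0)) == Seq()) // RDD block, nothing cached
  assert(locate(TestBlockId("x")) == Seq()) // not an RDD block at all
}
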
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/616ea6f3/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
index 119ba30..ee150a3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
 
 import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv}
 import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult}
+import org.apache.spark.storage.TaskResultBlockId
 
 /**
  * Removes the TaskResult from the BlockManager before delegating to a normal TaskResultGetter.
@@ -85,7 +86,7 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
    val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
     assert(result === 1.to(akkaFrameSize).toArray)
 
-    val RESULT_BLOCK_ID = "taskresult_0"
+    val RESULT_BLOCK_ID = TaskResultBlockId(0)
     assert(sc.env.blockManager.master.getLocations(RESULT_BLOCK_ID).size === 0,
       "Expect result to be removed from the block manager.")
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/616ea6f3/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
new file mode 100644
index 0000000..cb76275
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.scalatest.FunSuite
+
+class BlockIdSuite extends FunSuite {
+  def assertSame(id1: BlockId, id2: BlockId) {
+    assert(id1.name === id2.name)
+    assert(id1.hashCode === id2.hashCode)
+    assert(id1 === id2)
+  }
+
+  def assertDifferent(id1: BlockId, id2: BlockId) {
+    assert(id1.name != id2.name)
+    assert(id1.hashCode != id2.hashCode)
+    assert(id1 != id2)
+  }
+
+  test("test-bad-deserialization") {
+    try {
+      // Try to deserialize an invalid block id.
+      BlockId("myblock")
+      fail()
+    } catch {
+      case e: IllegalStateException => // OK
+      case _ => fail()
+    }
+  }
+
+  test("rdd") {
+    val id = RDDBlockId(1, 2)
+    assertSame(id, RDDBlockId(1, 2))
+    assertDifferent(id, RDDBlockId(1, 1))
+    assert(id.name === "rdd_1_2")
+    assert(id.asRDDId.get.rddId === 1)
+    assert(id.asRDDId.get.splitIndex === 2)
+    assert(id.isRDD)
+    assertSame(id, BlockId(id.toString))
+  }
+
+  test("shuffle") {
+    val id = ShuffleBlockId(1, 2, 3)
+    assertSame(id, ShuffleBlockId(1, 2, 3))
+    assertDifferent(id, ShuffleBlockId(3, 2, 3))
+    assert(id.name === "shuffle_1_2_3")
+    assert(id.asRDDId === None)
+    assert(id.shuffleId === 1)
+    assert(id.mapId === 2)
+    assert(id.reduceId === 3)
+    assert(id.isShuffle)
+    assertSame(id, BlockId(id.toString))
+  }
+
+  test("broadcast") {
+    val id = BroadcastBlockId(42)
+    assertSame(id, BroadcastBlockId(42))
+    assertDifferent(id, BroadcastBlockId(123))
+    assert(id.name === "broadcast_42")
+    assert(id.asRDDId === None)
+    assert(id.broadcastId === 42)
+    assert(id.isBroadcast)
+    assertSame(id, BlockId(id.toString))
+  }
+
+  test("taskresult") {
+    val id = TaskResultBlockId(60)
+    assertSame(id, TaskResultBlockId(60))
+    assertDifferent(id, TaskResultBlockId(61))
+    assert(id.name === "taskresult_60")
+    assert(id.asRDDId === None)
+    assert(id.taskId === 60)
+    assert(!id.isRDD)
+    assertSame(id, BlockId(id.toString))
+  }
+
+  test("stream") {
+    val id = StreamBlockId(1, 100)
+    assertSame(id, StreamBlockId(1, 100))
+    assertDifferent(id, StreamBlockId(2, 101))
+    assert(id.name === "input-1-100")
+    assert(id.asRDDId === None)
+    assert(id.streamId === 1)
+    assert(id.uniqueId === 100)
+    assert(!id.isBroadcast)
+    assertSame(id, BlockId(id.toString))
+  }
+
+  test("test") {
+    val id = TestBlockId("abc")
+    assertSame(id, TestBlockId("abc"))
+    assertDifferent(id, TestBlockId("ab"))
+    assert(id.name === "test_abc")
+    assert(id.asRDDId === None)
+    assert(id.id === "abc")
+    assert(!id.isShuffle)
+    assertSame(id, BlockId(id.toString))
+  }
+}

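The new BlockId.scala itself does not appear in this portion of the commit, but the suite above pins down its observable contract: each subtype renders a stable `name`, `toString` equals `name`, and `BlockId(...)` parses a name back into an equal typed id, throwing IllegalStateException on unrecognized input. The sketch below is reconstructed from those assertions, not copied from the committed file, so details may differ.

package org.apache.spark.storage

private[spark] sealed abstract class BlockId {
  def name: String
  def asRDDId: Option[RDDBlockId] = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
  def isRDD = isInstanceOf[RDDBlockId]
  def isShuffle = isInstanceOf[ShuffleBlockId]
  def isBroadcast = isInstanceOf[BroadcastBlockId]
  // Case-class equality and hashCode are structural; toString is pinned to
  // `name` so that BlockId(id.toString) round-trips, as the suite asserts.
  override def toString = name
}

private[spark] case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
  def name = "rdd_" + rddId + "_" + splitIndex
}

private[spark] case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
  extends BlockId {
  def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}

private[spark] case class BroadcastBlockId(broadcastId: Long) extends BlockId {
  def name = "broadcast_" + broadcastId
}

private[spark] case class TaskResultBlockId(taskId: Long) extends BlockId {
  def name = "taskresult_" + taskId
}

private[spark] case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
  def name = "input-" + streamId + "-" + uniqueId
}

private[spark] case class TestBlockId(id: String) extends BlockId {
  def name = "test_" + id
}

private[spark] object BlockId {
  val RDD = "rdd_([0-9]+)_([0-9]+)".r
  val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
  val BROADCAST = "broadcast_([0-9]+)".r
  val TASKRESULT = "taskresult_([0-9]+)".r
  val STREAM = "input-([0-9]+)-([0-9]+)".r
  val TEST = "test_(.*)".r

  /** Converts a name string back into a typed id; inverse of `name`. */
  def apply(id: String): BlockId = id match {
    case RDD(rddId, splitIndex) => RDDBlockId(rddId.toInt, splitIndex.toInt)
    case SHUFFLE(shuffleId, mapId, reduceId) =>
      ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
    case BROADCAST(broadcastId) => BroadcastBlockId(broadcastId.toLong)
    case TASKRESULT(taskId) => TaskResultBlockId(taskId.toLong)
    case STREAM(streamId, uniqueId) => StreamBlockId(streamId.toInt, uniqueId.toLong)
    case TEST(value) => TestBlockId(value)
    case _ => throw new IllegalStateException("Unrecognized BlockId: " + id)
  }
}
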
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/616ea6f3/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 038a9ac..484a654 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -32,7 +32,6 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark.util.{SizeEstimator, Utils, AkkaUtils, ByteBufferInputStream}
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 
-
 class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
   var store: BlockManager = null
   var store2: BlockManager = null
@@ -46,6 +45,10 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   System.setProperty("spark.kryoserializer.buffer.mb", "1")
   val serializer = new KryoSerializer
 
+  // Implicitly convert strings to BlockIds for test clarity.
+  implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
+  def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId)
+
   before {
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0)
     this.actorSystem = actorSystem
@@ -229,31 +232,31 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
     // Putting a1, a2 and a3 in memory.
-    store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY)
-    store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
     store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
     master.removeRdd(0, blocking = false)
 
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      store.getSingle("rdd_0_0") should be (None)
-      master.getLocations("rdd_0_0") should have size 0
+      store.getSingle(rdd(0, 0)) should be (None)
+      master.getLocations(rdd(0, 0)) should have size 0
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
-      store.getSingle("rdd_0_1") should be (None)
-      master.getLocations("rdd_0_1") should have size 0
+      store.getSingle(rdd(0, 1)) should be (None)
+      master.getLocations(rdd(0, 1)) should have size 0
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
       store.getSingle("nonrddblock") should not be (None)
       master.getLocations("nonrddblock") should have size (1)
     }
 
-    store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY)
-    store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
     master.removeRdd(0, blocking = true)
-    store.getSingle("rdd_0_0") should be (None)
-    master.getLocations("rdd_0_0") should have size 0
-    store.getSingle("rdd_0_1") should be (None)
-    master.getLocations("rdd_0_1") should have size 0
+    store.getSingle(rdd(0, 0)) should be (None)
+    master.getLocations(rdd(0, 0)) should have size 0
+    store.getSingle(rdd(0, 1)) should be (None)
+    master.getLocations(rdd(0, 1)) should have size 0
   }
 
   test("reregistration on heart beat") {
@@ -372,41 +375,41 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
-    store.putSingle("rdd_0_1", a1, StorageLevel.MEMORY_ONLY)
-    store.putSingle("rdd_0_2", a2, StorageLevel.MEMORY_ONLY)
-    store.putSingle("rdd_0_3", a3, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
    // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
     // from the same RDD
-    assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
-    assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store")
-    assert(store.getSingle("rdd_0_1") != None, "rdd_0_1 was not in store")
+    assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
+    assert(store.getSingle(rdd(0, 2)) != None, "rdd_0_2 was not in store")
+    assert(store.getSingle(rdd(0, 1)) != None, "rdd_0_1 was not in store")
     // Check that rdd_0_3 doesn't replace them even after further accesses
-    assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
-    assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
-    assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+    assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
+    assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
+    assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
   }
 
   test("in-memory LRU for partitions of multiple RDDs") {
     store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
-    store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
-    store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
-    store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 2), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(1, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
     // At this point rdd_1_1 should've replaced rdd_0_1
-    assert(store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was not in store")
-    assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store")
-    assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store")
+    assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
+    assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
+    assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store")
     // Do a get() on rdd_0_2 so that it is the most recently used item
-    assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store")
+    assert(store.getSingle(rdd(0, 2)) != None, "rdd_0_2 was not in store")
     // Put in more partitions from RDD 0; they should replace rdd_1_1
-    store.putSingle("rdd_0_3", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
-    store.putSingle("rdd_0_4", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 3), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 4), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
    // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
     // when we try to add rdd_0_4.
-    assert(!store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was in store")
-    assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store")
-    assert(!store.memoryStore.contains("rdd_0_4"), "rdd_0_4 was in store")
-    assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store")
-    assert(store.memoryStore.contains("rdd_0_3"), "rdd_0_3 was not in store")
+    assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store")
+    assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
+    assert(!store.memoryStore.contains(rdd(0, 4)), "rdd_0_4 was in store")
+    assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store")
+    assert(store.memoryStore.contains(rdd(0, 3)), "rdd_0_3 was not in store")
   }
 
   test("on-disk storage") {
@@ -590,43 +593,46 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     try {
       System.setProperty("spark.shuffle.compress", "true")
       store = new BlockManager("exec1", actorSystem, master, serializer, 2000)
-      store.putSingle("shuffle_0_0_0", new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
-      assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 
was not compressed")
+      store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
+        "shuffle_0_0_0 was not compressed")
       store.stop()
       store = null
 
       System.setProperty("spark.shuffle.compress", "false")
       store = new BlockManager("exec2", actorSystem, master, serializer, 2000)
-      store.putSingle("shuffle_0_0_0", new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
-      assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, 
"shuffle_0_0_0 was compressed")
+      store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 1000,
+        "shuffle_0_0_0 was compressed")
       store.stop()
       store = null
 
       System.setProperty("spark.broadcast.compress", "true")
       store = new BlockManager("exec3", actorSystem, master, serializer, 2000)
-      store.putSingle("broadcast_0", new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
-      assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was 
not compressed")
+      store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 100,
+        "broadcast_0 was not compressed")
       store.stop()
       store = null
 
       System.setProperty("spark.broadcast.compress", "false")
       store = new BlockManager("exec4", actorSystem, master, serializer, 2000)
-      store.putSingle("broadcast_0", new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
-      assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 
was compressed")
+      store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 1000, 
"broadcast_0 was compressed")
       store.stop()
       store = null
 
       System.setProperty("spark.rdd.compress", "true")
       store = new BlockManager("exec5", actorSystem, master, serializer, 2000)
-      store.putSingle("rdd_0_0", new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
-      assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not 
compressed")
+      store.putSingle(rdd(0, 0), new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize(rdd(0, 0)) <= 100, "rdd_0_0 was not 
compressed")
       store.stop()
       store = null
 
       System.setProperty("spark.rdd.compress", "false")
       store = new BlockManager("exec6", actorSystem, master, serializer, 2000)
-      store.putSingle("rdd_0_0", new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
-      assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was 
compressed")
+      store.putSingle(rdd(0, 0), new Array[Byte](1000), 
StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize(rdd(0, 0)) >= 1000, "rdd_0_0 was 
compressed")
       store.stop()
       store = null
 

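One detail worth noting in the BlockManagerSuite hunks above: rather than rewriting every remaining string literal, the suite adds an implicit String => BlockId conversion, so leftover literals such as "nonrddblock" still compile and become TestBlockIds at the call site. A toy sketch of that compatibility trick follows, with a simplified stand-in type rather than Spark's own.

import scala.language.implicitConversions

object ImplicitBlockIdSketch extends App {
  case class TestBlockId(id: String) { def name = "test_" + id }

  // Mirrors the suite's `implicit def StringToBlockId(value: String): BlockId`.
  implicit def stringToBlockId(value: String): TestBlockId = TestBlockId(value)

  def getSize(blockId: TestBlockId): Long = blockId.name.length.toLong

  // The bare string is converted implicitly, so old call sites stay unchanged.
  assert(getSize("nonrddblock") == 16L) // "test_nonrddblock".length
}
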
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/616ea6f3/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
index aae79a4..b97fb7e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
@@ -30,10 +30,11 @@ import akka.actor._
 import akka.pattern.ask
 import akka.util.duration._
 import akka.dispatch._
+import org.apache.spark.storage.BlockId
 
 private[streaming] sealed trait NetworkInputTrackerMessage
 private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
-private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[String], metadata: Any) extends NetworkInputTrackerMessage
+private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) extends NetworkInputTrackerMessage
 private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
 
 /**
@@ -48,7 +49,7 @@ class NetworkInputTracker(
   val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*)
   val receiverExecutor = new ReceiverExecutor()
   val receiverInfo = new HashMap[Int, ActorRef]
-  val receivedBlockIds = new HashMap[Int, Queue[String]]
+  val receivedBlockIds = new HashMap[Int, Queue[BlockId]]
   val timeout = 5000.milliseconds
 
   var currentTime: Time = null
@@ -67,9 +68,9 @@ class NetworkInputTracker(
   }
 
   /** Return all the blocks received from a receiver. */
-  def getBlockIds(receiverId: Int, time: Time): Array[String] = synchronized {
+  def getBlockIds(receiverId: Int, time: Time): Array[BlockId] = synchronized {
     val queue =  receivedBlockIds.synchronized {
-      receivedBlockIds.getOrElse(receiverId, new Queue[String]())
+      receivedBlockIds.getOrElse(receiverId, new Queue[BlockId]())
     }
     val result = queue.synchronized {
       queue.dequeueAll(x => true)
@@ -92,7 +93,7 @@ class NetworkInputTracker(
       case AddBlocks(streamId, blockIds, metadata) => {
         val tmp = receivedBlockIds.synchronized {
           if (!receivedBlockIds.contains(streamId)) {
-            receivedBlockIds += ((streamId, new Queue[String]))
+            receivedBlockIds += ((streamId, new Queue[BlockId]))
           }
           receivedBlockIds(streamId)
         }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/616ea6f3/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index 31f9891..8d3ac0f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -31,7 +31,7 @@ import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
 import org.apache.spark.streaming._
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.rdd.{RDD, BlockRDD}
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
 
 /**
  * Abstract class for defining any InputDStream that has to start a receiver on worker
@@ -69,7 +69,7 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
       val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime)
       Some(new BlockRDD[T](ssc.sc, blockIds))
     } else {
-      Some(new BlockRDD[T](ssc.sc, Array[String]()))
+      Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
     }
   }
 }
@@ -77,7 +77,7 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
 
 private[streaming] sealed trait NetworkReceiverMessage
 private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
-private[streaming] case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage
+private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any) extends NetworkReceiverMessage
 private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage
 
 /**
@@ -158,7 +158,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
   /**
   * Pushes a block (as an ArrayBuffer filled with data) into the block manager.
    */
-  def pushBlock(blockId: String, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
+  def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
     env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
     actor ! ReportBlock(blockId, metadata)
   }
@@ -166,7 +166,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
   /**
    * Pushes a block (as bytes) into the block manager.
    */
-  def pushBlock(blockId: String, bytes: ByteBuffer, metadata: Any, level: StorageLevel) {
+  def pushBlock(blockId: BlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) {
     env.blockManager.putBytes(blockId, bytes, level)
     actor ! ReportBlock(blockId, metadata)
   }
@@ -209,7 +209,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
   class BlockGenerator(storageLevel: StorageLevel)
     extends Serializable with Logging {
 
-    case class Block(id: String, buffer: ArrayBuffer[T], metadata: Any = null)
+    case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)
 
     val clock = new SystemClock()
    val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong
@@ -241,7 +241,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
         val newBlockBuffer = currentBuffer
         currentBuffer = new ArrayBuffer[T]
         if (newBlockBuffer.size > 0) {
-          val blockId = "input-" + NetworkReceiver.this.streamId + "-" + (time 
- blockInterval)
+          val blockId = StreamBlockId(NetworkReceiver.this.streamId, time - 
blockInterval)
           val newBlock = new Block(blockId, newBlockBuffer)
           blocksForPushing.add(newBlock)
         }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/616ea6f3/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
index c91f12e..10ed4ef 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.dstream
 
 import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
 import org.apache.spark.streaming.StreamingContext
 
 import java.net.InetSocketAddress
@@ -71,7 +71,7 @@ class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel)
         var nextBlockNumber = 0
         while (true) {
           val buffer = queue.take()
-          val blockId = "input-" + streamId + "-" + nextBlockNumber
+          val blockId = StreamBlockId(streamId, nextBlockNumber)
           nextBlockNumber += 1
           pushBlock(blockId, buffer, null, storageLevel)
         }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/616ea6f3/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
index 4b5d8c4..ef0f85a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
@@ -21,7 +21,7 @@ import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy }
 import akka.actor.{ actorRef2Scala, ActorRef }
 import akka.actor.{ PossiblyHarmful, OneForOneStrategy }
 
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
 import org.apache.spark.streaming.dstream.NetworkReceiver
 
 import java.util.concurrent.atomic.AtomicInteger
@@ -159,7 +159,7 @@ private[streaming] class ActorReceiver[T: ClassManifest](
   protected def pushBlock(iter: Iterator[T]) {
     val buffer = new ArrayBuffer[T]
     buffer ++= iter
-    pushBlock("block-" + streamId + "-" + System.nanoTime(), buffer, null, 
storageLevel)
+    pushBlock(StreamBlockId(streamId, System.nanoTime()), buffer, null, 
storageLevel)
   }
 
   protected def onStart() = {
