Updated Branches: refs/heads/master 99796904a -> 3b11f43e3
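For orientation: this commit replaces the raw String block identifiers used throughout the storage layer with a typed BlockId hierarchy. The sketch below reconstructs that hierarchy from the names and members that appear in the hunks that follow (RDDBlockId with rddId and splitIndex, ShuffleBlockId, BroadcastBlockId, TaskResultBlockId, StreamBlockId, TestBlockId, plus the filename and asRDDId members); the bodies are an editor's assumption, since BlockId.scala itself is not part of this excerpt.

// Assumed sketch of the BlockId hierarchy this commit introduces; the names
// and fields are taken from the diffs below, the implementations are guessed.
sealed abstract class BlockId {
  def filename: String   // unique name for the block, also usable on disk
  def asRDDId: Option[RDDBlockId] = this match {   // used by DAGSchedulerSuite below
    case id: RDDBlockId => Some(id)
    case _ => None
  }
  override def toString = filename
}
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
  def filename = "rdd_" + rddId + "_" + splitIndex   // formerly bare strings like "rdd_0_3"
}
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  def filename = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}
case class BroadcastBlockId(broadcastId: Long) extends BlockId {
  def filename = "broadcast_" + broadcastId
}
case class TaskResultBlockId(taskId: Long) extends BlockId {
  def filename = "taskresult_" + taskId
}
case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
  def filename = "input-" + streamId + "-" + uniqueId
}
case class TestBlockId(id: String) extends BlockId {
  def filename = "test_" + id   // prefix assumed
}

With case classes in place, the prefix parsing scattered through the old code (startsWith("rdd_"), substring, split("_")) collapses into plain field access and pattern matching, as the StorageUtils hunk immediately below shows.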
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a3959111/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index 2bb7715..8358596 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -23,20 +23,24 @@ import org.apache.spark.util.Utils private[spark] case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long, - blocks: Map[String, BlockStatus]) { + blocks: Map[BlockId, BlockStatus]) { - def memUsed(blockPrefix: String = "") = { - blocks.filterKeys(_.startsWith(blockPrefix)).values.map(_.memSize). - reduceOption(_+_).getOrElse(0l) - } + def memUsed() = blocks.values.map(_.memSize).reduceOption(_+_).getOrElse(0l) - def diskUsed(blockPrefix: String = "") = { - blocks.filterKeys(_.startsWith(blockPrefix)).values.map(_.diskSize). - reduceOption(_+_).getOrElse(0l) - } + def memUsedByRDD(rddId: Int) = + rddBlocks.filterKeys(_.rddId == rddId).values.map(_.memSize).reduceOption(_+_).getOrElse(0l) + + def diskUsed() = blocks.values.map(_.diskSize).reduceOption(_+_).getOrElse(0l) + + def diskUsedByRDD(rddId: Int) = + rddBlocks.filterKeys(_.rddId == rddId).values.map(_.diskSize).reduceOption(_+_).getOrElse(0l) def memRemaining : Long = maxMem - memUsed() + def rddBlocks = blocks.flatMap { + case (rdd: RDDBlockId, status) => Some(rdd, status) + case _ => None + } } case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel, @@ -60,7 +64,7 @@ object StorageUtils { /* Returns RDD-level information, compiled from a list of StorageStatus objects */ def rddInfoFromStorageStatus(storageStatusList: Seq[StorageStatus], sc: SparkContext) : Array[RDDInfo] = { - rddInfoFromBlockStatusList(storageStatusList.flatMap(_.blocks).toMap, sc) + rddInfoFromBlockStatusList(storageStatusList.flatMap(_.rddBlocks).toMap[RDDBlockId, BlockStatus], sc) } /* Returns a map of blocks to their locations, compiled from a list of StorageStatus objects */ @@ -71,26 +75,21 @@ object StorageUtils { } /* Given a list of BlockStatus objects, returns information for each RDD */ - def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus], + def rddInfoFromBlockStatusList(infos: Map[RDDBlockId, BlockStatus], sc: SparkContext) : Array[RDDInfo] = { // Group by rddId, ignore the partition name - val groupedRddBlocks = infos.filterKeys(_.startsWith("rdd_")).groupBy { case(k, v) => - k.substring(0,k.lastIndexOf('_')) - }.mapValues(_.values.toArray) + val groupedRddBlocks = infos.groupBy { case(k, v) => k.rddId }.mapValues(_.values.toArray) // For each RDD, generate an RDDInfo object - val rddInfos = groupedRddBlocks.map { case (rddKey, rddBlocks) => + val rddInfos = groupedRddBlocks.map { case (rddId, rddBlocks) => // Add up memory and disk sizes val memSize = rddBlocks.map(_.memSize).reduce(_ + _) val diskSize = rddBlocks.map(_.diskSize).reduce(_ + _) - // Find the id of the RDD, e.g. 
rdd_1 => 1 - val rddId = rddKey.split("_").last.toInt - // Get the friendly name and storage level for the RDD, if available sc.persistentRdds.get(rddId).map { r => - val rddName = Option(r.name).getOrElse(rddKey) + val rddName = Option(r.name).getOrElse(rddId.toString) val rddStorageLevel = r.getStorageLevel RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, r.partitions.size, memSize, diskSize) } @@ -101,16 +100,14 @@ object StorageUtils { rddInfos } - /* Removes all BlockStatus object that are not part of a block prefix */ - def filterStorageStatusByPrefix(storageStatusList: Array[StorageStatus], - prefix: String) : Array[StorageStatus] = { + /* Filters storage status by a given RDD id. */ + def filterStorageStatusByRDD(storageStatusList: Array[StorageStatus], rddId: Int) + : Array[StorageStatus] = { storageStatusList.map { status => - val newBlocks = status.blocks.filterKeys(_.startsWith(prefix)) + val newBlocks = status.rddBlocks.filterKeys(_.rddId == rddId).toMap[BlockId, BlockStatus] //val newRemainingMem = status.maxMem - newBlocks.values.map(_.memSize).reduce(_ + _) StorageStatus(status.blockManagerId, status.maxMem, newBlocks) } - } - } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a3959111/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala index f2ae8dd..860e680 100644 --- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala +++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala @@ -36,11 +36,11 @@ private[spark] object ThreadingTest { val numBlocksPerProducer = 20000 private[spark] class ProducerThread(manager: BlockManager, id: Int) extends Thread { - val queue = new ArrayBlockingQueue[(String, Seq[Int])](100) + val queue = new ArrayBlockingQueue[(BlockId, Seq[Int])](100) override def run() { for (i <- 1 to numBlocksPerProducer) { - val blockId = "b-" + id + "-" + i + val blockId = TestBlockId("b-" + id + "-" + i) val blockSize = Random.nextInt(1000) val block = (1 to blockSize).map(_ => Random.nextInt()) val level = randomLevel() @@ -64,7 +64,7 @@ private[spark] object ThreadingTest { private[spark] class ConsumerThread( manager: BlockManager, - queue: ArrayBlockingQueue[(String, Seq[Int])] + queue: ArrayBlockingQueue[(BlockId, Seq[Int])] ) extends Thread { var numBlockConsumed = 0 http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a3959111/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index 43c1257..74e5deb 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.storage.{StorageStatus, StorageUtils} +import org.apache.spark.storage.{BlockId, StorageStatus, StorageUtils} import org.apache.spark.storage.BlockManagerMasterActor.BlockStatus import org.apache.spark.ui.UIUtils._ import org.apache.spark.ui.Page._ @@ -33,21 +33,20 @@ private[spark] class RDDPage(parent: BlockManagerUI) { val sc = parent.sc def render(request: HttpServletRequest): Seq[Node] = { - val id = 
request.getParameter("id") - val prefix = "rdd_" + id.toString + val id = request.getParameter("id").toInt val storageStatusList = sc.getExecutorStorageStatus - val filteredStorageStatusList = StorageUtils. - filterStorageStatusByPrefix(storageStatusList, prefix) + val filteredStorageStatusList = StorageUtils.filterStorageStatusByRDD(storageStatusList, id) val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head val workerHeaders = Seq("Host", "Memory Usage", "Disk Usage") - val workers = filteredStorageStatusList.map((prefix, _)) + val workers = filteredStorageStatusList.map((id, _)) val workerTable = listingTable(workerHeaders, workerRow, workers) val blockHeaders = Seq("Block Name", "Storage Level", "Size in Memory", "Size on Disk", "Executors") - val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1) + val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).toArray. + sortWith(_._1.filename < _._1.filename) val blockLocations = StorageUtils.blockLocationsFromStorageStatus(filteredStorageStatusList) val blocks = blockStatuses.map { case(id, status) => (id, status, blockLocations.get(id).getOrElse(Seq("UNKNOWN"))) @@ -99,7 +98,7 @@ private[spark] class RDDPage(parent: BlockManagerUI) { headerSparkPage(content, parent.sc, "RDD Storage Info for " + rddInfo.name, Storage) } - def blockRow(row: (String, BlockStatus, Seq[String])): Seq[Node] = { + def blockRow(row: (BlockId, BlockStatus, Seq[String])): Seq[Node] = { val (id, block, locations) = row <tr> <td>{id}</td> @@ -118,15 +117,15 @@ private[spark] class RDDPage(parent: BlockManagerUI) { </tr> } - def workerRow(worker: (String, StorageStatus)): Seq[Node] = { - val (prefix, status) = worker + def workerRow(worker: (Int, StorageStatus)): Seq[Node] = { + val (rddId, status) = worker <tr> <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td> <td> - {Utils.bytesToString(status.memUsed(prefix))} + {Utils.bytesToString(status.memUsedByRDD(rddId))} ({Utils.bytesToString(status.memRemaining)} Remaining) </td> - <td>{Utils.bytesToString(status.diskUsed(prefix))}</td> + <td>{Utils.bytesToString(status.diskUsedByRDD(rddId))}</td> </tr> } } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a3959111/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala index 3a7171c..ced036c 100644 --- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -23,7 +23,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite} import org.scalatest.mock.EasyMockSugar import org.apache.spark.rdd.RDD -import org.apache.spark.storage.{BlockManager, StorageLevel} +import org.apache.spark.storage.{BlockManager, RDDBlockId, StorageLevel} // TODO: Test the CacheManager's thread-safety aspects class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar { @@ -52,9 +52,9 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar test("get uncached rdd") { expecting { - blockManager.get("rdd_0_0").andReturn(None) - blockManager.put("rdd_0_0", ArrayBuffer[Any](1, 2, 3, 4), StorageLevel.MEMORY_ONLY, true). 
- andReturn(0) + blockManager.get(RDDBlockId(0, 0)).andReturn(None) + blockManager.put(RDDBlockId(0, 0), ArrayBuffer[Any](1, 2, 3, 4), StorageLevel.MEMORY_ONLY, + true).andReturn(0) } whenExecuting(blockManager) { @@ -66,7 +66,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar test("get cached rdd") { expecting { - blockManager.get("rdd_0_0").andReturn(Some(ArrayBuffer(5, 6, 7).iterator)) + blockManager.get(RDDBlockId(0, 0)).andReturn(Some(ArrayBuffer(5, 6, 7).iterator)) } whenExecuting(blockManager) { @@ -79,7 +79,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar test("get uncached local rdd") { expecting { // Local computation should not persist the resulting value, so don't expect a put(). - blockManager.get("rdd_0_0").andReturn(None) + blockManager.get(RDDBlockId(0, 0)).andReturn(None) } whenExecuting(blockManager) { http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a3959111/core/src/test/scala/org/apache/spark/CheckpointSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index d9103ae..7ca5f16 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -21,7 +21,7 @@ import org.scalatest.FunSuite import java.io.File import org.apache.spark.rdd._ import org.apache.spark.SparkContext._ -import storage.StorageLevel +import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId} import org.apache.spark.util.Utils class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { @@ -83,7 +83,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { } test("BlockRDD") { - val blockId = "id" + val blockId = TestBlockId("id") val blockManager = SparkEnv.get.blockManager blockManager.putSingle(blockId, "test", StorageLevel.MEMORY_ONLY) val blockRDD = new BlockRDD[String](sc, Array(blockId)) @@ -191,7 +191,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { } test("CheckpointRDD with zero partitions") { - val rdd = new BlockRDD[Int](sc, Array[String]()) + val rdd = new BlockRDD[Int](sc, Array[BlockId]()) assert(rdd.partitions.size === 0) assert(rdd.isCheckpointed === false) rdd.checkpoint() http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a3959111/core/src/test/scala/org/apache/spark/DistributedSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index cd2bf9a..480bac8 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -18,24 +18,14 @@ package org.apache.spark import network.ConnectionManagerId -import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter import org.scalatest.concurrent.Timeouts._ +import org.scalatest.FunSuite import org.scalatest.matchers.ShouldMatchers -import org.scalatest.prop.Checkers import org.scalatest.time.{Span, Millis} -import org.scalacheck.Arbitrary._ -import org.scalacheck.Gen -import org.scalacheck.Prop._ -import org.eclipse.jetty.server.{Server, Request, Handler} - -import com.google.common.io.Files - -import scala.collection.mutable.ArrayBuffer import SparkContext._ -import storage.{GetBlock, 
BlockManagerWorker, StorageLevel} -import ui.JettyUtils +import org.apache.spark.storage.{BlockManagerWorker, GetBlock, RDDBlockId, StorageLevel} class NotSerializableClass @@ -193,7 +183,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter // Get all the locations of the first partition and try to fetch the partitions // from those locations. - val blockIds = data.partitions.indices.map(index => "rdd_%d_%d".format(data.id, index)).toArray + val blockIds = data.partitions.indices.map(index => RDDBlockId(data.id, index)).toArray val blockId = blockIds(0) val blockManager = SparkEnv.get.blockManager blockManager.master.getLocations(blockId).foreach(id => { http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a3959111/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 2f93324..3952ee9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.Partition import org.apache.spark.TaskContext import org.apache.spark.{Dependency, ShuffleDependency, OneToOneDependency} import org.apache.spark.{FetchFailed, Success, TaskEndReason} -import org.apache.spark.storage.{BlockManagerId, BlockManagerMaster} +import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode @@ -75,15 +75,10 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]] // stub out BlockManagerMaster.getLocations to use our cacheLocations val blockManagerMaster = new BlockManagerMaster(null) { - override def getLocations(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = { - blockIds.map { name => - val pieces = name.split("_") - if (pieces(0) == "rdd") { - val key = pieces(1).toInt -> pieces(2).toInt - cacheLocations.getOrElse(key, Seq()) - } else { - Seq() - } + override def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = { + blockIds.map { + _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)). + getOrElse(Seq()) }.toSeq } override def removeExecutor(execId: String) { http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a3959111/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala index 119ba30..ee150a3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala @@ -23,6 +23,7 @@ import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv} import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult} +import org.apache.spark.storage.TaskResultBlockId /** * Removes the TaskResult from the BlockManager before delegating to a normal TaskResultGetter. 
@@ -85,7 +86,7 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x) assert(result === 1.to(akkaFrameSize).toArray) - val RESULT_BLOCK_ID = "taskresult_0" + val RESULT_BLOCK_ID = TaskResultBlockId(0) assert(sc.env.blockManager.master.getLocations(RESULT_BLOCK_ID).size === 0, "Expect result to be removed from the block manager.") } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a3959111/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 038a9ac..484a654 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -32,7 +32,6 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.util.{SizeEstimator, Utils, AkkaUtils, ByteBufferInputStream} import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} - class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester { var store: BlockManager = null var store2: BlockManager = null @@ -46,6 +45,10 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT System.setProperty("spark.kryoserializer.buffer.mb", "1") val serializer = new KryoSerializer + // Implicitly convert strings to BlockIds for test clarity. + implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value) + def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId) + before { val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0) this.actorSystem = actorSystem @@ -229,31 +232,31 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) // Putting a1, a2 and a3 in memory. 
- store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY) - store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY) store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY) master.removeRdd(0, blocking = false) eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { - store.getSingle("rdd_0_0") should be (None) - master.getLocations("rdd_0_0") should have size 0 + store.getSingle(rdd(0, 0)) should be (None) + master.getLocations(rdd(0, 0)) should have size 0 } eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { - store.getSingle("rdd_0_1") should be (None) - master.getLocations("rdd_0_1") should have size 0 + store.getSingle(rdd(0, 1)) should be (None) + master.getLocations(rdd(0, 1)) should have size 0 } eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { store.getSingle("nonrddblock") should not be (None) master.getLocations("nonrddblock") should have size (1) } - store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY) - store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY) master.removeRdd(0, blocking = true) - store.getSingle("rdd_0_0") should be (None) - master.getLocations("rdd_0_0") should have size 0 - store.getSingle("rdd_0_1") should be (None) - master.getLocations("rdd_0_1") should have size 0 + store.getSingle(rdd(0, 0)) should be (None) + master.getLocations(rdd(0, 0)) should have size 0 + store.getSingle(rdd(0, 1)) should be (None) + master.getLocations(rdd(0, 1)) should have size 0 } test("reregistration on heart beat") { @@ -372,41 +375,41 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) - store.putSingle("rdd_0_1", a1, StorageLevel.MEMORY_ONLY) - store.putSingle("rdd_0_2", a2, StorageLevel.MEMORY_ONLY) - store.putSingle("rdd_0_3", a3, StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY) // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2 // from the same RDD - assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store") - assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store") - assert(store.getSingle("rdd_0_1") != None, "rdd_0_1 was not in store") + assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store") + assert(store.getSingle(rdd(0, 2)) != None, "rdd_0_2 was not in store") + assert(store.getSingle(rdd(0, 1)) != None, "rdd_0_1 was not in store") // Check that rdd_0_3 doesn't replace them even after further accesses - assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store") - assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store") - assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store") + assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store") + assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store") + assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store") } test("in-memory LRU for partitions of multiple RDDs") { store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) - store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) - 
store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY) - store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 2), new Array[Byte](400), StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(1, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY) // At this point rdd_1_1 should've replaced rdd_0_1 - assert(store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was not in store") - assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store") - assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store") + assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store") + assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store") + assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store") // Do a get() on rdd_0_2 so that it is the most recently used item - assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store") + assert(store.getSingle(rdd(0, 2)) != None, "rdd_0_2 was not in store") // Put in more partitions from RDD 0; they should replace rdd_1_1 - store.putSingle("rdd_0_3", new Array[Byte](400), StorageLevel.MEMORY_ONLY) - store.putSingle("rdd_0_4", new Array[Byte](400), StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 3), new Array[Byte](400), StorageLevel.MEMORY_ONLY) + store.putSingle(rdd(0, 4), new Array[Byte](400), StorageLevel.MEMORY_ONLY) // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped // when we try to add rdd_0_4. - assert(!store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was in store") - assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store") - assert(!store.memoryStore.contains("rdd_0_4"), "rdd_0_4 was in store") - assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store") - assert(store.memoryStore.contains("rdd_0_3"), "rdd_0_3 was not in store") + assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store") + assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store") + assert(!store.memoryStore.contains(rdd(0, 4)), "rdd_0_4 was in store") + assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store") + assert(store.memoryStore.contains(rdd(0, 3)), "rdd_0_3 was not in store") } test("on-disk storage") { @@ -590,43 +593,46 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT try { System.setProperty("spark.shuffle.compress", "true") store = new BlockManager("exec1", actorSystem, master, serializer, 2000) - store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) - assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed") + store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100, + "shuffle_0_0_0 was not compressed") store.stop() store = null System.setProperty("spark.shuffle.compress", "false") store = new BlockManager("exec2", actorSystem, master, serializer, 2000) - store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) - assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed") + store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 1000, + "shuffle_0_0_0 was compressed") store.stop() store = null 
System.setProperty("spark.broadcast.compress", "true") store = new BlockManager("exec3", actorSystem, master, serializer, 2000) - store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) - assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed") + store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 100, + "broadcast_0 was not compressed") store.stop() store = null System.setProperty("spark.broadcast.compress", "false") store = new BlockManager("exec4", actorSystem, master, serializer, 2000) - store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) - assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed") + store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 1000, "broadcast_0 was compressed") store.stop() store = null System.setProperty("spark.rdd.compress", "true") store = new BlockManager("exec5", actorSystem, master, serializer, 2000) - store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) - assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed") + store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize(rdd(0, 0)) <= 100, "rdd_0_0 was not compressed") store.stop() store = null System.setProperty("spark.rdd.compress", "false") store = new BlockManager("exec6", actorSystem, master, serializer, 2000) - store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) - assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed") + store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize(rdd(0, 0)) >= 1000, "rdd_0_0 was compressed") store.stop() store = null http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a3959111/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala index aae79a4..b97fb7e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala @@ -30,10 +30,11 @@ import akka.actor._ import akka.pattern.ask import akka.util.duration._ import akka.dispatch._ +import org.apache.spark.storage.BlockId private[streaming] sealed trait NetworkInputTrackerMessage private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage -private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[String], metadata: Any) extends NetworkInputTrackerMessage +private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) extends NetworkInputTrackerMessage private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage /** @@ -48,7 +49,7 @@ class NetworkInputTracker( val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*) val receiverExecutor = new ReceiverExecutor() val receiverInfo = new HashMap[Int, ActorRef] - val receivedBlockIds = new HashMap[Int, 
Queue[String]] + val receivedBlockIds = new HashMap[Int, Queue[BlockId]] val timeout = 5000.milliseconds var currentTime: Time = null @@ -67,9 +68,9 @@ class NetworkInputTracker( } /** Return all the blocks received from a receiver. */ - def getBlockIds(receiverId: Int, time: Time): Array[String] = synchronized { + def getBlockIds(receiverId: Int, time: Time): Array[BlockId] = synchronized { val queue = receivedBlockIds.synchronized { - receivedBlockIds.getOrElse(receiverId, new Queue[String]()) + receivedBlockIds.getOrElse(receiverId, new Queue[BlockId]()) } val result = queue.synchronized { queue.dequeueAll(x => true) @@ -92,7 +93,7 @@ class NetworkInputTracker( case AddBlocks(streamId, blockIds, metadata) => { val tmp = receivedBlockIds.synchronized { if (!receivedBlockIds.contains(streamId)) { - receivedBlockIds += ((streamId, new Queue[String])) + receivedBlockIds += ((streamId, new Queue[BlockId])) } receivedBlockIds(streamId) } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a3959111/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index 31f9891..8d3ac0f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -31,7 +31,7 @@ import org.apache.spark.streaming.util.{RecurringTimer, SystemClock} import org.apache.spark.streaming._ import org.apache.spark.{Logging, SparkEnv} import org.apache.spark.rdd.{RDD, BlockRDD} -import org.apache.spark.storage.StorageLevel +import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId} /** * Abstract class for defining any InputDStream that has to start a receiver on worker @@ -69,7 +69,7 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime) Some(new BlockRDD[T](ssc.sc, blockIds)) } else { - Some(new BlockRDD[T](ssc.sc, Array[String]())) + Some(new BlockRDD[T](ssc.sc, Array[BlockId]())) } } } @@ -77,7 +77,7 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming private[streaming] sealed trait NetworkReceiverMessage private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage -private[streaming] case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage +private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any) extends NetworkReceiverMessage private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage /** @@ -158,7 +158,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log /** * Pushes a block (as an ArrayBuffer filled with data) into the block manager. */ - def pushBlock(blockId: String, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) { + def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) { env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level) actor ! ReportBlock(blockId, metadata) } @@ -166,7 +166,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log /** * Pushes a block (as bytes) into the block manager. 
*/ - def pushBlock(blockId: String, bytes: ByteBuffer, metadata: Any, level: StorageLevel) { + def pushBlock(blockId: BlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) { env.blockManager.putBytes(blockId, bytes, level) actor ! ReportBlock(blockId, metadata) } @@ -209,7 +209,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log class BlockGenerator(storageLevel: StorageLevel) extends Serializable with Logging { - case class Block(id: String, buffer: ArrayBuffer[T], metadata: Any = null) + case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null) val clock = new SystemClock() val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong @@ -241,7 +241,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[T] if (newBlockBuffer.size > 0) { - val blockId = "input-" + NetworkReceiver.this.streamId + "-" + (time - blockInterval) + val blockId = StreamBlockId(NetworkReceiver.this.streamId, time - blockInterval) val newBlock = new Block(blockId, newBlockBuffer) blocksForPushing.add(newBlock) } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a3959111/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala index c91f12e..10ed4ef 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.dstream import org.apache.spark.Logging -import org.apache.spark.storage.StorageLevel +import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming.StreamingContext import java.net.InetSocketAddress @@ -71,7 +71,7 @@ class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel) var nextBlockNumber = 0 while (true) { val buffer = queue.take() - val blockId = "input-" + streamId + "-" + nextBlockNumber + val blockId = StreamBlockId(streamId, nextBlockNumber) nextBlockNumber += 1 pushBlock(blockId, buffer, null, storageLevel) } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a3959111/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala index 4b5d8c4..ef0f85a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala @@ -21,7 +21,7 @@ import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy } import akka.actor.{ actorRef2Scala, ActorRef } import akka.actor.{ PossiblyHarmful, OneForOneStrategy } -import org.apache.spark.storage.StorageLevel +import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming.dstream.NetworkReceiver import java.util.concurrent.atomic.AtomicInteger @@ -159,7 +159,7 @@ private[streaming] class ActorReceiver[T: ClassManifest]( protected def pushBlock(iter: 
Iterator[T]) { val buffer = new ArrayBuffer[T] buffer ++= iter - pushBlock("block-" + streamId + "-" + System.nanoTime(), buffer, null, storageLevel) + pushBlock(StreamBlockId(streamId, System.nanoTime()), buffer, null, storageLevel) } protected def onStart() = {
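Taken together, the call sites above all move in one direction: hand-assembled strings ("rdd_" + id, "input-" + streamId + "-" + n, "taskresult_0") become constructed case-class instances, and string parsing becomes pattern matching. A short illustrative helper (hypothetical, not part of this commit) shows the style the typed ids enable:

// Hypothetical helper: with a sealed BlockId, dispatch on block kind is an
// exhaustive pattern match instead of startsWith/split("_") string probing.
def describe(id: BlockId): String = id match {
  case RDDBlockId(rddId, splitIndex) => "partition " + splitIndex + " of RDD " + rddId
  case ShuffleBlockId(shuffleId, mapId, reduceId) =>
    "shuffle " + shuffleId + " output of map " + mapId + " for reduce " + reduceId
  case BroadcastBlockId(broadcastId) => "broadcast variable " + broadcastId
  case TaskResultBlockId(taskId) => "result of task " + taskId
  case StreamBlockId(streamId, uniqueId) => "block " + uniqueId + " of stream " + streamId
  case TestBlockId(name) => "test block " + name
}

describe(RDDBlockId(0, 3))  // "partition 3 of RDD 0" -- formerly the bare string "rdd_0_3"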