Removed unused code. Changes to match Spark coding style.

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e6637504
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e6637504
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e6637504
Branch: refs/heads/master Commit: e6637504880a941f909ece188573ee0c4853b96b Parents: e96bd00 Author: Mosharaf Chowdhury <mosha...@cs.berkeley.edu> Authored: Thu Oct 17 00:19:50 2013 -0700 Committer: Mosharaf Chowdhury <mosha...@cs.berkeley.edu> Committed: Thu Oct 17 00:19:50 2013 -0700 ---------------------------------------------------------------------- .../spark/broadcast/TorrentBroadcast.scala | 22 ++++++++++---------- .../spark/storage/BlockManagerMasterActor.scala | 3 --- 2 files changed, 11 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e6637504/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 3341401..8c23584 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -179,21 +179,21 @@ extends Logging { initialized = false } - val BlockSize = System.getProperty("spark.broadcast.blockSize", "4096").toInt * 1024 + val BLOCK_SIZE = System.getProperty("spark.broadcast.blockSize", "4096").toInt * 1024 - def blockifyObject[IN](obj: IN): TorrentInfo = { - val byteArray = Utils.serialize[IN](obj) + def blockifyObject[T](obj: T): TorrentInfo = { + val byteArray = Utils.serialize[T](obj) val bais = new ByteArrayInputStream(byteArray) - var blockNum = (byteArray.length / BlockSize) - if (byteArray.length % BlockSize != 0) + var blockNum = (byteArray.length / BLOCK_SIZE) + if (byteArray.length % BLOCK_SIZE != 0) blockNum += 1 var retVal = new Array[TorrentBlock](blockNum) var blockID = 0 - for (i <- 0 until (byteArray.length, BlockSize)) { - val thisBlockSize = math.min(BlockSize, byteArray.length - i) + for 
(i <- 0 until (byteArray.length, BLOCK_SIZE)) { + val thisBlockSize = math.min(BLOCK_SIZE, byteArray.length - i) var tempByteArray = new Array[Byte](thisBlockSize) val hasRead = bais.read(tempByteArray, 0, thisBlockSize) @@ -208,15 +208,15 @@ extends Logging { return tInfo } - def unBlockifyObject[OUT](arrayOfBlocks: Array[TorrentBlock], + def unBlockifyObject[T](arrayOfBlocks: Array[TorrentBlock], totalBytes: Int, - totalBlocks: Int): OUT = { + totalBlocks: Int): T = { var retByteArray = new Array[Byte](totalBytes) for (i <- 0 until totalBlocks) { System.arraycopy(arrayOfBlocks(i).byteArray, 0, retByteArray, - i * BlockSize, arrayOfBlocks(i).byteArray.length) + i * BLOCK_SIZE, arrayOfBlocks(i).byteArray.length) } - Utils.deserialize[OUT](retByteArray, Thread.currentThread.getContextClassLoader) + Utils.deserialize[T](retByteArray, Thread.currentThread.getContextClassLoader) } } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e6637504/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 8b2a812..f8cf14b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -227,9 +227,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { } private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { - /* if (id.executorId == "<driver>" && !isLocal) { - // Got a register message from the master node; don't register it - } else */ if (!blockManagerInfo.contains(id)) { blockManagerIdByExecutor.get(id.executorId) match { case Some(manager) =>