http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a714142..d1bee3d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -20,6 +20,8 @@ package org.apache.spark.storage
 import java.io.{File, InputStream, OutputStream, BufferedOutputStream, ByteArrayOutputStream}
 import java.nio.{ByteBuffer, MappedByteBuffer}
 
+import scala.concurrent.ExecutionContext.Implicits.global
+
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
@@ -58,18 +60,14 @@ private[spark] class BlockManager(
     defaultSerializer: Serializer,
     maxMemory: Long,
     val conf: SparkConf,
-    securityManager: SecurityManager,
     mapOutputTracker: MapOutputTracker,
-    shuffleManager: ShuffleManager)
-  extends BlockDataProvider with Logging {
+    shuffleManager: ShuffleManager,
+    blockTransferService: BlockTransferService)
+  extends BlockDataManager with Logging {
 
-  private val port = conf.getInt("spark.blockManager.port", 0)
+  blockTransferService.init(this)
 
   val diskBlockManager = new DiskBlockManager(this, conf)
-  val connectionManager =
-    new ConnectionManager(port, conf, securityManager, "Connection manager for block manager")
-
-  implicit val futureExecContext = connectionManager.futureExecContext
 
   private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
 
@@ -89,11 +87,7 @@ private[spark] class BlockManager(
   }
 
   val blockManagerId = BlockManagerId(
-    executorId, connectionManager.id.host, connectionManager.id.port)
-
-  // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
-  // for receiving shuffle outputs)
-  val maxBytesInFlight = conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024
+    executorId, blockTransferService.hostName, blockTransferService.port)
 
   // Whether to compress broadcast variables that are stored
   private val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
@@ -136,11 +130,11 @@ private[spark] class BlockManager(
       master: BlockManagerMaster,
       serializer: Serializer,
       conf: SparkConf,
-      securityManager: SecurityManager,
       mapOutputTracker: MapOutputTracker,
-      shuffleManager: ShuffleManager) = {
+      shuffleManager: ShuffleManager,
+      blockTransferService: BlockTransferService) = {
     this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf),
-      conf, securityManager, mapOutputTracker, shuffleManager)
+      conf, mapOutputTracker, shuffleManager, blockTransferService)
   }
 
   /**
@@ -149,7 +143,6 @@ private[spark] class BlockManager(
    */
   private def initialize(): Unit = {
     master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
-    BlockManagerWorker.startBlockManagerWorker(this)
   }
 
   /**
@@ -212,21 +205,34 @@ private[spark] class BlockManager(
     }
   }
 
-  override def getBlockData(blockId: String): Either[FileSegment, ByteBuffer] = {
+  /**
+   * Interface to get local block data.
+   *
+   * @return Some(buffer) if the block exists locally, and None if it doesn't.
+   */
+  override def getBlockData(blockId: String): Option[ManagedBuffer] = {
     val bid = BlockId(blockId)
     if (bid.isShuffle) {
-      shuffleManager.shuffleBlockManager.getBlockData(bid.asInstanceOf[ShuffleBlockId])
+      Some(shuffleManager.shuffleBlockManager.getBlockData(bid.asInstanceOf[ShuffleBlockId]))
     } else {
       val blockBytesOpt = doGetLocal(bid, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
       if (blockBytesOpt.isDefined) {
-        Right(blockBytesOpt.get)
+        val buffer = blockBytesOpt.get
+        Some(new NioByteBufferManagedBuffer(buffer))
       } else {
-        throw new BlockNotFoundException(blockId)
+        None
       }
     }
   }
 
   /**
+   * Put the block locally, using the given storage level.
+   */
+  override def putBlockData(blockId: String, data: ManagedBuffer, level: StorageLevel): Unit = {
+    putBytes(BlockId(blockId), data.nioByteBuffer(), level)
+  }
+
+  /**
    * Get the BlockStatus for the block identified by the given ID, if it exists.
    * NOTE: This is mainly for testing, and it doesn't fetch information from Tachyon.
    */
@@ -333,16 +339,10 @@ private[spark] class BlockManager(
    * shuffle blocks. It is safe to do so without a lock on block info since disk store
    * never deletes (recent) items.
    */
-  def getLocalShuffleFromDisk(
-      blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
-
-    val shuffleBlockManager = shuffleManager.shuffleBlockManager
-    val values = shuffleBlockManager.getBytes(blockId.asInstanceOf[ShuffleBlockId]).map(
-      bytes => this.dataDeserialize(blockId, bytes, serializer))
-
-    values.orElse {
-      throw new BlockException(blockId, s"Block $blockId not found on disk, though it should be")
-    }
+  def getLocalShuffleFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
+    val buf = shuffleManager.shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
+    val is = wrapForCompression(blockId, buf.inputStream())
+    Some(serializer.newInstance().deserializeStream(is).asIterator)
   }
 
   /**
@@ -513,8 +513,9 @@ private[spark] class BlockManager(
     val locations = Random.shuffle(master.getLocations(blockId))
     for (loc <- locations) {
       logDebug(s"Getting remote block $blockId from $loc")
-      val data = BlockManagerWorker.syncGetBlock(
-        GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
+      val data = blockTransferService.fetchBlockSync(
+        loc.host, loc.port, blockId.toString).nioByteBuffer()
+
       if (data != null) {
         if (asBlockResult) {
           return Some(new BlockResult(
@@ -548,22 +549,6 @@ private[spark] class BlockManager(
     None
   }
 
-  /**
-   * Get multiple blocks from local and remote block manager using their BlockManagerIds. Returns
-   * an Iterator of (block ID, value) pairs so that clients may handle blocks in a pipelined
-   * fashion as they're received. Expects a size in bytes to be provided for each block fetched,
-   * so that we can control the maxMegabytesInFlight for the fetch.
-   */
-  def getMultiple(
-      blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
-      serializer: Serializer,
-      readMetrics: ShuffleReadMetrics): BlockFetcherIterator = {
-    val iter = new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer,
-      readMetrics)
-    iter.initialize()
-    iter
-  }
-
   def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
@@ -816,12 +801,15 @@ private[spark] class BlockManager(
       data.rewind()
       logDebug(s"Try to replicate $blockId once; The size of the data is 
${data.limit()} Bytes. " +
         s"To node: $peer")
-      val putBlock = PutBlock(blockId, data, tLevel)
-      val cmId = new ConnectionManagerId(peer.host, peer.port)
-      val syncPutBlockSuccess = BlockManagerWorker.syncPutBlock(putBlock, cmId)
-      if (!syncPutBlockSuccess) {
-        logError(s"Failed to call syncPutBlock to $peer")
+
+      try {
+        blockTransferService.uploadBlockSync(
+          peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
+      } catch {
+        case e: Exception =>
+          logError(s"Failed to replicate block to $peer", e)
       }
+
       logDebug("Replicating BlockId %s once used %fs; The size of the data is 
%d bytes."
         .format(blockId, (System.nanoTime - start) / 1e6, data.limit()))
     }
@@ -1051,7 +1039,7 @@ private[spark] class BlockManager(
   }
 
   def stop(): Unit = {
-    connectionManager.stop()
+    blockTransferService.stop()
     diskBlockManager.stop()
     actorSystem.stop(slaveActor)
     blockInfo.clear()

http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index b7bcb2d..d4487fc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -36,8 +36,8 @@ import org.apache.spark.util.Utils
 class BlockManagerId private (
     private var executorId_ : String,
     private var host_ : String,
-    private var port_ : Int
-  ) extends Externalizable {
+    private var port_ : Int)
+  extends Externalizable {
 
   private def this() = this(null, null, 0)  // For deserialization only
 

http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
deleted file mode 100644
index bf002a4..0000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-
-import org.apache.spark.Logging
-import org.apache.spark.network._
-import org.apache.spark.util.Utils
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-import scala.util.{Try, Failure, Success}
-
-/**
- * A network interface for BlockManager. Each slave should have one
- * BlockManagerWorker.
- *
- * TODO: Use event model.
- */
-private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
-
-  blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive)
-
-  def onBlockMessageReceive(msg: Message, id: ConnectionManagerId): Option[Message] = {
-    logDebug("Handling message " + msg)
-    msg match {
-      case bufferMessage: BufferMessage => {
-        try {
-          logDebug("Handling as a buffer message " + bufferMessage)
-          val blockMessages = BlockMessageArray.fromBufferMessage(bufferMessage)
-          logDebug("Parsed as a block message array")
-          val responseMessages = blockMessages.map(processBlockMessage).filter(_ != None).map(_.get)
-          Some(new BlockMessageArray(responseMessages).toBufferMessage)
-        } catch {
-          case e: Exception => {
-            logError("Exception handling buffer message", e)
-            val errorMessage = Message.createBufferMessage(msg.id)
-            errorMessage.hasError = true
-            Some(errorMessage)
-          }
-        }
-      }
-      case otherMessage: Any => {
-        logError("Unknown type message received: " + otherMessage)
-        val errorMessage = Message.createBufferMessage(msg.id)
-        errorMessage.hasError = true
-        Some(errorMessage)
-      }
-    }
-  }
-
-  def processBlockMessage(blockMessage: BlockMessage): Option[BlockMessage] = {
-    blockMessage.getType match {
-      case BlockMessage.TYPE_PUT_BLOCK => {
-        val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
-        logDebug("Received [" + pB + "]")
-        putBlock(pB.id, pB.data, pB.level)
-        None
-      }
-      case BlockMessage.TYPE_GET_BLOCK => {
-        val gB = new GetBlock(blockMessage.getId)
-        logDebug("Received [" + gB + "]")
-        val buffer = getBlock(gB.id)
-        if (buffer == null) {
-          return None
-        }
-        Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer)))
-      }
-      case _ => None
-    }
-  }
-
-  private def putBlock(id: BlockId, bytes: ByteBuffer, level: StorageLevel) {
-    val startTimeMs = System.currentTimeMillis()
-    logDebug("PutBlock " + id + " started from " + startTimeMs + " with data: 
" + bytes)
-    blockManager.putBytes(id, bytes, level)
-    logDebug("PutBlock " + id + " used " + Utils.getUsedTimeMs(startTimeMs)
-        + " with data size: " + bytes.limit)
-  }
-
-  private def getBlock(id: BlockId): ByteBuffer = {
-    val startTimeMs = System.currentTimeMillis()
-    logDebug("GetBlock " + id + " started from " + startTimeMs)
-    val buffer = blockManager.getLocalBytes(id) match {
-      case Some(bytes) => bytes
-      case None => null
-    }
-    logDebug("GetBlock " + id + " used " + Utils.getUsedTimeMs(startTimeMs)
-        + " and got buffer " + buffer)
-    buffer
-  }
-}
-
-private[spark] object BlockManagerWorker extends Logging {
-  private var blockManagerWorker: BlockManagerWorker = null
-
-  def startBlockManagerWorker(manager: BlockManager) {
-    blockManagerWorker = new BlockManagerWorker(manager)
-  }
-
-  def syncPutBlock(msg: PutBlock, toConnManagerId: ConnectionManagerId): Boolean = {
-    val blockManager = blockManagerWorker.blockManager
-    val connectionManager = blockManager.connectionManager
-    val blockMessage = BlockMessage.fromPutBlock(msg)
-    val blockMessageArray = new BlockMessageArray(blockMessage)
-    val resultMessage = Try(Await.result(connectionManager.sendMessageReliably(
-        toConnManagerId, blockMessageArray.toBufferMessage), Duration.Inf))
-    resultMessage.isSuccess
-  }
-
-  def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
-    val blockManager = blockManagerWorker.blockManager
-    val connectionManager = blockManager.connectionManager
-    val blockMessage = BlockMessage.fromGetBlock(msg)
-    val blockMessageArray = new BlockMessageArray(blockMessage)
-    val responseMessage = Try(Await.result(connectionManager.sendMessageReliably(
-        toConnManagerId, blockMessageArray.toBufferMessage), Duration.Inf))
-    responseMessage match {
-      case Success(message) => {
-        val bufferMessage = message.asInstanceOf[BufferMessage]
-        logDebug("Response message received " + bufferMessage)
-        BlockMessageArray.fromBufferMessage(bufferMessage).foreach(blockMessage => {
-            logDebug("Found " + blockMessage)
-            return blockMessage.getData
-          })
-      }
-      case Failure(exception) => logDebug("No response message received")
-    }
-    null
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
deleted file mode 100644
index a2bfce7..0000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.StringBuilder
-
-import org.apache.spark.network._
-
-private[spark] case class GetBlock(id: BlockId)
-private[spark] case class GotBlock(id: BlockId, data: ByteBuffer)
-private[spark] case class PutBlock(id: BlockId, data: ByteBuffer, level: StorageLevel)
-
-private[spark] class BlockMessage() {
-  // Un-initialized: typ = 0
-  // GetBlock: typ = 1
-  // GotBlock: typ = 2
-  // PutBlock: typ = 3
-  private var typ: Int = BlockMessage.TYPE_NON_INITIALIZED
-  private var id: BlockId = null
-  private var data: ByteBuffer = null
-  private var level: StorageLevel = null
-
-  def set(getBlock: GetBlock) {
-    typ = BlockMessage.TYPE_GET_BLOCK
-    id = getBlock.id
-  }
-
-  def set(gotBlock: GotBlock) {
-    typ = BlockMessage.TYPE_GOT_BLOCK
-    id = gotBlock.id
-    data = gotBlock.data
-  }
-
-  def set(putBlock: PutBlock) {
-    typ = BlockMessage.TYPE_PUT_BLOCK
-    id = putBlock.id
-    data = putBlock.data
-    level = putBlock.level
-  }
-
-  def set(buffer: ByteBuffer) {
-    /*
-    println()
-    println("BlockMessage: ")
-    while(buffer.remaining > 0) {
-      print(buffer.get())
-    }
-    buffer.rewind()
-    println()
-    println()
-    */
-    typ = buffer.getInt()
-    val idLength = buffer.getInt()
-    val idBuilder = new StringBuilder(idLength)
-    for (i <- 1 to idLength) {
-      idBuilder += buffer.getChar()
-    }
-    id = BlockId(idBuilder.toString)
-
-    if (typ == BlockMessage.TYPE_PUT_BLOCK) {
-
-      val booleanInt = buffer.getInt()
-      val replication = buffer.getInt()
-      level = StorageLevel(booleanInt, replication)
-
-      val dataLength = buffer.getInt()
-      data = ByteBuffer.allocate(dataLength)
-      if (dataLength != buffer.remaining) {
-        throw new Exception("Error parsing buffer")
-      }
-      data.put(buffer)
-      data.flip()
-    } else if (typ == BlockMessage.TYPE_GOT_BLOCK) {
-
-      val dataLength = buffer.getInt()
-      data = ByteBuffer.allocate(dataLength)
-      if (dataLength != buffer.remaining) {
-        throw new Exception("Error parsing buffer")
-      }
-      data.put(buffer)
-      data.flip()
-    }
-
-  }
-
-  def set(bufferMsg: BufferMessage) {
-    val buffer = bufferMsg.buffers.apply(0)
-    buffer.clear()
-    set(buffer)
-  }
-
-  def getType: Int = typ
-  def getId: BlockId = id
-  def getData: ByteBuffer = data
-  def getLevel: StorageLevel =  level
-
-  def toBufferMessage: BufferMessage = {
-    val buffers = new ArrayBuffer[ByteBuffer]()
-    var buffer = ByteBuffer.allocate(4 + 4 + id.name.length * 2)
-    buffer.putInt(typ).putInt(id.name.length)
-    id.name.foreach((x: Char) => buffer.putChar(x))
-    buffer.flip()
-    buffers += buffer
-
-    if (typ == BlockMessage.TYPE_PUT_BLOCK) {
-      buffer = ByteBuffer.allocate(8).putInt(level.toInt).putInt(level.replication)
-      buffer.flip()
-      buffers += buffer
-
-      buffer = ByteBuffer.allocate(4).putInt(data.remaining)
-      buffer.flip()
-      buffers += buffer
-
-      buffers += data
-    } else if (typ == BlockMessage.TYPE_GOT_BLOCK) {
-      buffer = ByteBuffer.allocate(4).putInt(data.remaining)
-      buffer.flip()
-      buffers += buffer
-
-      buffers += data
-    }
-
-    /*
-    println()
-    println("BlockMessage: ")
-    buffers.foreach(b => {
-      while(b.remaining > 0) {
-        print(b.get())
-      }
-      b.rewind()
-    })
-    println()
-    println()
-    */
-    Message.createBufferMessage(buffers)
-  }
-
-  override def toString: String = {
-    "BlockMessage [type = " + typ + ", id = " + id + ", level = " + level +
-    ", data = " + (if (data != null) data.remaining.toString  else "null") + 
"]"
-  }
-}
-
-private[spark] object BlockMessage {
-  val TYPE_NON_INITIALIZED: Int = 0
-  val TYPE_GET_BLOCK: Int = 1
-  val TYPE_GOT_BLOCK: Int = 2
-  val TYPE_PUT_BLOCK: Int = 3
-
-  def fromBufferMessage(bufferMessage: BufferMessage): BlockMessage = {
-    val newBlockMessage = new BlockMessage()
-    newBlockMessage.set(bufferMessage)
-    newBlockMessage
-  }
-
-  def fromByteBuffer(buffer: ByteBuffer): BlockMessage = {
-    val newBlockMessage = new BlockMessage()
-    newBlockMessage.set(buffer)
-    newBlockMessage
-  }
-
-  def fromGetBlock(getBlock: GetBlock): BlockMessage = {
-    val newBlockMessage = new BlockMessage()
-    newBlockMessage.set(getBlock)
-    newBlockMessage
-  }
-
-  def fromGotBlock(gotBlock: GotBlock): BlockMessage = {
-    val newBlockMessage = new BlockMessage()
-    newBlockMessage.set(gotBlock)
-    newBlockMessage
-  }
-
-  def fromPutBlock(putBlock: PutBlock): BlockMessage = {
-    val newBlockMessage = new BlockMessage()
-    newBlockMessage.set(putBlock)
-    newBlockMessage
-  }
-
-  def main(args: Array[String]) {
-    val B = new BlockMessage()
-    val blockId = TestBlockId("ABC")
-    B.set(new PutBlock(blockId, ByteBuffer.allocate(10), StorageLevel.MEMORY_AND_DISK_SER_2))
-    val bMsg = B.toBufferMessage
-    val C = new BlockMessage()
-    C.set(bMsg)
-
-    println(B.getId + " " + B.getLevel)
-    println(C.getId + " " + C.getLevel)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
deleted file mode 100644
index 973d85c..0000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark._
-import org.apache.spark.network._
-
-private[spark]
-class BlockMessageArray(var blockMessages: Seq[BlockMessage])
-  extends Seq[BlockMessage] with Logging {
-
-  def this(bm: BlockMessage) = this(Array(bm))
-
-  def this() = this(null.asInstanceOf[Seq[BlockMessage]])
-
-  def apply(i: Int) = blockMessages(i)
-
-  def iterator = blockMessages.iterator
-
-  def length = blockMessages.length
-
-  def set(bufferMessage: BufferMessage) {
-    val startTime = System.currentTimeMillis
-    val newBlockMessages = new ArrayBuffer[BlockMessage]()
-    val buffer = bufferMessage.buffers(0)
-    buffer.clear()
-    /*
-    println()
-    println("BlockMessageArray: ")
-    while(buffer.remaining > 0) {
-      print(buffer.get())
-    }
-    buffer.rewind()
-    println()
-    println()
-    */
-    while (buffer.remaining() > 0) {
-      val size = buffer.getInt()
-      logDebug("Creating block message of size " + size + " bytes")
-      val newBuffer = buffer.slice()
-      newBuffer.clear()
-      newBuffer.limit(size)
-      logDebug("Trying to convert buffer " + newBuffer + " to block message")
-      val newBlockMessage = BlockMessage.fromByteBuffer(newBuffer)
-      logDebug("Created " + newBlockMessage)
-      newBlockMessages += newBlockMessage
-      buffer.position(buffer.position() + size)
-    }
-    val finishTime = System.currentTimeMillis
-    logDebug("Converted block message array from buffer message in " +
-      (finishTime - startTime) / 1000.0  + " s")
-    this.blockMessages = newBlockMessages
-  }
-
-  def toBufferMessage: BufferMessage = {
-    val buffers = new ArrayBuffer[ByteBuffer]()
-
-    blockMessages.foreach(blockMessage => {
-      val bufferMessage = blockMessage.toBufferMessage
-      logDebug("Adding " + blockMessage)
-      val sizeBuffer = ByteBuffer.allocate(4).putInt(bufferMessage.size)
-      sizeBuffer.flip
-      buffers += sizeBuffer
-      buffers ++= bufferMessage.buffers
-      logDebug("Added " + bufferMessage)
-    })
-
-    logDebug("Buffer list:")
-    buffers.foreach((x: ByteBuffer) => logDebug("" + x))
-    /*
-    println()
-    println("BlockMessageArray: ")
-    buffers.foreach(b => {
-      while(b.remaining > 0) {
-        print(b.get())
-      }
-      b.rewind()
-    })
-    println()
-    println()
-    */
-    Message.createBufferMessage(buffers)
-  }
-}
-
-private[spark] object BlockMessageArray {
-
-  def fromBufferMessage(bufferMessage: BufferMessage): BlockMessageArray = {
-    val newBlockMessageArray = new BlockMessageArray()
-    newBlockMessageArray.set(bufferMessage)
-    newBlockMessageArray
-  }
-
-  def main(args: Array[String]) {
-    val blockMessages =
-      (0 until 10).map { i =>
-        if (i % 2 == 0) {
-          val buffer =  ByteBuffer.allocate(100)
-          buffer.clear
-          BlockMessage.fromPutBlock(PutBlock(TestBlockId(i.toString), buffer,
-            StorageLevel.MEMORY_ONLY_SER))
-        } else {
-          BlockMessage.fromGetBlock(GetBlock(TestBlockId(i.toString)))
-        }
-      }
-    val blockMessageArray = new BlockMessageArray(blockMessages)
-    println("Block message array created")
-
-    val bufferMessage = blockMessageArray.toBufferMessage
-    println("Converted to buffer message")
-
-    val totalSize = bufferMessage.size
-    val newBuffer = ByteBuffer.allocate(totalSize)
-    newBuffer.clear()
-    bufferMessage.buffers.foreach(buffer => {
-      assert (0 == buffer.position())
-      newBuffer.put(buffer)
-      buffer.rewind()
-    })
-    newBuffer.flip
-    val newBufferMessage = Message.createBufferMessage(newBuffer)
-    println("Copied to new buffer message, size = " + newBufferMessage.size)
-
-    val newBlockMessageArray = BlockMessageArray.fromBufferMessage(newBufferMessage)
-    println("Converted back to block message array")
-    newBlockMessageArray.foreach(blockMessage => {
-      blockMessage.getType match {
-        case BlockMessage.TYPE_PUT_BLOCK => {
-          val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
-          println(pB)
-        }
-        case BlockMessage.TYPE_GET_BLOCK => {
-          val gB = new GetBlock(blockMessage.getId)
-          println(gB)
-        }
-      }
-    })
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
new file mode 100644
index 0000000..c8e708a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashSet
+import scala.collection.mutable.Queue
+
+import org.apache.spark.{TaskContext, Logging, SparkException}
+import org.apache.spark.network.{ManagedBuffer, BlockFetchingListener, BlockTransferService}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.util.Utils
+
+
+/**
+ * An iterator that fetches multiple blocks. For local blocks, it fetches from the local block
+ * manager. For remote blocks, it fetches them using the provided BlockTransferService.
+ *
+ * This creates an iterator of (BlockID, values) tuples so the caller can handle blocks in a
+ * pipelined fashion as they are received.
+ *
+ * The implementation throttles the remote fetches so they don't exceed maxBytesInFlight to avoid
+ * using too much memory.
+ *
+ * @param context [[TaskContext]], used for metrics update
+ * @param blockTransferService [[BlockTransferService]] for fetching remote blocks
+ * @param blockManager  [[BlockManager]] for reading local blocks
+ * @param blocksByAddress list of blocks to fetch grouped by the [[BlockManagerId]].
+ *                        For each block we also require the size (in bytes as a long field) in
+ *                        order to throttle the memory usage.
+ * @param serializer serializer used to deserialize the data.
+ * @param maxBytesInFlight max size (in bytes) of remote blocks to fetch at any given point.
+ */
+private[spark]
+final class ShuffleBlockFetcherIterator(
+    context: TaskContext,
+    blockTransferService: BlockTransferService,
+    blockManager: BlockManager,
+    blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
+    serializer: Serializer,
+    maxBytesInFlight: Long)
+  extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
+
+  import ShuffleBlockFetcherIterator._
+
+  /**
+   * Total number of blocks to fetch. This can be smaller than the total number of blocks
+   * in [[blocksByAddress]] because we filter out zero-sized blocks in [[initialize]].
+   *
+   * This should equal localBlocks.size + remoteBlocks.size.
+   */
+  private[this] var numBlocksToFetch = 0
+
+  /**
+   * The number of blocks processed by the caller. The iterator is exhausted when
+   * [[numBlocksProcessed]] == [[numBlocksToFetch]].
+   */
+  private[this] var numBlocksProcessed = 0
+
+  private[this] val startTime = System.currentTimeMillis
+
+  /** Local blocks to fetch, excluding zero-sized blocks. */
+  private[this] val localBlocks = new ArrayBuffer[BlockId]()
+
+  /** Remote blocks to fetch, excluding zero-sized blocks. */
+  private[this] val remoteBlocks = new HashSet[BlockId]()
+
+  /**
+   * A queue to hold our results. This turns the asynchronous model provided by
+   * [[BlockTransferService]] into a synchronous model (iterator).
+   */
+  private[this] val results = new LinkedBlockingQueue[FetchResult]
+
+  // Queue of fetch requests to issue; we'll pull requests off this gradually to make sure that
+  // the number of bytes in flight is limited to maxBytesInFlight
+  private[this] val fetchRequests = new Queue[FetchRequest]
+
+  // Current bytes in flight from our requests
+  private[this] var bytesInFlight = 0L
+
+  private[this] val shuffleMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
+
+  initialize()
+
+  private[this] def sendRequest(req: FetchRequest) {
+    logDebug("Sending request for %d blocks (%s) from %s".format(
+      req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
+    bytesInFlight += req.size
+
+    // so we can look up the size of each blockID
+    val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap
+    val blockIds = req.blocks.map(_._1.toString)
+
+    blockTransferService.fetchBlocks(req.address.host, req.address.port, blockIds,
+      new BlockFetchingListener {
+        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
+          results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
+            () => serializer.newInstance().deserializeStream(
+              blockManager.wrapForCompression(BlockId(blockId), data.inputStream())).asIterator
+          ))
+          shuffleMetrics.remoteBytesRead += data.size
+          shuffleMetrics.remoteBlocksFetched += 1
+          logDebug("Got remote block " + blockId + " after " + 
Utils.getUsedTimeMs(startTime))
+        }
+
+        override def onBlockFetchFailure(e: Throwable): Unit = {
+          logError("Failed to get block(s) from 
${req.address.host}:${req.address.port}", e)
+          // Note that there is a chance that some blocks have been fetched successfully, but we
+          // still add them to the failed queue. This is fine because when the caller sees a
+          // FetchFailedException, it is going to fail the entire task anyway.
+          for ((blockId, size) <- req.blocks) {
+            results.put(new FetchResult(blockId, -1, null))
+          }
+        }
+      }
+    )
+  }
+
+  private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
+    // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them
+    // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
+    // nodes, rather than blocking on reading output from one node.
+    val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
+    logInfo("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " 
+ targetRequestSize)
+
+    // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
+    // at most maxBytesInFlight in order to limit the amount of data in flight.
+    val remoteRequests = new ArrayBuffer[FetchRequest]
+
+    // Tracks total number of blocks (including zero sized blocks)
+    var totalBlocks = 0
+    for ((address, blockInfos) <- blocksByAddress) {
+      totalBlocks += blockInfos.size
+      if (address == blockManager.blockManagerId) {
+        // Filter out zero-sized blocks
+        localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
+        numBlocksToFetch += localBlocks.size
+      } else {
+        val iterator = blockInfos.iterator
+        var curRequestSize = 0L
+        var curBlocks = new ArrayBuffer[(BlockId, Long)]
+        while (iterator.hasNext) {
+          val (blockId, size) = iterator.next()
+          // Skip empty blocks
+          if (size > 0) {
+            curBlocks += ((blockId, size))
+            remoteBlocks += blockId
+            numBlocksToFetch += 1
+            curRequestSize += size
+          } else if (size < 0) {
+            throw new BlockException(blockId, "Negative block size " + size)
+          }
+          if (curRequestSize >= targetRequestSize) {
+            // Add this FetchRequest
+            remoteRequests += new FetchRequest(address, curBlocks)
+            curBlocks = new ArrayBuffer[(BlockId, Long)]
+            logDebug(s"Creating fetch request of $curRequestSize at $address")
+            curRequestSize = 0
+          }
+        }
+        // Add in the final request
+        if (curBlocks.nonEmpty) {
+          remoteRequests += new FetchRequest(address, curBlocks)
+        }
+      }
+    }
+    logInfo(s"Getting $numBlocksToFetch non-empty blocks out of $totalBlocks 
blocks")
+    remoteRequests
+  }
+
+  private[this] def fetchLocalBlocks() {
+    // Get the local blocks while remote blocks are being fetched. Note that it's okay to do
+    // these all at once because they will just memory-map some files, so they won't consume
+    // any memory that might exceed our maxBytesInFlight
+    for (id <- localBlocks) {
+      try {
+        shuffleMetrics.localBlocksFetched += 1
+        results.put(new FetchResult(
+          id, 0, () => blockManager.getLocalShuffleFromDisk(id, serializer).get))
+        logDebug("Got local block " + id)
+      } catch {
+        case e: Exception =>
+          logError(s"Error occurred while fetching local blocks", e)
+          results.put(new FetchResult(id, -1, null))
+          return
+      }
+    }
+  }
+
+  private[this] def initialize(): Unit = {
+    // Split local and remote blocks.
+    val remoteRequests = splitLocalRemoteBlocks()
+    // Add the remote requests into our queue in a random order
+    fetchRequests ++= Utils.randomize(remoteRequests)
+
+    // Send out initial requests for blocks, up to our maxBytesInFlight
+    while (fetchRequests.nonEmpty &&
+      (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
+      sendRequest(fetchRequests.dequeue())
+    }
+
+    val numFetches = remoteRequests.size - fetchRequests.size
+    logInfo("Started " + numFetches + " remote fetches in" + 
Utils.getUsedTimeMs(startTime))
+
+    // Get Local Blocks
+    fetchLocalBlocks()
+    logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
+  }
+
+  override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch
+
+  override def next(): (BlockId, Option[Iterator[Any]]) = {
+    numBlocksProcessed += 1
+    val startFetchWait = System.currentTimeMillis()
+    val result = results.take()
+    val stopFetchWait = System.currentTimeMillis()
+    shuffleMetrics.fetchWaitTime += (stopFetchWait - startFetchWait)
+    if (!result.failed) {
+      bytesInFlight -= result.size
+    }
+    // Send fetch requests up to maxBytesInFlight
+    while (fetchRequests.nonEmpty &&
+      (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
+      sendRequest(fetchRequests.dequeue())
+    }
+    (result.blockId, if (result.failed) None else Some(result.deserialize()))
+  }
+}
+
+
+private[storage]
+object ShuffleBlockFetcherIterator {
+
+  /**
+   * A request to fetch blocks from a remote BlockManager.
+   * @param address remote BlockManager to fetch from.
+   * @param blocks Sequence of tuple, where the first element is the block id,
+   *               and the second element is the estimated size, used to calculate bytesInFlight.
+   */
+  class FetchRequest(val address: BlockManagerId, val blocks: Seq[(BlockId, Long)]) {
+    val size = blocks.map(_._2).sum
+  }
+
+  /**
+   * Result of a fetch from a remote block. A failure is represented as size == -1.
+   * @param blockId block id
+   * @param size estimated size of the block, used to calculate bytesInFlight.
+   *             Note that this is NOT the exact bytes.
+   * @param deserialize closure to return the result in the form of an Iterator.
+   */
+  class FetchResult(val blockId: BlockId, val size: Long, val deserialize: () => Iterator[Any]) {
+    def failed: Boolean = size == -1
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
deleted file mode 100644
index 7540f0d..0000000
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.util.concurrent.ArrayBlockingQueue
-
-import akka.actor._
-import org.apache.spark.shuffle.hash.HashShuffleManager
-import util.Random
-
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
-import org.apache.spark.scheduler.LiveListenerBus
-import org.apache.spark.serializer.KryoSerializer
-
-/**
- * This class tests the BlockManager and MemoryStore for thread safety and
- * deadlocks. It spawns a number of producer and consumer threads. Producer
- * threads continuously pushes blocks into the BlockManager and consumer
- * threads continuously retrieves the blocks form the BlockManager and tests
- * whether the block is correct or not.
- */
-private[spark] object ThreadingTest {
-
-  val numProducers = 5
-  val numBlocksPerProducer = 20000
-
-  private[spark] class ProducerThread(manager: BlockManager, id: Int) extends Thread {
-    val queue = new ArrayBlockingQueue[(BlockId, Seq[Int])](100)
-
-    override def run() {
-      for (i <- 1 to numBlocksPerProducer) {
-        val blockId = TestBlockId("b-" + id + "-" + i)
-        val blockSize = Random.nextInt(1000)
-        val block = (1 to blockSize).map(_ => Random.nextInt())
-        val level = randomLevel()
-        val startTime = System.currentTimeMillis()
-        manager.putIterator(blockId, block.iterator, level, tellMaster = true)
-        println("Pushed block " + blockId + " in " + (System.currentTimeMillis 
- startTime) + " ms")
-        queue.add((blockId, block))
-      }
-      println("Producer thread " + id + " terminated")
-    }
-
-    def randomLevel(): StorageLevel = {
-      math.abs(Random.nextInt()) % 4 match {
-        case 0 => StorageLevel.MEMORY_ONLY
-        case 1 => StorageLevel.MEMORY_ONLY_SER
-        case 2 => StorageLevel.MEMORY_AND_DISK
-        case 3 => StorageLevel.MEMORY_AND_DISK_SER
-      }
-    }
-  }
-
-  private[spark] class ConsumerThread(
-      manager: BlockManager,
-      queue: ArrayBlockingQueue[(BlockId, Seq[Int])]
-    ) extends Thread {
-    var numBlockConsumed = 0
-
-    override def run() {
-      println("Consumer thread started")
-      while(numBlockConsumed < numBlocksPerProducer) {
-        val (blockId, block) = queue.take()
-        val startTime = System.currentTimeMillis()
-        manager.get(blockId) match {
-          case Some(retrievedBlock) =>
-            assert(retrievedBlock.data.toList.asInstanceOf[List[Int]] == block.toList,
-              "Block " + blockId + " did not match")
-            println("Got block " + blockId + " in " +
-              (System.currentTimeMillis - startTime) + " ms")
-          case None =>
-            assert(false, "Block " + blockId + " could not be retrieved")
-        }
-        numBlockConsumed += 1
-      }
-      println("Consumer thread terminated")
-    }
-  }
-
-  def main(args: Array[String]) {
-    System.setProperty("spark.kryoserializer.buffer.mb", "1")
-    val actorSystem = ActorSystem("test")
-    val conf = new SparkConf()
-    val serializer = new KryoSerializer(conf)
-    val blockManagerMaster = new BlockManagerMaster(
-      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
-      conf, true)
-    val blockManager = new BlockManager(
-      "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, 
conf,
-      new SecurityManager(conf), new MapOutputTrackerMaster(conf), new 
HashShuffleManager(conf))
-    val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
-    val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
-    producers.foreach(_.start)
-    consumers.foreach(_.start)
-    producers.foreach(_.join)
-    consumers.foreach(_.join)
-    blockManager.stop()
-    blockManagerMaster.stop()
-    actorSystem.shutdown()
-    actorSystem.awaitTermination()
-    println("Everything stopped.")
-    println(
-      "It will take sometime for the JVM to clean all temporary files and 
shutdown. Sit tight.")
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 41c294f..81b64c3 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -24,8 +24,7 @@ import org.scalatest.Matchers
 import org.scalatest.time.{Millis, Span}
 
 import org.apache.spark.SparkContext._
-import org.apache.spark.network.ConnectionManagerId
-import org.apache.spark.storage.{BlockManagerWorker, GetBlock, RDDBlockId, StorageLevel}
+import org.apache.spark.storage.{RDDBlockId, StorageLevel}
 
 class NotSerializableClass
 class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}
@@ -136,7 +135,6 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
         sc.parallelize(1 to 10, 2).foreach { x => if (x == 1) System.exit(42) }
       }
       assert(thrown.getClass === classOf[SparkException])
-      System.out.println(thrown.getMessage)
       assert(thrown.getMessage.contains("failed 4 times"))
     }
   }
@@ -202,12 +200,13 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
     val blockIds = data.partitions.indices.map(index => RDDBlockId(data.id, index)).toArray
     val blockId = blockIds(0)
     val blockManager = SparkEnv.get.blockManager
-    blockManager.master.getLocations(blockId).foreach(id => {
-      val bytes = BlockManagerWorker.syncGetBlock(
-        GetBlock(blockId), ConnectionManagerId(id.host, id.port))
-      val deserialized = blockManager.dataDeserialize(blockId, bytes).asInstanceOf[Iterator[Int]].toList
+    val blockTransfer = SparkEnv.get.blockTransferService
+    blockManager.master.getLocations(blockId).foreach { cmId =>
+      val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, blockId.toString)
+      val deserialized = blockManager.dataDeserialize(blockId, bytes.nioByteBuffer())
+        .asInstanceOf[Iterator[Int]].toList
       assert(deserialized === (1 to 100).toList)
-    })
+    }
   }
 
   test("compute without caching when no partitions fit in memory") {

http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala b/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala
deleted file mode 100644
index e2f4d4c..0000000
--- a/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network
-
-import java.io.IOException
-import java.nio._
-import java.util.concurrent.TimeoutException
-
-import org.apache.spark.{SecurityManager, SparkConf}
-import org.scalatest.FunSuite
-
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-
-import scala.concurrent.TimeoutException
-import scala.concurrent.{Await, TimeoutException}
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.{Failure, Success, Try}
-
-/**
-  * Test the ConnectionManager with various security settings.
-  */
-class ConnectionManagerSuite extends FunSuite {
-
-  test("security default off") {
-    val conf = new SparkConf
-    val securityManager = new SecurityManager(conf)
-    val manager = new ConnectionManager(0, conf, securityManager)
-    var receivedMessage = false
-    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
-      receivedMessage = true
-      None
-    })
-
-    val size = 10 * 1024 * 1024
-    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
-    buffer.flip
-
-    val bufferMessage = Message.createBufferMessage(buffer.duplicate)
-    Await.result(manager.sendMessageReliably(manager.id, bufferMessage), 10 seconds)
-
-    assert(receivedMessage == true)
-
-    manager.stop()
-  }
-
-  test("security on same password") {
-    val conf = new SparkConf
-    conf.set("spark.authenticate", "true")
-    conf.set("spark.authenticate.secret", "good")
-    val securityManager = new SecurityManager(conf)
-    val manager = new ConnectionManager(0, conf, securityManager)
-    var numReceivedMessages = 0
-
-    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
-      numReceivedMessages += 1
-      None
-    })
-    val managerServer = new ConnectionManager(0, conf, securityManager)
-    var numReceivedServerMessages = 0
-    managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
-      numReceivedServerMessages += 1
-      None
-    })
-
-    val size = 10 * 1024 * 1024
-    val count = 10
-    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
-    buffer.flip
-
-    (0 until count).map(i => {
-      val bufferMessage = Message.createBufferMessage(buffer.duplicate)
-      Await.result(manager.sendMessageReliably(managerServer.id, bufferMessage), 10 seconds)
-    })
-
-    assert(numReceivedServerMessages == 10)
-    assert(numReceivedMessages == 0)
-
-    manager.stop()
-    managerServer.stop()
-  }
-
-  test("security mismatch password") {
-    val conf = new SparkConf
-    conf.set("spark.authenticate", "true")
-    conf.set("spark.authenticate.secret", "good")
-    val securityManager = new SecurityManager(conf)
-    val manager = new ConnectionManager(0, conf, securityManager)
-    var numReceivedMessages = 0
-
-    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
-      numReceivedMessages += 1
-      None
-    })
-
-    val badconf = new SparkConf
-    badconf.set("spark.authenticate", "true")
-    badconf.set("spark.authenticate.secret", "bad")
-    val badsecurityManager = new SecurityManager(badconf)
-    val managerServer = new ConnectionManager(0, badconf, badsecurityManager)
-    var numReceivedServerMessages = 0
-
-    managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
-      numReceivedServerMessages += 1
-      None
-    })
-
-    val size = 10 * 1024 * 1024
-    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
-    buffer.flip
-    val bufferMessage = Message.createBufferMessage(buffer.duplicate)
-    // Expect managerServer to close connection, which we'll report as an error:
-    intercept[IOException] {
-      Await.result(manager.sendMessageReliably(managerServer.id, bufferMessage), 10 seconds)
-    }
-
-    assert(numReceivedServerMessages == 0)
-    assert(numReceivedMessages == 0)
-
-    manager.stop()
-    managerServer.stop()
-  }
-
-  test("security mismatch auth off") {
-    val conf = new SparkConf
-    conf.set("spark.authenticate", "false")
-    conf.set("spark.authenticate.secret", "good")
-    val securityManager = new SecurityManager(conf)
-    val manager = new ConnectionManager(0, conf, securityManager)
-    var numReceivedMessages = 0
-
-    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
-      numReceivedMessages += 1
-      None
-    })
-
-    val badconf = new SparkConf
-    badconf.set("spark.authenticate", "true")
-    badconf.set("spark.authenticate.secret", "good")
-    val badsecurityManager = new SecurityManager(badconf)
-    val managerServer = new ConnectionManager(0, badconf, badsecurityManager)
-    var numReceivedServerMessages = 0
-    managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
-      numReceivedServerMessages += 1
-      None
-    })
-
-    val size = 10 * 1024 * 1024
-    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
-    buffer.flip
-    val bufferMessage = Message.createBufferMessage(buffer.duplicate)
-    (0 until 1).map(i => {
-      val bufferMessage = Message.createBufferMessage(buffer.duplicate)
-      manager.sendMessageReliably(managerServer.id, bufferMessage)
-    }).foreach(f => {
-      try {
-        val g = Await.result(f, 1 second)
-        assert(false)
-      } catch {
-        case i: IOException =>
-          assert(true)
-        case e: TimeoutException => {
-          // we should timeout here since the client can't do the negotiation
-          assert(true)
-        }
-      }
-    })
-
-    assert(numReceivedServerMessages == 0)
-    assert(numReceivedMessages == 0)
-    manager.stop()
-    managerServer.stop()
-  }
-
-  test("security auth off") {
-    val conf = new SparkConf
-    conf.set("spark.authenticate", "false")
-    val securityManager = new SecurityManager(conf)
-    val manager = new ConnectionManager(0, conf, securityManager)
-    var numReceivedMessages = 0
-
-    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
-      numReceivedMessages += 1
-      None
-    })
-
-    val badconf = new SparkConf
-    badconf.set("spark.authenticate", "false")
-    val badsecurityManager = new SecurityManager(badconf)
-    val managerServer = new ConnectionManager(0, badconf, badsecurityManager)
-    var numReceivedServerMessages = 0
-
-    managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
-      numReceivedServerMessages += 1
-      None
-    })
-
-    val size = 10 * 1024 * 1024
-    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
-    buffer.flip
-    val bufferMessage = Message.createBufferMessage(buffer.duplicate)
-    (0 until 10).map(i => {
-      val bufferMessage = Message.createBufferMessage(buffer.duplicate)
-      manager.sendMessageReliably(managerServer.id, bufferMessage)
-    }).foreach(f => {
-      try {
-        val g = Await.result(f, 1 second)
-      } catch {
-        case e: Exception => {
-          assert(false)
-        }
-      }
-    })
-    assert(numReceivedServerMessages == 10)
-    assert(numReceivedMessages == 0)
-
-    manager.stop()
-    managerServer.stop()
-  }
-
-  test("Ack error message") {
-    val conf = new SparkConf
-    conf.set("spark.authenticate", "false")
-    val securityManager = new SecurityManager(conf)
-    val manager = new ConnectionManager(0, conf, securityManager)
-    val managerServer = new ConnectionManager(0, conf, securityManager)
-    managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
-      throw new Exception
-    })
-
-    val size = 10 * 1024 * 1024
-    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
-    buffer.flip
-    val bufferMessage = Message.createBufferMessage(buffer)
-
-    val future = manager.sendMessageReliably(managerServer.id, bufferMessage)
-
-    intercept[IOException] {
-      Await.result(future, 1 second)
-    }
-
-    manager.stop()
-    managerServer.stop()
-
-  }
-
-  test("sendMessageReliably timeout") {
-    val clientConf = new SparkConf
-    clientConf.set("spark.authenticate", "false")
-    val ackTimeout = 30
-    clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeout}")
-
-    val clientSecurityManager = new SecurityManager(clientConf)
-    val manager = new ConnectionManager(0, clientConf, clientSecurityManager)
-
-    val serverConf = new SparkConf
-    serverConf.set("spark.authenticate", "false")
-    val serverSecurityManager = new SecurityManager(serverConf)
-    val managerServer = new ConnectionManager(0, serverConf, 
serverSecurityManager)
-    managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
-      // sleep 60 sec > ack timeout for simulating server slow down or hang up
-      Thread.sleep(ackTimeout * 3 * 1000)
-      None
-    })
-
-    val size = 10 * 1024 * 1024
-    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => 
x.toByte))
-    buffer.flip
-    val bufferMessage = Message.createBufferMessage(buffer.duplicate)
-
-    val future = manager.sendMessageReliably(managerServer.id, bufferMessage)
-
-    // Future should throw IOException in 30 sec.
-    // Otherwise TimeoutExcepton is thrown from Await.result.
-    // We expect TimeoutException is not thrown.
-    intercept[IOException] {
-      Await.result(future, (ackTimeout * 2) second)
-    }
-
-    manager.stop()
-    managerServer.stop()
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
new file mode 100644
index 0000000..9f49587
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.nio
+
+import java.io.IOException
+import java.nio._
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, TimeoutException}
+import scala.language.postfixOps
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.{SecurityManager, SparkConf}
+
+/**
+  * Test the ConnectionManager with various security settings.
+  */
+class ConnectionManagerSuite extends FunSuite {
+
+  test("security default off") {
+    val conf = new SparkConf
+    val securityManager = new SecurityManager(conf)
+    val manager = new ConnectionManager(0, conf, securityManager)
+    var receivedMessage = false
+    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+      receivedMessage = true
+      None
+    })
+
+    val size = 10 * 1024 * 1024
+    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => 
x.toByte))
+    buffer.flip
+
+    val bufferMessage = Message.createBufferMessage(buffer.duplicate)
+    Await.result(manager.sendMessageReliably(manager.id, bufferMessage), 10 
seconds)
+
+    assert(receivedMessage == true)
+
+    manager.stop()
+  }
+
+  test("security on same password") {
+    val conf = new SparkConf
+    conf.set("spark.authenticate", "true")
+    conf.set("spark.authenticate.secret", "good")
+    val securityManager = new SecurityManager(conf)
+    val manager = new ConnectionManager(0, conf, securityManager)
+    var numReceivedMessages = 0
+
+    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+      numReceivedMessages += 1
+      None
+    })
+    val managerServer = new ConnectionManager(0, conf, securityManager)
+    var numReceivedServerMessages = 0
+    managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+      numReceivedServerMessages += 1
+      None
+    })
+
+    val size = 10 * 1024 * 1024
+    val count = 10
+    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => 
x.toByte))
+    buffer.flip
+
+    (0 until count).map(i => {
+      val bufferMessage = Message.createBufferMessage(buffer.duplicate)
+      Await.result(manager.sendMessageReliably(managerServer.id, 
bufferMessage), 10 seconds)
+    })
+
+    assert(numReceivedServerMessages == 10)
+    assert(numReceivedMessages == 0)
+
+    manager.stop()
+    managerServer.stop()
+  }
+
+  test("security mismatch password") {
+    val conf = new SparkConf
+    conf.set("spark.authenticate", "true")
+    conf.set("spark.authenticate.secret", "good")
+    val securityManager = new SecurityManager(conf)
+    val manager = new ConnectionManager(0, conf, securityManager)
+    var numReceivedMessages = 0
+
+    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+      numReceivedMessages += 1
+      None
+    })
+
+    val badconf = new SparkConf
+    badconf.set("spark.authenticate", "true")
+    badconf.set("spark.authenticate.secret", "bad")
+    val badsecurityManager = new SecurityManager(badconf)
+    val managerServer = new ConnectionManager(0, badconf, badsecurityManager)
+    var numReceivedServerMessages = 0
+
+    managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+      numReceivedServerMessages += 1
+      None
+    })
+
+    val size = 10 * 1024 * 1024
+    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => 
x.toByte))
+    buffer.flip
+    val bufferMessage = Message.createBufferMessage(buffer.duplicate)
+    // Expect managerServer to close connection, which we'll report as an 
error:
+    intercept[IOException] {
+      Await.result(manager.sendMessageReliably(managerServer.id, 
bufferMessage), 10 seconds)
+    }
+
+    assert(numReceivedServerMessages == 0)
+    assert(numReceivedMessages == 0)
+
+    manager.stop()
+    managerServer.stop()
+  }
+
+  test("security mismatch auth off") {
+    val conf = new SparkConf
+    conf.set("spark.authenticate", "false")
+    conf.set("spark.authenticate.secret", "good")
+    val securityManager = new SecurityManager(conf)
+    val manager = new ConnectionManager(0, conf, securityManager)
+    var numReceivedMessages = 0
+
+    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+      numReceivedMessages += 1
+      None
+    })
+
+    val badconf = new SparkConf
+    badconf.set("spark.authenticate", "true")
+    badconf.set("spark.authenticate.secret", "good")
+    val badsecurityManager = new SecurityManager(badconf)
+    val managerServer = new ConnectionManager(0, badconf, badsecurityManager)
+    var numReceivedServerMessages = 0
+    managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+      numReceivedServerMessages += 1
+      None
+    })
+
+    val size = 10 * 1024 * 1024
+    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => 
x.toByte))
+    buffer.flip
+    val bufferMessage = Message.createBufferMessage(buffer.duplicate)
+    (0 until 1).map(i => {
+      val bufferMessage = Message.createBufferMessage(buffer.duplicate)
+      manager.sendMessageReliably(managerServer.id, bufferMessage)
+    }).foreach(f => {
+      try {
+        val g = Await.result(f, 1 second)
+        assert(false)
+      } catch {
+        case i: IOException =>
+          assert(true)
+        case e: TimeoutException => {
+          // we should timeout here since the client can't do the negotiation
+          assert(true)
+        }
+      }
+    })
+
+    assert(numReceivedServerMessages == 0)
+    assert(numReceivedMessages == 0)
+    manager.stop()
+    managerServer.stop()
+  }
+
+  test("security auth off") {
+    val conf = new SparkConf
+    conf.set("spark.authenticate", "false")
+    val securityManager = new SecurityManager(conf)
+    val manager = new ConnectionManager(0, conf, securityManager)
+    var numReceivedMessages = 0
+
+    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+      numReceivedMessages += 1
+      None
+    })
+
+    val badconf = new SparkConf
+    badconf.set("spark.authenticate", "false")
+    val badsecurityManager = new SecurityManager(badconf)
+    val managerServer = new ConnectionManager(0, badconf, badsecurityManager)
+    var numReceivedServerMessages = 0
+
+    managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+      numReceivedServerMessages += 1
+      None
+    })
+
+    val size = 10 * 1024 * 1024
+    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => 
x.toByte))
+    buffer.flip
+    val bufferMessage = Message.createBufferMessage(buffer.duplicate)
+    (0 until 10).map(i => {
+      val bufferMessage = Message.createBufferMessage(buffer.duplicate)
+      manager.sendMessageReliably(managerServer.id, bufferMessage)
+    }).foreach(f => {
+      try {
+        val g = Await.result(f, 1 second)
+      } catch {
+        case e: Exception => {
+          assert(false)
+        }
+      }
+    })
+    assert(numReceivedServerMessages == 10)
+    assert(numReceivedMessages == 0)
+
+    manager.stop()
+    managerServer.stop()
+  }
+
+  test("Ack error message") {
+    val conf = new SparkConf
+    conf.set("spark.authenticate", "false")
+    val securityManager = new SecurityManager(conf)
+    val manager = new ConnectionManager(0, conf, securityManager)
+    val managerServer = new ConnectionManager(0, conf, securityManager)
+    managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+      throw new Exception
+    })
+
+    val size = 10 * 1024 * 1024
+    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => 
x.toByte))
+    buffer.flip
+    val bufferMessage = Message.createBufferMessage(buffer)
+
+    val future = manager.sendMessageReliably(managerServer.id, bufferMessage)
+
+    intercept[IOException] {
+      Await.result(future, 1 second)
+    }
+
+    manager.stop()
+    managerServer.stop()
+
+  }
+
+  test("sendMessageReliably timeout") {
+    val clientConf = new SparkConf
+    clientConf.set("spark.authenticate", "false")
+    val ackTimeout = 30
+    clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeout}")
+
+    val clientSecurityManager = new SecurityManager(clientConf)
+    val manager = new ConnectionManager(0, clientConf, clientSecurityManager)
+
+    val serverConf = new SparkConf
+    serverConf.set("spark.authenticate", "false")
+    val serverSecurityManager = new SecurityManager(serverConf)
+    val managerServer = new ConnectionManager(0, serverConf, 
serverSecurityManager)
+    managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+      // Sleep for 90 seconds (3x the 30 second ack timeout) to simulate a slow or hung server.
+      Thread.sleep(ackTimeout * 3 * 1000)
+      None
+    })
+
+    val size = 10 * 1024 * 1024
+    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => 
x.toByte))
+    buffer.flip
+    val bufferMessage = Message.createBufferMessage(buffer.duplicate)
+
+    val future = manager.sendMessageReliably(managerServer.id, bufferMessage)
+
+    // The future should fail with an IOException within the 30 second ack timeout.
+    // Otherwise Await.result throws a TimeoutException after 60 seconds; we expect
+    // that no TimeoutException is thrown here.
+    intercept[IOException] {
+      Await.result(future, (ackTimeout * 2) second)
+    }
+
+    manager.stop()
+    managerServer.stop()
+  }
+
+}
+
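Every test in the suite above prepares its payload the same way: allocate a 10 MB ByteBuffer, fill it with Array.tabulate, flip() it so it can be read from position 0, and wrap a fresh duplicate() in a BufferMessage for each send so every message carries its own position and limit. A minimal standalone sketch of that idiom, using plain java.nio only (the object name and the 16-byte size are illustrative, not part of the patch):

    import java.nio.ByteBuffer

    object BufferIdiomSketch extends App {
      val size = 16  // the suite uses 10 * 1024 * 1024; a small size keeps the sketch readable
      // put() advances the position to `size`; flip() resets it to 0 and sets the limit,
      // so the buffer is ready to be read from the beginning.
      val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
      buffer.flip()

      // duplicate() shares the underlying bytes but keeps an independent position/limit,
      // which is why the tests call it once per message sent.
      val copy1 = buffer.duplicate()
      val copy2 = buffer.duplicate()
      assert(copy1.remaining == size && copy2.remaining == size)
    }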

http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
 
b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
index 6061e54..ba47fe5 100644
--- 
a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.FunSuite
 
 import org.apache.spark.{SparkEnv, SparkContext, LocalSparkContext, SparkConf}
 import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.network.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.shuffle.FileShuffleBlockManager
 import org.apache.spark.storage.{ShuffleBlockId, FileSegment}
@@ -32,10 +33,12 @@ import org.apache.spark.storage.{ShuffleBlockId, 
FileSegment}
 class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
   private val testConf = new SparkConf(false)
 
-  private def checkSegments(segment1: FileSegment, segment2: FileSegment) {
-    assert (segment1.file.getCanonicalPath === segment2.file.getCanonicalPath)
-    assert (segment1.offset === segment2.offset)
-    assert (segment1.length === segment2.length)
+  private def checkSegments(expected: FileSegment, buffer: ManagedBuffer) {
+    assert(buffer.isInstanceOf[FileSegmentManagedBuffer])
+    val segment = buffer.asInstanceOf[FileSegmentManagedBuffer]
+    assert(expected.file.getCanonicalPath === segment.file.getCanonicalPath)
+    assert(expected.offset === segment.offset)
+    assert(expected.length === segment.length)
   }
 
   test("consolidated shuffle can write to shuffle group without messing 
existing offsets/lengths") {
@@ -95,14 +98,12 @@ class HashShuffleManagerSuite extends FunSuite with 
LocalSparkContext {
       writer.commitAndClose()
     }
     // check before we register.
-    checkSegments(shuffle2Segment, 
shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)).left.get)
+    checkSegments(shuffle2Segment, 
shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)))
     shuffle3.releaseWriters(success = true)
-    checkSegments(shuffle2Segment, 
shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)).left.get)
+    checkSegments(shuffle2Segment, 
shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)))
     shuffleBlockManager.removeShuffle(1)
-
   }
 
-
   def writeToFile(file: File, numBytes: Int) {
     val writer = new FileWriter(file, true)
     for (i <- 0 until numBytes) writer.write(i)
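For readers skimming the hunk above: checkSegments now receives the block as a ManagedBuffer and narrows it to FileSegmentManagedBuffer before comparing the file path, offset and length. The same check written as a pattern match, as it could appear inside the suite (purely an illustrative rewrite; the patch itself uses isInstanceOf/asInstanceOf exactly as shown above):

    private def checkSegments(expected: FileSegment, buffer: ManagedBuffer): Unit = {
      buffer match {
        case fs: FileSegmentManagedBuffer =>
          // Same bytes on disk: same canonical file, same offset, same length.
          assert(expected.file.getCanonicalPath === fs.file.getCanonicalPath)
          assert(expected.offset === fs.offset)
          assert(expected.length === fs.length)
        case other =>
          fail("expected a FileSegmentManagedBuffer, got " + other.getClass.getName)
      }
    }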

http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
deleted file mode 100644
index 3c86f6b..0000000
--- 
a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.io.IOException
-import java.nio.ByteBuffer
-
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.future
-import scala.concurrent.ExecutionContext.Implicits.global
-
-import org.scalatest.{FunSuite, Matchers}
-
-import org.mockito.Mockito._
-import org.mockito.Matchers.{any, eq => meq}
-import org.mockito.stubbing.Answer
-import org.mockito.invocation.InvocationOnMock
-
-import org.apache.spark.storage.BlockFetcherIterator._
-import org.apache.spark.network.{ConnectionManager, Message}
-import org.apache.spark.executor.ShuffleReadMetrics
-
-class BlockFetcherIteratorSuite extends FunSuite with Matchers {
-
-  test("block fetch from local fails using BasicBlockFetcherIterator") {
-    val blockManager = mock(classOf[BlockManager])
-    val connManager = mock(classOf[ConnectionManager])
-    doReturn(connManager).when(blockManager).connectionManager
-    doReturn(BlockManagerId("test-client", "test-client", 
1)).when(blockManager).blockManagerId
-
-    doReturn((48 * 1024 * 
1024).asInstanceOf[Long]).when(blockManager).maxBytesInFlight
-
-    val blIds = Array[BlockId](
-      ShuffleBlockId(0,0,0),
-      ShuffleBlockId(0,1,0),
-      ShuffleBlockId(0,2,0),
-      ShuffleBlockId(0,3,0),
-      ShuffleBlockId(0,4,0))
-
-    val optItr = mock(classOf[Option[Iterator[Any]]])
-    val answer = new Answer[Option[Iterator[Any]]] {
-      override def answer(invocation: InvocationOnMock) = 
Option[Iterator[Any]] {
-        throw new Exception
-      }
-    }
-
-    // 3rd block is going to fail
-    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(0)), 
any())
-    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(1)), 
any())
-    doAnswer(answer).when(blockManager).getLocalShuffleFromDisk(meq(blIds(2)), 
any())
-    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(3)), 
any())
-    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(4)), 
any())
-
-    val bmId = BlockManagerId("test-client", "test-client", 1)
-    val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
-      (bmId, blIds.map(blId => (blId, 1.asInstanceOf[Long])).toSeq)
-    )
-
-    val iterator = new BasicBlockFetcherIterator(blockManager, 
blocksByAddress, null,
-      new ShuffleReadMetrics())
-
-    iterator.initialize()
-
-    // Without exhausting the iterator, the iterator should be lazy and not 
call getLocalShuffleFromDisk.
-    verify(blockManager, times(0)).getLocalShuffleFromDisk(any(), any())
-
-    assert(iterator.hasNext, "iterator should have 5 elements but actually has 
no elements")
-    // the 2nd element of the tuple returned by iterator.next should be 
defined when fetching successfully
-    assert(iterator.next()._2.isDefined, "1st element should be defined but is 
not actually defined")
-    verify(blockManager, times(1)).getLocalShuffleFromDisk(any(), any())
-
-    assert(iterator.hasNext, "iterator should have 5 elements but actually has 
1 element")
-    assert(iterator.next()._2.isDefined, "2nd element should be defined but is 
not actually defined")
-    verify(blockManager, times(2)).getLocalShuffleFromDisk(any(), any())
-
-    assert(iterator.hasNext, "iterator should have 5 elements but actually has 
2 elements")
-    // 3rd fetch should be failed
-    intercept[Exception] {
-      iterator.next()
-    }
-    verify(blockManager, times(3)).getLocalShuffleFromDisk(any(), any())
-  }
-
-
-  test("block fetch from local succeed using BasicBlockFetcherIterator") {
-    val blockManager = mock(classOf[BlockManager])
-    val connManager = mock(classOf[ConnectionManager])
-    doReturn(connManager).when(blockManager).connectionManager
-    doReturn(BlockManagerId("test-client", "test-client", 
1)).when(blockManager).blockManagerId
-
-    doReturn((48 * 1024 * 
1024).asInstanceOf[Long]).when(blockManager).maxBytesInFlight
-
-    val blIds = Array[BlockId](
-      ShuffleBlockId(0,0,0),
-      ShuffleBlockId(0,1,0),
-      ShuffleBlockId(0,2,0),
-      ShuffleBlockId(0,3,0),
-      ShuffleBlockId(0,4,0))
-
-    val optItr = mock(classOf[Option[Iterator[Any]]])
- 
-   // All blocks should be fetched successfully
-    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(0)), 
any())
-    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(1)), 
any())
-    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(2)), 
any())
-    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(3)), 
any())
-    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(4)), 
any())
-
-    val bmId = BlockManagerId("test-client", "test-client", 1)
-    val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
-      (bmId, blIds.map(blId => (blId, 1.asInstanceOf[Long])).toSeq)
-    )
-
-    val iterator = new BasicBlockFetcherIterator(blockManager, 
blocksByAddress, null,
-      new ShuffleReadMetrics())
-
-    iterator.initialize()
-
-    // Without exhausting the iterator, the iterator should be lazy and not 
call getLocalShuffleFromDisk.
-    verify(blockManager, times(0)).getLocalShuffleFromDisk(any(), any())
-
-    assert(iterator.hasNext, "iterator should have 5 elements but actually has 
no elements")
-    assert(iterator.next._2.isDefined, "All elements should be defined but 1st 
element is not actually defined") 
-    assert(iterator.hasNext, "iterator should have 5 elements but actually has 
1 element")
-    assert(iterator.next._2.isDefined, "All elements should be defined but 2nd 
element is not actually defined") 
-    assert(iterator.hasNext, "iterator should have 5 elements but actually has 
2 elements")
-    assert(iterator.next._2.isDefined, "All elements should be defined but 3rd 
element is not actually defined") 
-    assert(iterator.hasNext, "iterator should have 5 elements but actually has 
3 elements")
-    assert(iterator.next._2.isDefined, "All elements should be defined but 4th 
element is not actually defined") 
-    assert(iterator.hasNext, "iterator should have 5 elements but actually has 
4 elements")
-    assert(iterator.next._2.isDefined, "All elements should be defined but 5th 
element is not actually defined")
-
-    verify(blockManager, times(5)).getLocalShuffleFromDisk(any(), any())
-  }
-
-  test("block fetch from remote fails using BasicBlockFetcherIterator") {
-    val blockManager = mock(classOf[BlockManager])
-    val connManager = mock(classOf[ConnectionManager])
-    when(blockManager.connectionManager).thenReturn(connManager)
-
-    val f = future {
-      throw new IOException("Send failed or we received an error ACK")
-    }
-    when(connManager.sendMessageReliably(any(),
-      any())).thenReturn(f)
-    when(blockManager.futureExecContext).thenReturn(global)
-
-    when(blockManager.blockManagerId).thenReturn(
-      BlockManagerId("test-client", "test-client", 1))
-    when(blockManager.maxBytesInFlight).thenReturn(48 * 1024 * 1024)
-
-    val blId1 = ShuffleBlockId(0,0,0)
-    val blId2 = ShuffleBlockId(0,1,0)
-    val bmId = BlockManagerId("test-server", "test-server", 1)
-    val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
-      (bmId, Seq((blId1, 1L), (blId2, 1L)))
-    )
-
-    val iterator = new BasicBlockFetcherIterator(blockManager,
-      blocksByAddress, null, new ShuffleReadMetrics())
-
-    iterator.initialize()
-    iterator.foreach{
-      case (_, r) => {
-        (!r.isDefined) should be(true)
-      }
-    }
-  }
-
-  test("block fetch from remote succeed using BasicBlockFetcherIterator") {
-    val blockManager = mock(classOf[BlockManager])
-    val connManager = mock(classOf[ConnectionManager])
-    when(blockManager.connectionManager).thenReturn(connManager)
-
-    val blId1 = ShuffleBlockId(0,0,0)
-    val blId2 = ShuffleBlockId(0,1,0)
-    val buf1 = ByteBuffer.allocate(4)
-    val buf2 = ByteBuffer.allocate(4)
-    buf1.putInt(1)
-    buf1.flip()
-    buf2.putInt(1)
-    buf2.flip()
-    val blockMessage1 = BlockMessage.fromGotBlock(GotBlock(blId1, buf1))
-    val blockMessage2 = BlockMessage.fromGotBlock(GotBlock(blId2, buf2))
-    val blockMessageArray = new BlockMessageArray(
-      Seq(blockMessage1, blockMessage2))
-
-    val bufferMessage = blockMessageArray.toBufferMessage
-    val buffer = ByteBuffer.allocate(bufferMessage.size)
-    val arrayBuffer = new ArrayBuffer[ByteBuffer]
-    bufferMessage.buffers.foreach{ b =>
-      buffer.put(b)
-    }
-    buffer.flip()
-    arrayBuffer += buffer
-
-    val f = future {
-      Message.createBufferMessage(arrayBuffer)
-    }
-    when(connManager.sendMessageReliably(any(),
-      any())).thenReturn(f)
-    when(blockManager.futureExecContext).thenReturn(global)
-
-    when(blockManager.blockManagerId).thenReturn(
-      BlockManagerId("test-client", "test-client", 1))
-    when(blockManager.maxBytesInFlight).thenReturn(48 * 1024 * 1024)
-
-    val bmId = BlockManagerId("test-server", "test-server", 1)
-    val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
-      (bmId, Seq((blId1, 1L), (blId2, 1L)))
-    )
-
-    val iterator = new BasicBlockFetcherIterator(blockManager,
-      blocksByAddress, null, new ShuffleReadMetrics())
-    iterator.initialize()
-    iterator.foreach{
-      case (_, r) => {
-        (r.isDefined) should be(true)
-      }
-    }
-  }
-}

