http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
 
b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
index e311320..11793ea 100644
--- 
a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
+++ 
b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
@@ -19,12 +19,13 @@ package org.apache.spark.network.nio
 
 import java.nio.ByteBuffer
 
-import scala.concurrent.Future
-
-import org.apache.spark.{SparkException, Logging, SecurityManager, SparkConf}
 import org.apache.spark.network._
+import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
+
+import scala.concurrent.Future
 
 
 /**
@@ -71,7 +72,7 @@ final class NioBlockTransferService(conf: SparkConf, 
securityManager: SecurityMa
   /**
    * Tear down the transfer service.
    */
-  override def stop(): Unit = {
+  override def close(): Unit = {
     if (cm != null) {
       cm.stop()
     }
@@ -95,27 +96,34 @@ final class NioBlockTransferService(conf: SparkConf, 
securityManager: SecurityMa
     future.onSuccess { case message =>
       val bufferMessage = message.asInstanceOf[BufferMessage]
       val blockMessageArray = 
BlockMessageArray.fromBufferMessage(bufferMessage)
+
       // SPARK-4064: In some cases(eg. Remote block was removed) 
blockMessageArray may be empty.
       if (blockMessageArray.isEmpty) {
-        listener.onBlockFetchFailure(
-          new SparkException(s"Received empty message from $cmId"))
+        blockIds.foreach { id =>
+          listener.onBlockFetchFailure(id, new SparkException(s"Received empty 
message from $cmId"))
+        }
       } else {
-        for (blockMessage <- blockMessageArray) {
+        for (blockMessage: BlockMessage <- blockMessageArray) {
           val msgType = blockMessage.getType
           if (msgType != BlockMessage.TYPE_GOT_BLOCK) {
-            listener.onBlockFetchFailure(
-              new SparkException(s"Unexpected message ${msgType} received from 
$cmId"))
+            if (blockMessage.getId != null) {
+              listener.onBlockFetchFailure(blockMessage.getId.toString,
+                new SparkException(s"Unexpected message $msgType received from 
$cmId"))
+            }
           } else {
             val blockId = blockMessage.getId
+            val networkSize = blockMessage.getData.limit()
             listener.onBlockFetchSuccess(
-              blockId.toString, new 
NioByteBufferManagedBuffer(blockMessage.getData))
+              blockId.toString, new NioManagedBuffer(blockMessage.getData))
           }
         }
       }
     }(cm.futureExecContext)
 
     future.onFailure { case exception =>
-      listener.onBlockFetchFailure(exception)
+      blockIds.foreach { blockId =>
+        listener.onBlockFetchFailure(blockId, exception)
+      }
     }(cm.futureExecContext)
   }
 
@@ -127,12 +135,12 @@ final class NioBlockTransferService(conf: SparkConf, 
securityManager: SecurityMa
   override def uploadBlock(
       hostname: String,
       port: Int,
-      blockId: String,
+      blockId: BlockId,
       blockData: ManagedBuffer,
       level: StorageLevel)
     : Future[Unit] = {
     checkInit()
-    val msg = PutBlock(BlockId(blockId), blockData.nioByteBuffer(), level)
+    val msg = PutBlock(blockId, blockData.nioByteBuffer(), level)
     val blockMessageArray = new 
BlockMessageArray(BlockMessage.fromPutBlock(msg))
     val remoteCmId = new ConnectionManagerId(hostName, port)
     val reply = cm.sendMessageReliably(remoteCmId, 
blockMessageArray.toBufferMessage)
@@ -154,10 +162,9 @@ final class NioBlockTransferService(conf: SparkConf, 
securityManager: SecurityMa
           val responseMessages = 
blockMessages.map(processBlockMessage).filter(_ != None).map(_.get)
           Some(new BlockMessageArray(responseMessages).toBufferMessage)
         } catch {
-          case e: Exception => {
+          case e: Exception =>
             logError("Exception handling buffer message", e)
             Some(Message.createErrorMessage(e, msg.id))
-          }
         }
 
       case otherMessage: Any =>
@@ -172,13 +179,13 @@ final class NioBlockTransferService(conf: SparkConf, 
securityManager: SecurityMa
       case BlockMessage.TYPE_PUT_BLOCK =>
         val msg = PutBlock(blockMessage.getId, blockMessage.getData, 
blockMessage.getLevel)
         logDebug("Received [" + msg + "]")
-        putBlock(msg.id.toString, msg.data, msg.level)
+        putBlock(msg.id, msg.data, msg.level)
         None
 
       case BlockMessage.TYPE_GET_BLOCK =>
         val msg = new GetBlock(blockMessage.getId)
         logDebug("Received [" + msg + "]")
-        val buffer = getBlock(msg.id.toString)
+        val buffer = getBlock(msg.id)
         if (buffer == null) {
           return None
         }
@@ -188,20 +195,20 @@ final class NioBlockTransferService(conf: SparkConf, 
securityManager: SecurityMa
     }
   }
 
-  private def putBlock(blockId: String, bytes: ByteBuffer, level: 
StorageLevel) {
+  private def putBlock(blockId: BlockId, bytes: ByteBuffer, level: 
StorageLevel) {
     val startTimeMs = System.currentTimeMillis()
     logDebug("PutBlock " + blockId + " started from " + startTimeMs + " with 
data: " + bytes)
-    blockDataManager.putBlockData(blockId, new 
NioByteBufferManagedBuffer(bytes), level)
+    blockDataManager.putBlockData(blockId, new NioManagedBuffer(bytes), level)
     logDebug("PutBlock " + blockId + " used " + 
Utils.getUsedTimeMs(startTimeMs)
       + " with data size: " + bytes.limit)
   }
 
-  private def getBlock(blockId: String): ByteBuffer = {
+  private def getBlock(blockId: BlockId): ByteBuffer = {
     val startTimeMs = System.currentTimeMillis()
     logDebug("GetBlock " + blockId + " started from " + startTimeMs)
-    val buffer = blockDataManager.getBlockData(blockId).orNull
+    val buffer = blockDataManager.getBlockData(blockId)
     logDebug("GetBlock " + blockId + " used " + 
Utils.getUsedTimeMs(startTimeMs)
       + " and got buffer " + buffer)
-    if (buffer == null) null else buffer.nioByteBuffer()
+    buffer.nioByteBuffer()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
index a9144cd..ca6e971 100644
--- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -17,14 +17,14 @@
 
 package org.apache.spark.serializer
 
-import java.io.{ByteArrayOutputStream, EOFException, InputStream, OutputStream}
+import java.io._
 import java.nio.ByteBuffer
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.SparkEnv
+import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.{ByteBufferInputStream, NextIterator}
+import org.apache.spark.util.{Utils, ByteBufferInputStream, NextIterator}
 
 /**
  * :: DeveloperApi ::

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala 
b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
index 439981d..1fb5b2c 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
@@ -24,9 +24,9 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConversions._
 
-import org.apache.spark.{SparkEnv, SparkConf, Logging}
+import org.apache.spark.{Logging, SparkConf, SparkEnv}
 import org.apache.spark.executor.ShuffleWriteMetrics
-import org.apache.spark.network.{FileSegmentManagedBuffer, ManagedBuffer}
+import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, 
ManagedBuffer}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup
 import org.apache.spark.storage._

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala 
b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
index b5cd34c..e9805c9 100644
--- 
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
+++ 
b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer
 import com.google.common.io.ByteStreams
 
 import org.apache.spark.SparkEnv
-import org.apache.spark.network.{ManagedBuffer, FileSegmentManagedBuffer}
+import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, 
ManagedBuffer}
 import org.apache.spark.storage._
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala 
b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala
index 63863cc..b521f0c 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala
@@ -18,8 +18,7 @@
 package org.apache.spark.shuffle
 
 import java.nio.ByteBuffer
-
-import org.apache.spark.network.ManagedBuffer
+import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.storage.ShuffleBlockId
 
 private[spark]

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/storage/BlockDataProvider.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockDataProvider.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockDataProvider.scala
deleted file mode 100644
index 5b6d086..0000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockDataProvider.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-
-
-/**
- * An interface for providing data for blocks.
- *
- * getBlockData returns either a FileSegment (for zero-copy send), or a 
ByteBuffer.
- *
- * Aside from unit tests, [[BlockManager]] is the main class that implements 
this.
- */
-private[spark] trait BlockDataProvider {
-  def getBlockData(blockId: String): Either[FileSegment, ByteBuffer]
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 4cc9792..58510d7 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -17,15 +17,13 @@
 
 package org.apache.spark.storage
 
-import java.io.{File, InputStream, OutputStream, BufferedOutputStream, 
ByteArrayOutputStream}
+import java.io.{BufferedOutputStream, ByteArrayOutputStream, File, 
InputStream, OutputStream}
 import java.nio.{ByteBuffer, MappedByteBuffer}
 
-import scala.concurrent.ExecutionContext.Implicits.global
-
-import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.concurrent.{Await, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
 import scala.util.Random
 
 import akka.actor.{ActorSystem, Props}
@@ -35,11 +33,11 @@ import org.apache.spark._
 import org.apache.spark.executor._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
+import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.util._
 
-
 private[spark] sealed trait BlockValues
 private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends 
BlockValues
 private[spark] case class IteratorValues(iterator: Iterator[Any]) extends 
BlockValues
@@ -212,21 +210,20 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Interface to get local block data.
-   *
-   * @return Some(buffer) if the block exists locally, and None if it doesn't.
+   * Interface to get local block data. Throws an exception if the block 
cannot be found or
+   * cannot be read successfully.
    */
-  override def getBlockData(blockId: String): Option[ManagedBuffer] = {
-    val bid = BlockId(blockId)
-    if (bid.isShuffle) {
-      
Some(shuffleManager.shuffleBlockManager.getBlockData(bid.asInstanceOf[ShuffleBlockId]))
+  override def getBlockData(blockId: BlockId): ManagedBuffer = {
+    if (blockId.isShuffle) {
+      
shuffleManager.shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
     } else {
-      val blockBytesOpt = doGetLocal(bid, asBlockResult = 
false).asInstanceOf[Option[ByteBuffer]]
+      val blockBytesOpt = doGetLocal(blockId, asBlockResult = false)
+        .asInstanceOf[Option[ByteBuffer]]
       if (blockBytesOpt.isDefined) {
         val buffer = blockBytesOpt.get
-        Some(new NioByteBufferManagedBuffer(buffer))
+        new NioManagedBuffer(buffer)
       } else {
-        None
+        throw new BlockNotFoundException(blockId.toString)
       }
     }
   }
@@ -234,8 +231,8 @@ private[spark] class BlockManager(
   /**
    * Put the block locally, using the given storage level.
    */
-  override def putBlockData(blockId: String, data: ManagedBuffer, level: 
StorageLevel): Unit = {
-    putBytes(BlockId(blockId), data.nioByteBuffer(), level)
+  override def putBlockData(blockId: BlockId, data: ManagedBuffer, level: 
StorageLevel): Unit = {
+    putBytes(blockId, data.nioByteBuffer(), level)
   }
 
   /**
@@ -341,17 +338,6 @@ private[spark] class BlockManager(
   }
 
   /**
-   * A short-circuited method to get blocks directly from disk. This is used 
for getting
-   * shuffle blocks. It is safe to do so without a lock on block info since 
disk store
-   * never deletes (recent) items.
-   */
-  def getLocalShuffleFromDisk(blockId: BlockId, serializer: Serializer): 
Option[Iterator[Any]] = {
-    val buf = 
shuffleManager.shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
-    val is = wrapForCompression(blockId, buf.inputStream())
-    Some(serializer.newInstance().deserializeStream(is).asIterator)
-  }
-
-  /**
    * Get block from local block manager.
    */
   def getLocal(blockId: BlockId): Option[BlockResult] = {
@@ -869,9 +855,9 @@ private[spark] class BlockManager(
             data.rewind()
             logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes 
to $peer")
             blockTransferService.uploadBlockSync(
-              peer.host, peer.port, blockId.toString, new 
NioByteBufferManagedBuffer(data), tLevel)
-            logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer 
in %d ms"
-              .format((System.currentTimeMillis - onePeerStartTime)))
+              peer.host, peer.port, blockId, new NioManagedBuffer(data), 
tLevel)
+            logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer 
in %s ms"
+              .format(System.currentTimeMillis - onePeerStartTime))
             peersReplicatedTo += peer
             peersForReplication -= peer
             replicationFailed = false
@@ -1126,7 +1112,7 @@ private[spark] class BlockManager(
   }
 
   def stop(): Unit = {
-    blockTransferService.stop()
+    blockTransferService.close()
     diskBlockManager.stop()
     actorSystem.stop(slaveActor)
     blockInfo.clear()

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/storage/BlockNotFoundException.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockNotFoundException.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockNotFoundException.scala
index 9ef4536..81f5f2d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockNotFoundException.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockNotFoundException.scala
@@ -17,5 +17,4 @@
 
 package org.apache.spark.storage
 
-
 class BlockNotFoundException(blockId: String) extends Exception(s"Block 
$blockId not found")

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 71b276b..0d6f3bf 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -19,15 +19,13 @@ package org.apache.spark.storage
 
 import java.util.concurrent.LinkedBlockingQueue
 
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashSet
-import scala.collection.mutable.Queue
+import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
 
-import org.apache.spark.{TaskContext, Logging}
-import org.apache.spark.network.{ManagedBuffer, BlockFetchingListener, 
BlockTransferService}
+import org.apache.spark.{Logging, TaskContext}
+import org.apache.spark.network.{BlockFetchingListener, BlockTransferService}
+import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.Utils
-
+import org.apache.spark.util.{CompletionIterator, Utils}
 
 /**
  * An iterator that fetches multiple blocks. For local blocks, it fetches from 
the local block
@@ -88,17 +86,51 @@ final class ShuffleBlockFetcherIterator(
    */
   private[this] val results = new LinkedBlockingQueue[FetchResult]
 
-  // Queue of fetch requests to issue; we'll pull requests off this gradually 
to make sure that
-  // the number of bytes in flight is limited to maxBytesInFlight
+  /**
+   * Current [[FetchResult]] being processed. We track this so we can release 
the current buffer
+   * in case of a runtime exception when processing the current buffer.
+   */
+  private[this] var currentResult: FetchResult = null
+
+  /**
+   * Queue of fetch requests to issue; we'll pull requests off this gradually 
to make sure that
+   * the number of bytes in flight is limited to maxBytesInFlight.
+   */
   private[this] val fetchRequests = new Queue[FetchRequest]
 
-  // Current bytes in flight from our requests
+  /** Current bytes in flight from our requests */
   private[this] var bytesInFlight = 0L
 
   private[this] val shuffleMetrics = 
context.taskMetrics.createShuffleReadMetricsForDependency()
 
+  /**
+   * Whether the iterator is still active. If isZombie is true, the callback 
interface will no
+   * longer place fetched blocks into [[results]].
+   */
+  @volatile private[this] var isZombie = false
+
   initialize()
 
+  /**
+   * Mark the iterator as zombie, and release all buffers that haven't been 
deserialized yet.
+   */
+  private[this] def cleanup() {
+    isZombie = true
+    // Release the current buffer if necessary
+    if (currentResult != null && !currentResult.failed) {
+      currentResult.buf.release()
+    }
+
+    // Release buffers in the results queue
+    val iter = results.iterator()
+    while (iter.hasNext) {
+      val result = iter.next()
+      if (!result.failed) {
+        result.buf.release()
+      }
+    }
+  }
+
   private[this] def sendRequest(req: FetchRequest) {
     logDebug("Sending request for %d blocks (%s) from %s".format(
       req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
@@ -110,24 +142,23 @@ final class ShuffleBlockFetcherIterator(
 
     blockTransferService.fetchBlocks(req.address.host, req.address.port, 
blockIds,
       new BlockFetchingListener {
-        override def onBlockFetchSuccess(blockId: String, data: 
ManagedBuffer): Unit = {
-          results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
-            () => serializer.newInstance().deserializeStream(
-              blockManager.wrapForCompression(BlockId(blockId), 
data.inputStream())).asIterator
-          ))
-          shuffleMetrics.remoteBytesRead += data.size
-          shuffleMetrics.remoteBlocksFetched += 1
-          logDebug("Got remote block " + blockId + " after " + 
Utils.getUsedTimeMs(startTime))
+        override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): 
Unit = {
+          // Only add the buffer to results queue if the iterator is not 
zombie,
+          // i.e. cleanup() has not been called yet.
+          if (!isZombie) {
+            // Increment the ref count because we need to pass this to a 
different thread.
+            // This needs to be released after use.
+            buf.retain()
+            results.put(new FetchResult(BlockId(blockId), sizeMap(blockId), 
buf))
+            shuffleMetrics.remoteBytesRead += buf.size
+            shuffleMetrics.remoteBlocksFetched += 1
+          }
+          logTrace("Got remote block " + blockId + " after " + 
Utils.getUsedTimeMs(startTime))
         }
 
-        override def onBlockFetchFailure(e: Throwable): Unit = {
+        override def onBlockFetchFailure(blockId: String, e: Throwable): Unit 
= {
           logError(s"Failed to get block(s) from 
${req.address.host}:${req.address.port}", e)
-          // Note that there is a chance that some blocks have been fetched 
successfully, but we
-          // still add them to the failed queue. This is fine because when the 
caller see a
-          // FetchFailedException, it is going to fail the entire task anyway.
-          for ((blockId, size) <- req.blocks) {
-            results.put(new FetchResult(blockId, -1, null))
-          }
+          results.put(new FetchResult(BlockId(blockId), -1, null))
         }
       }
     )
@@ -138,7 +169,7 @@ final class ShuffleBlockFetcherIterator(
     // smaller than maxBytesInFlight is to allow multiple, parallel fetches 
from up to 5
     // nodes, rather than blocking on reading output from one node.
     val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
-    logInfo("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " 
+ targetRequestSize)
+    logDebug("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " 
+ targetRequestSize)
 
     // Split local and remote blocks. Remote blocks are further split into 
FetchRequests of size
     // at most maxBytesInFlight in order to limit the amount of data in flight.
@@ -185,26 +216,34 @@ final class ShuffleBlockFetcherIterator(
     remoteRequests
   }
 
+  /**
+   * Fetch the local blocks while we are fetching remote blocks. This is ok 
because
+   * [[ManagedBuffer]]'s memory is allocated lazily when we create the input 
stream, so all we
+   * track in-memory are the ManagedBuffer references themselves.
+   */
   private[this] def fetchLocalBlocks() {
-    // Get the local blocks while remote blocks are being fetched. Note that 
it's okay to do
-    // these all at once because they will just memory-map some files, so they 
won't consume
-    // any memory that might exceed our maxBytesInFlight
-    for (id <- localBlocks) {
+    val iter = localBlocks.iterator
+    while (iter.hasNext) {
+      val blockId = iter.next()
       try {
+        val buf = blockManager.getBlockData(blockId)
         shuffleMetrics.localBlocksFetched += 1
-        results.put(new FetchResult(
-          id, 0, () => blockManager.getLocalShuffleFromDisk(id, 
serializer).get))
-        logDebug("Got local block " + id)
+        buf.retain()
+        results.put(new FetchResult(blockId, 0, buf))
       } catch {
         case e: Exception =>
+          // If we see an exception, stop immediately.
           logError(s"Error occurred while fetching local blocks", e)
-          results.put(new FetchResult(id, -1, null))
+          results.put(new FetchResult(blockId, -1, null))
           return
       }
     }
   }
 
   private[this] def initialize(): Unit = {
+    // Add a task completion callback (called in both success case and failure 
case) to cleanup.
+    context.addTaskCompletionListener(_ => cleanup())
+
     // Split local and remote blocks.
     val remoteRequests = splitLocalRemoteBlocks()
     // Add the remote requests into our queue in a random order
@@ -229,7 +268,8 @@ final class ShuffleBlockFetcherIterator(
   override def next(): (BlockId, Option[Iterator[Any]]) = {
     numBlocksProcessed += 1
     val startFetchWait = System.currentTimeMillis()
-    val result = results.take()
+    currentResult = results.take()
+    val result = currentResult
     val stopFetchWait = System.currentTimeMillis()
     shuffleMetrics.fetchWaitTime += (stopFetchWait - startFetchWait)
     if (!result.failed) {
@@ -240,7 +280,21 @@ final class ShuffleBlockFetcherIterator(
       (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= 
maxBytesInFlight)) {
       sendRequest(fetchRequests.dequeue())
     }
-    (result.blockId, if (result.failed) None else Some(result.deserialize()))
+
+    val iteratorOpt: Option[Iterator[Any]] = if (result.failed) {
+      None
+    } else {
+      val is = blockManager.wrapForCompression(result.blockId, 
result.buf.createInputStream())
+      val iter = serializer.newInstance().deserializeStream(is).asIterator
+      Some(CompletionIterator[Any, Iterator[Any]](iter, {
+        // Once the iterator is exhausted, release the buffer and set 
currentResult to null
+        // so we don't release it again in cleanup.
+        currentResult = null
+        result.buf.release()
+      }))
+    }
+
+    (result.blockId, iteratorOpt)
   }
 }
 
@@ -254,7 +308,7 @@ object ShuffleBlockFetcherIterator {
    * @param blocks Sequence of tuple, where the first element is the block id,
    *               and the second element is the estimated size, used to 
calculate bytesInFlight.
    */
-  class FetchRequest(val address: BlockManagerId, val blocks: Seq[(BlockId, 
Long)]) {
+  case class FetchRequest(address: BlockManagerId, blocks: Seq[(BlockId, 
Long)]) {
     val size = blocks.map(_._2).sum
   }
 
@@ -262,10 +316,11 @@ object ShuffleBlockFetcherIterator {
    * Result of a fetch from a remote block. A failure is represented as size 
== -1.
    * @param blockId block id
    * @param size estimated size of the block, used to calculate bytesInFlight.
-   *             Note that this is NOT the exact bytes.
-   * @param deserialize closure to return the result in the form of an 
Iterator.
+   *             Note that this is NOT the exact bytes. -1 if failure is 
present.
+   * @param buf [[ManagedBuffer]] for the content. null is error.
    */
-  class FetchResult(val blockId: BlockId, val size: Long, val deserialize: () 
=> Iterator[Any]) {
+  case class FetchResult(blockId: BlockId, size: Long, buf: ManagedBuffer) {
     def failed: Boolean = size == -1
+    if (failed) assert(buf == null) else assert(buf != null)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1e881da..0daab91 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -43,7 +43,6 @@ import org.json4s._
 import tachyon.client.{TachyonFile,TachyonFS}
 
 import org.apache.spark._
-import org.apache.spark.util.SparkUncaughtExceptionHandler
 import org.apache.spark.serializer.{DeserializationStream, 
SerializationStream, SerializerInstance}
 
 /** CallSite represents a place in user code. It can have a short and a long 
form. */

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala 
b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
index d7b2d2e..840d827 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
@@ -24,10 +24,10 @@ class ShuffleNettySuite extends ShuffleSuite with 
BeforeAndAfterAll {
   // This test suite should run all tests in ShuffleSuite with Netty shuffle 
mode.
 
   override def beforeAll() {
-    System.setProperty("spark.shuffle.use.netty", "true")
+    System.setProperty("spark.shuffle.blockTransferService", "netty")
   }
 
   override def afterAll() {
-    System.clearProperty("spark.shuffle.use.netty")
+    System.clearProperty("spark.shuffle.blockTransferService")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/test/scala/org/apache/spark/network/netty/ServerClientIntegrationSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/network/netty/ServerClientIntegrationSuite.scala
 
b/core/src/test/scala/org/apache/spark/network/netty/ServerClientIntegrationSuite.scala
deleted file mode 100644
index 02d0ffc..0000000
--- 
a/core/src/test/scala/org/apache/spark/network/netty/ServerClientIntegrationSuite.scala
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty
-
-import java.io.{RandomAccessFile, File}
-import java.nio.ByteBuffer
-import java.util.{Collections, HashSet}
-import java.util.concurrent.{TimeUnit, Semaphore}
-
-import scala.collection.JavaConversions._
-
-import io.netty.buffer.{ByteBufUtil, Unpooled}
-
-import org.scalatest.{BeforeAndAfterAll, FunSuite}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.network.netty.client.{BlockClientListener, 
ReferenceCountedBuffer, BlockFetchingClientFactory}
-import org.apache.spark.network.netty.server.BlockServer
-import org.apache.spark.storage.{FileSegment, BlockDataProvider}
-
-
-/**
- * Test suite that makes sure the server and the client implementations share 
the same protocol.
- */
-class ServerClientIntegrationSuite extends FunSuite with BeforeAndAfterAll {
-
-  val bufSize = 100000
-  var buf: ByteBuffer = _
-  var testFile: File = _
-  var server: BlockServer = _
-  var clientFactory: BlockFetchingClientFactory = _
-
-  val bufferBlockId = "buffer_block"
-  val fileBlockId = "file_block"
-
-  val fileContent = new Array[Byte](1024)
-  scala.util.Random.nextBytes(fileContent)
-
-  override def beforeAll() = {
-    buf = ByteBuffer.allocate(bufSize)
-    for (i <- 1 to bufSize) {
-      buf.put(i.toByte)
-    }
-    buf.flip()
-
-    testFile = File.createTempFile("netty-test-file", "txt")
-    val fp = new RandomAccessFile(testFile, "rw")
-    fp.write(fileContent)
-    fp.close()
-
-    server = new BlockServer(new SparkConf, new BlockDataProvider {
-      override def getBlockData(blockId: String): Either[FileSegment, 
ByteBuffer] = {
-        if (blockId == bufferBlockId) {
-          Right(buf)
-        } else if (blockId == fileBlockId) {
-          Left(new FileSegment(testFile, 10, testFile.length - 25))
-        } else {
-          throw new Exception("Unknown block id " + blockId)
-        }
-      }
-    })
-
-    clientFactory = new BlockFetchingClientFactory(new SparkConf)
-  }
-
-  override def afterAll() = {
-    server.stop()
-    clientFactory.stop()
-  }
-
-  /** A ByteBuf for buffer_block */
-  lazy val byteBufferBlockReference = Unpooled.wrappedBuffer(buf)
-
-  /** A ByteBuf for file_block */
-  lazy val fileBlockReference = Unpooled.wrappedBuffer(fileContent, 10, 
fileContent.length - 25)
-
-  def fetchBlocks(blockIds: Seq[String]): (Set[String], 
Set[ReferenceCountedBuffer], Set[String]) =
-  {
-    val client = clientFactory.createClient(server.hostName, server.port)
-    val sem = new Semaphore(0)
-    val receivedBlockIds = Collections.synchronizedSet(new HashSet[String])
-    val errorBlockIds = Collections.synchronizedSet(new HashSet[String])
-    val receivedBuffers = Collections.synchronizedSet(new 
HashSet[ReferenceCountedBuffer])
-
-    client.fetchBlocks(
-      blockIds,
-      new BlockClientListener {
-        override def onFetchFailure(blockId: String, errorMsg: String): Unit = 
{
-          errorBlockIds.add(blockId)
-          sem.release()
-        }
-
-        override def onFetchSuccess(blockId: String, data: 
ReferenceCountedBuffer): Unit = {
-          receivedBlockIds.add(blockId)
-          data.retain()
-          receivedBuffers.add(data)
-          sem.release()
-        }
-      }
-    )
-    if (!sem.tryAcquire(blockIds.size, 30, TimeUnit.SECONDS)) {
-      fail("Timeout getting response from the server")
-    }
-    client.close()
-    (receivedBlockIds.toSet, receivedBuffers.toSet, errorBlockIds.toSet)
-  }
-
-  test("fetch a ByteBuffer block") {
-    val (blockIds, buffers, failBlockIds) = fetchBlocks(Seq(bufferBlockId))
-    assert(blockIds === Set(bufferBlockId))
-    assert(buffers.map(_.underlying) === Set(byteBufferBlockReference))
-    assert(failBlockIds.isEmpty)
-    buffers.foreach(_.release())
-  }
-
-  test("fetch a FileSegment block via zero-copy send") {
-    val (blockIds, buffers, failBlockIds) = fetchBlocks(Seq(fileBlockId))
-    assert(blockIds === Set(fileBlockId))
-    assert(buffers.map(_.underlying) === Set(fileBlockReference))
-    assert(failBlockIds.isEmpty)
-    buffers.foreach(_.release())
-  }
-
-  test("fetch a non-existent block") {
-    val (blockIds, buffers, failBlockIds) = fetchBlocks(Seq("random-block"))
-    assert(blockIds.isEmpty)
-    assert(buffers.isEmpty)
-    assert(failBlockIds === Set("random-block"))
-  }
-
-  test("fetch both ByteBuffer block and FileSegment block") {
-    val (blockIds, buffers, failBlockIds) = fetchBlocks(Seq(bufferBlockId, 
fileBlockId))
-    assert(blockIds === Set(bufferBlockId, fileBlockId))
-    assert(buffers.map(_.underlying) === Set(byteBufferBlockReference, 
fileBlockReference))
-    assert(failBlockIds.isEmpty)
-    buffers.foreach(_.release())
-  }
-
-  test("fetch both ByteBuffer block and a non-existent block") {
-    val (blockIds, buffers, failBlockIds) = fetchBlocks(Seq(bufferBlockId, 
"random-block"))
-    assert(blockIds === Set(bufferBlockId))
-    assert(buffers.map(_.underlying) === Set(byteBufferBlockReference))
-    assert(failBlockIds === Set("random-block"))
-    buffers.foreach(_.release())
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/test/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandlerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandlerSuite.scala
 
b/core/src/test/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandlerSuite.scala
deleted file mode 100644
index f629322..0000000
--- 
a/core/src/test/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandlerSuite.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.client
-
-import java.nio.ByteBuffer
-
-import com.google.common.base.Charsets.UTF_8
-import io.netty.buffer.Unpooled
-import io.netty.channel.embedded.EmbeddedChannel
-
-import org.scalatest.{PrivateMethodTester, FunSuite}
-
-
-class BlockFetchingClientHandlerSuite extends FunSuite with 
PrivateMethodTester {
-
-  test("handling block data (successful fetch)") {
-    val blockId = "test_block"
-    val blockData = "blahblahblahblahblah"
-    val totalLength = 4 + blockId.length + blockData.length
-
-    var parsedBlockId: String = ""
-    var parsedBlockData: String = ""
-    val handler = new BlockFetchingClientHandler
-    handler.addRequest(blockId,
-      new BlockClientListener {
-        override def onFetchFailure(blockId: String, errorMsg: String): Unit = 
???
-        override def onFetchSuccess(bid: String, refCntBuf: 
ReferenceCountedBuffer): Unit = {
-          parsedBlockId = bid
-          val bytes = new Array[Byte](refCntBuf.byteBuffer().remaining)
-          refCntBuf.byteBuffer().get(bytes)
-          parsedBlockData = new String(bytes, UTF_8)
-        }
-      }
-    )
-
-    val outstandingRequests = PrivateMethod[java.util.Map[_, 
_]]('outstandingRequests)
-    assert(handler.invokePrivate(outstandingRequests()).size === 1)
-
-    val channel = new EmbeddedChannel(handler)
-    val buf = ByteBuffer.allocate(totalLength + 4)  // 4 bytes for the length 
field itself
-    buf.putInt(totalLength)
-    buf.putInt(blockId.length)
-    buf.put(blockId.getBytes)
-    buf.put(blockData.getBytes)
-    buf.flip()
-
-    channel.writeInbound(Unpooled.wrappedBuffer(buf))
-    assert(parsedBlockId === blockId)
-    assert(parsedBlockData === blockData)
-
-    assert(handler.invokePrivate(outstandingRequests()).size === 0)
-
-    channel.close()
-  }
-
-  test("handling error message (failed fetch)") {
-    val blockId = "test_block"
-    val errorMsg = "error erro5r error err4or error3 error6 error erro1r"
-    val totalLength = 4 + blockId.length + errorMsg.length
-
-    var parsedBlockId: String = ""
-    var parsedErrorMsg: String = ""
-    val handler = new BlockFetchingClientHandler
-    handler.addRequest(blockId, new BlockClientListener {
-      override def onFetchFailure(bid: String, msg: String) ={
-        parsedBlockId = bid
-        parsedErrorMsg = msg
-      }
-      override def onFetchSuccess(bid: String, refCntBuf: 
ReferenceCountedBuffer) = ???
-    })
-
-    val outstandingRequests = PrivateMethod[java.util.Map[_, 
_]]('outstandingRequests)
-    assert(handler.invokePrivate(outstandingRequests()).size === 1)
-
-    val channel = new EmbeddedChannel(handler)
-    val buf = ByteBuffer.allocate(totalLength + 4)  // 4 bytes for the length 
field itself
-    buf.putInt(totalLength)
-    buf.putInt(-blockId.length)
-    buf.put(blockId.getBytes)
-    buf.put(errorMsg.getBytes)
-    buf.flip()
-
-    channel.writeInbound(Unpooled.wrappedBuffer(buf))
-    assert(parsedBlockId === blockId)
-    assert(parsedErrorMsg === errorMsg)
-
-    assert(handler.invokePrivate(outstandingRequests()).size === 0)
-
-    channel.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/test/scala/org/apache/spark/network/netty/server/BlockHeaderEncoderSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/network/netty/server/BlockHeaderEncoderSuite.scala
 
b/core/src/test/scala/org/apache/spark/network/netty/server/BlockHeaderEncoderSuite.scala
deleted file mode 100644
index 3f8d0cf..0000000
--- 
a/core/src/test/scala/org/apache/spark/network/netty/server/BlockHeaderEncoderSuite.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.server
-
-import com.google.common.base.Charsets.UTF_8
-import io.netty.buffer.ByteBuf
-import io.netty.channel.embedded.EmbeddedChannel
-
-import org.scalatest.FunSuite
-
-class BlockHeaderEncoderSuite extends FunSuite {
-
-  test("encode normal block data") {
-    val blockId = "test_block"
-    val channel = new EmbeddedChannel(new BlockHeaderEncoder)
-    channel.writeOutbound(new BlockHeader(17, blockId, None))
-    val out = channel.readOutbound().asInstanceOf[ByteBuf]
-    assert(out.readInt() === 4 + blockId.length + 17)
-    assert(out.readInt() === blockId.length)
-
-    val blockIdBytes = new Array[Byte](blockId.length)
-    out.readBytes(blockIdBytes)
-    assert(new String(blockIdBytes, UTF_8) === blockId)
-    assert(out.readableBytes() === 0)
-
-    channel.close()
-  }
-
-  test("encode error message") {
-    val blockId = "error_block"
-    val errorMsg = "error encountered"
-    val channel = new EmbeddedChannel(new BlockHeaderEncoder)
-    channel.writeOutbound(new BlockHeader(17, blockId, Some(errorMsg)))
-    val out = channel.readOutbound().asInstanceOf[ByteBuf]
-    assert(out.readInt() === 4 + blockId.length + errorMsg.length)
-    assert(out.readInt() === -blockId.length)
-
-    val blockIdBytes = new Array[Byte](blockId.length)
-    out.readBytes(blockIdBytes)
-    assert(new String(blockIdBytes, UTF_8) === blockId)
-
-    val errorMsgBytes = new Array[Byte](errorMsg.length)
-    out.readBytes(errorMsgBytes)
-    assert(new String(errorMsgBytes, UTF_8) === errorMsg)
-    assert(out.readableBytes() === 0)
-
-    channel.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/test/scala/org/apache/spark/network/netty/server/BlockServerHandlerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/network/netty/server/BlockServerHandlerSuite.scala
 
b/core/src/test/scala/org/apache/spark/network/netty/server/BlockServerHandlerSuite.scala
deleted file mode 100644
index 3239c71..0000000
--- 
a/core/src/test/scala/org/apache/spark/network/netty/server/BlockServerHandlerSuite.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.server
-
-import java.io.{RandomAccessFile, File}
-import java.nio.ByteBuffer
-
-import io.netty.buffer.{Unpooled, ByteBuf}
-import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler, 
DefaultFileRegion}
-import io.netty.channel.embedded.EmbeddedChannel
-
-import org.scalatest.FunSuite
-
-import org.apache.spark.storage.{BlockDataProvider, FileSegment}
-
-
-class BlockServerHandlerSuite extends FunSuite {
-
-  test("ByteBuffer block") {
-    val expectedBlockId = "test_bytebuffer_block"
-    val buf = ByteBuffer.allocate(10000)
-    for (i <- 1 to 10000) {
-      buf.put(i.toByte)
-    }
-    buf.flip()
-
-    val channel = new EmbeddedChannel(new BlockServerHandler(new 
BlockDataProvider {
-      override def getBlockData(blockId: String): Either[FileSegment, 
ByteBuffer] = Right(buf)
-    }))
-
-    channel.writeInbound(expectedBlockId)
-    assert(channel.outboundMessages().size === 2)
-
-    val out1 = channel.readOutbound().asInstanceOf[BlockHeader]
-    val out2 = channel.readOutbound().asInstanceOf[ByteBuf]
-
-    assert(out1.blockId === expectedBlockId)
-    assert(out1.blockSize === buf.remaining)
-    assert(out1.error === None)
-
-    assert(out2.equals(Unpooled.wrappedBuffer(buf)))
-
-    channel.close()
-  }
-
-  test("FileSegment block via zero-copy") {
-    val expectedBlockId = "test_file_block"
-
-    // Create random file data
-    val fileContent = new Array[Byte](1024)
-    scala.util.Random.nextBytes(fileContent)
-    val testFile = File.createTempFile("netty-test-file", "txt")
-    val fp = new RandomAccessFile(testFile, "rw")
-    fp.write(fileContent)
-    fp.close()
-
-    val channel = new EmbeddedChannel(new BlockServerHandler(new 
BlockDataProvider {
-      override def getBlockData(blockId: String): Either[FileSegment, 
ByteBuffer] = {
-        Left(new FileSegment(testFile, 15, testFile.length - 25))
-      }
-    }))
-
-    channel.writeInbound(expectedBlockId)
-    assert(channel.outboundMessages().size === 2)
-
-    val out1 = channel.readOutbound().asInstanceOf[BlockHeader]
-    val out2 = channel.readOutbound().asInstanceOf[DefaultFileRegion]
-
-    assert(out1.blockId === expectedBlockId)
-    assert(out1.blockSize === testFile.length - 25)
-    assert(out1.error === None)
-
-    assert(out2.count === testFile.length - 25)
-    assert(out2.position === 15)
-  }
-
-  test("pipeline exception propagation") {
-    val blockServerHandler = new BlockServerHandler(new BlockDataProvider {
-      override def getBlockData(blockId: String): Either[FileSegment, 
ByteBuffer] = ???
-    })
-    val exceptionHandler = new SimpleChannelInboundHandler[String]() {
-      override def channelRead0(ctx: ChannelHandlerContext, msg: String): Unit 
= {
-        throw new Exception("this is an error")
-      }
-    }
-
-    val channel = new EmbeddedChannel(exceptionHandler, blockServerHandler)
-    assert(channel.isOpen)
-    channel.writeInbound("a message to trigger the error")
-    assert(!channel.isOpen)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/test/scala/org/apache/spark/serializer/TestSerializer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/serializer/TestSerializer.scala 
b/core/src/test/scala/org/apache/spark/serializer/TestSerializer.scala
new file mode 100644
index 0000000..0ade1ba
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/serializer/TestSerializer.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{EOFException, OutputStream, InputStream}
+import java.nio.ByteBuffer
+
+import scala.reflect.ClassTag
+
+
+/**
+ * A serializer implementation that always return a single element in a 
deserialization stream.
+ */
+class TestSerializer extends Serializer {
+  override def newInstance() = new TestSerializerInstance
+}
+
+
+class TestSerializerInstance extends SerializerInstance {
+  override def serialize[T: ClassTag](t: T): ByteBuffer = ???
+
+  override def serializeStream(s: OutputStream): SerializationStream = ???
+
+  override def deserializeStream(s: InputStream) = new 
TestDeserializationStream
+
+  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = ???
+
+  override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: 
ClassLoader): T = ???
+}
+
+
+class TestDeserializationStream extends DeserializationStream {
+
+  private var count = 0
+
+  override def readObject[T: ClassTag](): T = {
+    count += 1
+    if (count == 2) {
+      throw new EOFException
+    }
+    new Object().asInstanceOf[T]
+  }
+
+  override def close(): Unit = {}
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
 
b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
index ba47fe5..6790388 100644
--- 
a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
@@ -25,7 +25,7 @@ import org.scalatest.FunSuite
 
 import org.apache.spark.{SparkEnv, SparkContext, LocalSparkContext, SparkConf}
 import org.apache.spark.executor.ShuffleWriteMetrics
-import org.apache.spark.network.{FileSegmentManagedBuffer, ManagedBuffer}
+import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, 
ManagedBuffer}
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.shuffle.FileShuffleBlockManager
 import org.apache.spark.storage.{ShuffleBlockId, FileSegment}
@@ -36,9 +36,9 @@ class HashShuffleManagerSuite extends FunSuite with 
LocalSparkContext {
   private def checkSegments(expected: FileSegment, buffer: ManagedBuffer) {
     assert(buffer.isInstanceOf[FileSegmentManagedBuffer])
     val segment = buffer.asInstanceOf[FileSegmentManagedBuffer]
-    assert(expected.file.getCanonicalPath === segment.file.getCanonicalPath)
-    assert(expected.offset === segment.offset)
-    assert(expected.length === segment.length)
+    assert(expected.file.getCanonicalPath === segment.getFile.getCanonicalPath)
+    assert(expected.offset === segment.getOffset)
+    assert(expected.length === segment.getLength)
   }
 
   test("consolidated shuffle can write to shuffle group without messing 
existing offsets/lengths") {

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
 
b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index a8c049d..4e502cf 100644
--- 
a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.storage
 
+import java.util.concurrent.Semaphore
+
+import scala.concurrent.future
+import scala.concurrent.ExecutionContext.Implicits.global
 import org.apache.spark.{TaskContextImpl, TaskContext}
 import org.apache.spark.network.{BlockFetchingListener, BlockTransferService}
 
@@ -27,38 +31,64 @@ import org.mockito.stubbing.Answer
 
 import org.scalatest.FunSuite
 
+import org.apache.spark.{SparkConf, TaskContext}
+import org.apache.spark.network._
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.serializer.TestSerializer
+
 
 class ShuffleBlockFetcherIteratorSuite extends FunSuite {
+  // Some of the tests are quite tricky because we are testing the cleanup 
behavior
+  // in the presence of faults.
 
-  test("handle local read failures in BlockManager") {
+  /** Creates a mock [[BlockTransferService]] that returns data from the given 
map. */
+  private def createMockTransfer(data: Map[BlockId, ManagedBuffer]): 
BlockTransferService = {
     val transfer = mock(classOf[BlockTransferService])
-    val blockManager = mock(classOf[BlockManager])
-    doReturn(BlockManagerId("test-client", "test-client", 
1)).when(blockManager).blockManagerId
-
-    val blIds = Array[BlockId](
-      ShuffleBlockId(0,0,0),
-      ShuffleBlockId(0,1,0),
-      ShuffleBlockId(0,2,0),
-      ShuffleBlockId(0,3,0),
-      ShuffleBlockId(0,4,0))
-
-    val optItr = mock(classOf[Option[Iterator[Any]]])
-    val answer = new Answer[Option[Iterator[Any]]] {
-      override def answer(invocation: InvocationOnMock) = 
Option[Iterator[Any]] {
-        throw new Exception
+    when(transfer.fetchBlocks(any(), any(), any(), any())).thenAnswer(new 
Answer[Unit] {
+      override def answer(invocation: InvocationOnMock): Unit = {
+        val blocks = invocation.getArguments()(2).asInstanceOf[Seq[String]]
+        val listener = 
invocation.getArguments()(3).asInstanceOf[BlockFetchingListener]
+
+        for (blockId <- blocks) {
+          if (data.contains(BlockId(blockId))) {
+            listener.onBlockFetchSuccess(blockId, data(BlockId(blockId)))
+          } else {
+            listener.onBlockFetchFailure(blockId, new 
BlockNotFoundException(blockId))
+          }
+        }
       }
+    })
+    transfer
+  }
+
+  private val conf = new SparkConf
+
+  test("successful 3 local reads + 2 remote reads") {
+    val blockManager = mock(classOf[BlockManager])
+    val localBmId = BlockManagerId("test-client", "test-client", 1)
+    doReturn(localBmId).when(blockManager).blockManagerId
+
+    // Make sure blockManager.getBlockData would return the blocks
+    val localBlocks = Map[BlockId, ManagedBuffer](
+      ShuffleBlockId(0, 0, 0) -> mock(classOf[ManagedBuffer]),
+      ShuffleBlockId(0, 1, 0) -> mock(classOf[ManagedBuffer]),
+      ShuffleBlockId(0, 2, 0) -> mock(classOf[ManagedBuffer]))
+    localBlocks.foreach { case (blockId, buf) =>
+      doReturn(buf).when(blockManager).getBlockData(meq(blockId))
     }
 
-    // 3rd block is going to fail
-    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(0)), 
any())
-    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(1)), 
any())
-    doAnswer(answer).when(blockManager).getLocalShuffleFromDisk(meq(blIds(2)), 
any())
-    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(3)), 
any())
-    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(4)), 
any())
+    // Make sure remote blocks would return
+    val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+    val remoteBlocks = Map[BlockId, ManagedBuffer](
+      ShuffleBlockId(0, 3, 0) -> mock(classOf[ManagedBuffer]),
+      ShuffleBlockId(0, 4, 0) -> mock(classOf[ManagedBuffer])
+    )
+
+    val transfer = createMockTransfer(remoteBlocks)
 
-    val bmId = BlockManagerId("test-client", "test-client", 1)
     val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
-      (bmId, blIds.map(blId => (blId, 1.asInstanceOf[Long])).toSeq)
+      (localBmId, localBlocks.keys.map(blockId => (blockId, 
1.asInstanceOf[Long])).toSeq),
+      (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 
1.asInstanceOf[Long])).toSeq)
     )
 
     val iterator = new ShuffleBlockFetcherIterator(
@@ -66,118 +96,145 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
       transfer,
       blockManager,
       blocksByAddress,
-      null,
+      new TestSerializer,
       48 * 1024 * 1024)
 
-    // Without exhausting the iterator, the iterator should be lazy and not 
call
-    // getLocalShuffleFromDisk.
-    verify(blockManager, times(0)).getLocalShuffleFromDisk(any(), any())
-
-    assert(iterator.hasNext, "iterator should have 5 elements but actually has 
no elements")
-    // the 2nd element of the tuple returned by iterator.next should be 
defined when
-    // fetching successfully
-    assert(iterator.next()._2.isDefined,
-      "1st element should be defined but is not actually defined")
-    verify(blockManager, times(1)).getLocalShuffleFromDisk(any(), any())
-
-    assert(iterator.hasNext, "iterator should have 5 elements but actually has 
1 element")
-    assert(iterator.next()._2.isDefined,
-      "2nd element should be defined but is not actually defined")
-    verify(blockManager, times(2)).getLocalShuffleFromDisk(any(), any())
-
-    assert(iterator.hasNext, "iterator should have 5 elements but actually has 
2 elements")
-    // 3rd fetch should be failed
-    intercept[Exception] {
-      iterator.next()
+    // 3 local blocks fetched in initialization
+    verify(blockManager, times(3)).getBlockData(any())
+
+    for (i <- 0 until 5) {
+      assert(iterator.hasNext, s"iterator should have 5 elements but actually 
has $i elements")
+      val (blockId, subIterator) = iterator.next()
+      assert(subIterator.isDefined,
+        s"iterator should have 5 elements defined but actually has $i 
elements")
+
+      // Make sure we release the buffer once the iterator is exhausted.
+      val mockBuf = localBlocks.getOrElse(blockId, remoteBlocks(blockId))
+      verify(mockBuf, times(0)).release()
+      subIterator.get.foreach(_ => Unit)  // exhaust the iterator
+      verify(mockBuf, times(1)).release()
     }
-    verify(blockManager, times(3)).getLocalShuffleFromDisk(any(), any())
+
+    // 3 local blocks, and 2 remote blocks
+    // (but from the same block manager so one call to fetchBlocks)
+    verify(blockManager, times(3)).getBlockData(any())
+    verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any())
   }
 
-  test("handle local read successes") {
-    val transfer = mock(classOf[BlockTransferService])
+  test("release current unexhausted buffer in case the task completes early") {
     val blockManager = mock(classOf[BlockManager])
-    doReturn(BlockManagerId("test-client", "test-client", 
1)).when(blockManager).blockManagerId
-
-    val blIds = Array[BlockId](
-      ShuffleBlockId(0,0,0),
-      ShuffleBlockId(0,1,0),
-      ShuffleBlockId(0,2,0),
-      ShuffleBlockId(0,3,0),
-      ShuffleBlockId(0,4,0))
+    val localBmId = BlockManagerId("test-client", "test-client", 1)
+    doReturn(localBmId).when(blockManager).blockManagerId
+
+    // Make sure remote blocks would return
+    val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+    val blocks = Map[BlockId, ManagedBuffer](
+      ShuffleBlockId(0, 0, 0) -> mock(classOf[ManagedBuffer]),
+      ShuffleBlockId(0, 1, 0) -> mock(classOf[ManagedBuffer]),
+      ShuffleBlockId(0, 2, 0) -> mock(classOf[ManagedBuffer])
+    )
 
-    val optItr = mock(classOf[Option[Iterator[Any]]])
+    // Semaphore to coordinate event sequence in two different threads.
+    val sem = new Semaphore(0)
 
-    // All blocks should be fetched successfully
-    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(0)), 
any())
-    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(1)), 
any())
-    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(2)), 
any())
-    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(3)), 
any())
-    doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(4)), 
any())
+    val transfer = mock(classOf[BlockTransferService])
+    when(transfer.fetchBlocks(any(), any(), any(), any())).thenAnswer(new 
Answer[Unit] {
+      override def answer(invocation: InvocationOnMock): Unit = {
+        val listener = 
invocation.getArguments()(3).asInstanceOf[BlockFetchingListener]
+        future {
+          // Return the first two blocks, and wait till task completion before 
returning the 3rd one
+          listener.onBlockFetchSuccess(
+            ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
+          listener.onBlockFetchSuccess(
+            ShuffleBlockId(0, 1, 0).toString, blocks(ShuffleBlockId(0, 1, 0)))
+          sem.acquire()
+          listener.onBlockFetchSuccess(
+            ShuffleBlockId(0, 2, 0).toString, blocks(ShuffleBlockId(0, 2, 0)))
+        }
+      }
+    })
 
-    val bmId = BlockManagerId("test-client", "test-client", 1)
     val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
-      (bmId, blIds.map(blId => (blId, 1.asInstanceOf[Long])).toSeq)
-    )
+      (remoteBmId, blocks.keys.map(blockId => (blockId, 
1.asInstanceOf[Long])).toSeq))
 
+    val taskContext = new TaskContextImpl(0, 0, 0)
     val iterator = new ShuffleBlockFetcherIterator(
-      new TaskContextImpl(0, 0, 0),
+      taskContext,
       transfer,
       blockManager,
       blocksByAddress,
-      null,
+      new TestSerializer,
       48 * 1024 * 1024)
 
-    // Without exhausting the iterator, the iterator should be lazy and not 
call getLocalShuffleFromDisk.
-    verify(blockManager, times(0)).getLocalShuffleFromDisk(any(), any())
-
-    assert(iterator.hasNext, "iterator should have 5 elements but actually has 
no elements")
-    assert(iterator.next()._2.isDefined,
-      "All elements should be defined but 1st element is not actually defined")
-    assert(iterator.hasNext, "iterator should have 5 elements but actually has 
1 element")
-    assert(iterator.next()._2.isDefined,
-      "All elements should be defined but 2nd element is not actually defined")
-    assert(iterator.hasNext, "iterator should have 5 elements but actually has 
2 elements")
-    assert(iterator.next()._2.isDefined,
-      "All elements should be defined but 3rd element is not actually defined")
-    assert(iterator.hasNext, "iterator should have 5 elements but actually has 
3 elements")
-    assert(iterator.next()._2.isDefined,
-      "All elements should be defined but 4th element is not actually defined")
-    assert(iterator.hasNext, "iterator should have 5 elements but actually has 
4 elements")
-    assert(iterator.next()._2.isDefined,
-      "All elements should be defined but 5th element is not actually defined")
-
-    verify(blockManager, times(5)).getLocalShuffleFromDisk(any(), any())
+    // Exhaust the first block, and then it should be released.
+    iterator.next()._2.get.foreach(_ => Unit)
+    verify(blocks(ShuffleBlockId(0, 0, 0)), times(1)).release()
+
+    // Get the 2nd block but do not exhaust the iterator
+    val subIter = iterator.next()._2.get
+
+    // Complete the task; then the 2nd block buffer should be exhausted
+    verify(blocks(ShuffleBlockId(0, 1, 0)), times(0)).release()
+    taskContext.markTaskCompleted()
+    verify(blocks(ShuffleBlockId(0, 1, 0)), times(1)).release()
+
+    // The 3rd block should not be retained because the iterator is already in 
zombie state
+    sem.release()
+    verify(blocks(ShuffleBlockId(0, 2, 0)), times(0)).retain()
+    verify(blocks(ShuffleBlockId(0, 2, 0)), times(0)).release()
   }
 
-  test("handle remote fetch failures in BlockTransferService") {
+  test("fail all blocks if any of the remote request fails") {
+    val blockManager = mock(classOf[BlockManager])
+    val localBmId = BlockManagerId("test-client", "test-client", 1)
+    doReturn(localBmId).when(blockManager).blockManagerId
+
+    // Make sure remote blocks would return
+    val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+    val blocks = Map[BlockId, ManagedBuffer](
+      ShuffleBlockId(0, 0, 0) -> mock(classOf[ManagedBuffer]),
+      ShuffleBlockId(0, 1, 0) -> mock(classOf[ManagedBuffer]),
+      ShuffleBlockId(0, 2, 0) -> mock(classOf[ManagedBuffer])
+    )
+
+    // Semaphore to coordinate event sequence in two different threads.
+    val sem = new Semaphore(0)
+
     val transfer = mock(classOf[BlockTransferService])
     when(transfer.fetchBlocks(any(), any(), any(), any())).thenAnswer(new 
Answer[Unit] {
       override def answer(invocation: InvocationOnMock): Unit = {
         val listener = 
invocation.getArguments()(3).asInstanceOf[BlockFetchingListener]
-        listener.onBlockFetchFailure(new Exception("blah"))
+        future {
+          // Return the first block, and then fail.
+          listener.onBlockFetchSuccess(
+            ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
+          listener.onBlockFetchFailure(
+            ShuffleBlockId(0, 1, 0).toString, new 
BlockNotFoundException("blah"))
+          listener.onBlockFetchFailure(
+            ShuffleBlockId(0, 2, 0).toString, new 
BlockNotFoundException("blah"))
+          sem.release()
+        }
       }
     })
 
-    val blockManager = mock(classOf[BlockManager])
-
-    when(blockManager.blockManagerId).thenReturn(BlockManagerId("test-client", 
"test-client", 1))
-
-    val blId1 = ShuffleBlockId(0, 0, 0)
-    val blId2 = ShuffleBlockId(0, 1, 0)
-    val bmId = BlockManagerId("test-server", "test-server", 1)
     val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
-      (bmId, Seq((blId1, 1L), (blId2, 1L))))
+      (remoteBmId, blocks.keys.map(blockId => (blockId, 
1.asInstanceOf[Long])).toSeq))
 
+    val taskContext = new TaskContextImpl(0, 0, 0)
     val iterator = new ShuffleBlockFetcherIterator(
-      new TaskContextImpl(0, 0, 0),
+      taskContext,
       transfer,
       blockManager,
       blocksByAddress,
-      null,
+      new TestSerializer,
       48 * 1024 * 1024)
 
-    iterator.foreach { case (_, iterOption) =>
-      assert(!iterOption.isDefined)
-    }
+    // Continue only after the mock calls onBlockFetchFailure
+    sem.acquire()
+
+    // The first block should be defined, and the last two are not defined 
(due to failure)
+    assert(iterator.next()._2.isDefined === true)
+    assert(iterator.next()._2.isDefined === false)
+    assert(iterator.next()._2.isDefined === false)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/network/common/pom.xml
----------------------------------------------------------------------
diff --git a/network/common/pom.xml b/network/common/pom.xml
new file mode 100644
index 0000000..e3b7e32
--- /dev/null
+++ b/network/common/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.2.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>network</artifactId>
+  <packaging>jar</packaging>
+  <name>Shuffle Streaming Service</name>
+  <url>http://spark.apache.org/</url>
+  <properties>
+    <sbt.project.name>network</sbt.project.name>
+  </properties>
+
+  <dependencies>
+    <!-- Core dependencies -->
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <!-- Provided dependencies -->
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+
+  <build>
+    <outputDirectory>target/java/classes</outputDirectory>
+    <testOutputDirectory>target/java/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.17</version>
+        <configuration>
+          <skipTests>false</skipTests>
+          <includes>
+            <include>**/Test*.java</include>
+            <include>**/*Test.java</include>
+            <include>**/*Suite.java</include>
+          </includes>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/network/common/src/main/java/org/apache/spark/network/TransportContext.java
----------------------------------------------------------------------
diff --git 
a/network/common/src/main/java/org/apache/spark/network/TransportContext.java 
b/network/common/src/main/java/org/apache/spark/network/TransportContext.java
new file mode 100644
index 0000000..854aa66
--- /dev/null
+++ 
b/network/common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network;
+
+import io.netty.channel.Channel;
+import io.netty.channel.socket.SocketChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.client.TransportClientFactory;
+import org.apache.spark.network.client.TransportResponseHandler;
+import org.apache.spark.network.protocol.MessageDecoder;
+import org.apache.spark.network.protocol.MessageEncoder;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.TransportChannelHandler;
+import org.apache.spark.network.server.TransportRequestHandler;
+import org.apache.spark.network.server.TransportServer;
+import org.apache.spark.network.server.StreamManager;
+import org.apache.spark.network.util.NettyUtils;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * Contains the context to create a {@link TransportServer}, {@link 
TransportClientFactory}, and to
+ * setup Netty Channel pipelines with a {@link 
org.apache.spark.network.server.TransportChannelHandler}.
+ *
+ * There are two communication protocols that the TransportClient provides, 
control-plane RPCs and
+ * data-plane "chunk fetching". The handling of the RPCs is performed outside 
of the scope of the
+ * TransportContext (i.e., by a user-provided handler), and it is responsible 
for setting up streams
+ * which can be streamed through the data plane in chunks using zero-copy IO.
+ *
+ * The TransportServer and TransportClientFactory both create a 
TransportChannelHandler for each
+ * channel. As each TransportChannelHandler contains a TransportClient, this 
enables server
+ * processes to send messages back to the client on an existing channel.
+ */
+public class TransportContext {
+  private final Logger logger = 
LoggerFactory.getLogger(TransportContext.class);
+
+  private final TransportConf conf;
+  private final StreamManager streamManager;
+  private final RpcHandler rpcHandler;
+
+  private final MessageEncoder encoder;
+  private final MessageDecoder decoder;
+
+  public TransportContext(TransportConf conf, StreamManager streamManager, 
RpcHandler rpcHandler) {
+    this.conf = conf;
+    this.streamManager = streamManager;
+    this.rpcHandler = rpcHandler;
+    this.encoder = new MessageEncoder();
+    this.decoder = new MessageDecoder();
+  }
+
+  public TransportClientFactory createClientFactory() {
+    return new TransportClientFactory(this);
+  }
+
+  public TransportServer createServer() {
+    return new TransportServer(this);
+  }
+
+  /**
+   * Initializes a client or server Netty Channel Pipeline which 
encodes/decodes messages and
+   * has a {@link org.apache.spark.network.server.TransportChannelHandler} to 
handle request or
+   * response messages.
+   *
+   * @return Returns the created TransportChannelHandler, which includes a 
TransportClient that can
+   * be used to communicate on this channel. The TransportClient is directly 
associated with a
+   * ChannelHandler to ensure all users of the same channel get the same 
TransportClient object.
+   */
+  public TransportChannelHandler initializePipeline(SocketChannel channel) {
+    try {
+      TransportChannelHandler channelHandler = createChannelHandler(channel);
+      channel.pipeline()
+        .addLast("encoder", encoder)
+        .addLast("frameDecoder", NettyUtils.createFrameDecoder())
+        .addLast("decoder", decoder)
+        // NOTE: Chunks are currently guaranteed to be returned in the order 
of request, but this
+        // would require more logic to guarantee if this were not part of the 
same event loop.
+        .addLast("handler", channelHandler);
+      return channelHandler;
+    } catch (RuntimeException e) {
+      logger.error("Error while initializing Netty pipeline", e);
+      throw e;
+    }
+  }
+
+  /**
+   * Creates the server- and client-side handler which is used to handle both 
RequestMessages and
+   * ResponseMessages. The channel is expected to have been successfully 
created, though certain
+   * properties (such as the remoteAddress()) may not be available yet.
+   */
+  private TransportChannelHandler createChannelHandler(Channel channel) {
+    TransportResponseHandler responseHandler = new 
TransportResponseHandler(channel);
+    TransportClient client = new TransportClient(channel, responseHandler);
+    TransportRequestHandler requestHandler = new 
TransportRequestHandler(channel, client,
+      streamManager, rpcHandler);
+    return new TransportChannelHandler(client, responseHandler, 
requestHandler);
+  }
+
+  public TransportConf getConf() { return conf; }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
----------------------------------------------------------------------
diff --git 
a/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
 
b/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
new file mode 100644
index 0000000..89ed79b
--- /dev/null
+++ 
b/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.buffer;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import com.google.common.base.Objects;
+import com.google.common.io.ByteStreams;
+import io.netty.channel.DefaultFileRegion;
+
+import org.apache.spark.network.util.JavaUtils;
+
+/**
+ * A {@link ManagedBuffer} backed by a segment in a file.
+ */
+public final class FileSegmentManagedBuffer extends ManagedBuffer {
+
+  /**
+   * Memory mapping is expensive and can destabilize the JVM (SPARK-1145, 
SPARK-3889).
+   * Avoid unless there's a good reason not to.
+   */
+  // TODO: Make this configurable
+  private static final long MIN_MEMORY_MAP_BYTES = 2 * 1024 * 1024;
+
+  private final File file;
+  private final long offset;
+  private final long length;
+
+  public FileSegmentManagedBuffer(File file, long offset, long length) {
+    this.file = file;
+    this.offset = offset;
+    this.length = length;
+  }
+
+  @Override
+  public long size() {
+    return length;
+  }
+
+  @Override
+  public ByteBuffer nioByteBuffer() throws IOException {
+    FileChannel channel = null;
+    try {
+      channel = new RandomAccessFile(file, "r").getChannel();
+      // Just copy the buffer if it's sufficiently small, as memory mapping 
has a high overhead.
+      if (length < MIN_MEMORY_MAP_BYTES) {
+        ByteBuffer buf = ByteBuffer.allocate((int) length);
+        channel.position(offset);
+        while (buf.remaining() != 0) {
+          if (channel.read(buf) == -1) {
+            throw new IOException(String.format("Reached EOF before filling 
buffer\n" +
+              "offset=%s\nfile=%s\nbuf.remaining=%s",
+              offset, file.getAbsoluteFile(), buf.remaining()));
+          }
+        }
+        buf.flip();
+        return buf;
+      } else {
+        return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
+      }
+    } catch (IOException e) {
+      try {
+        if (channel != null) {
+          long size = channel.size();
+          throw new IOException("Error in reading " + this + " (actual file 
length " + size + ")",
+            e);
+        }
+      } catch (IOException ignored) {
+        // ignore
+      }
+      throw new IOException("Error in opening " + this, e);
+    } finally {
+      JavaUtils.closeQuietly(channel);
+    }
+  }
+
+  @Override
+  public InputStream createInputStream() throws IOException {
+    FileInputStream is = null;
+    try {
+      is = new FileInputStream(file);
+      ByteStreams.skipFully(is, offset);
+      return ByteStreams.limit(is, length);
+    } catch (IOException e) {
+      try {
+        if (is != null) {
+          long size = file.length();
+          throw new IOException("Error in reading " + this + " (actual file 
length " + size + ")",
+              e);
+        }
+      } catch (IOException ignored) {
+        // ignore
+      } finally {
+        JavaUtils.closeQuietly(is);
+      }
+      throw new IOException("Error in opening " + this, e);
+    } catch (RuntimeException e) {
+      JavaUtils.closeQuietly(is);
+      throw e;
+    }
+  }
+
+  @Override
+  public ManagedBuffer retain() {
+    return this;
+  }
+
+  @Override
+  public ManagedBuffer release() {
+    return this;
+  }
+
+  @Override
+  public Object convertToNetty() throws IOException {
+    FileChannel fileChannel = new FileInputStream(file).getChannel();
+    return new DefaultFileRegion(fileChannel, offset, length);
+  }
+
+  public File getFile() { return file; }
+
+  public long getOffset() { return offset; }
+
+  public long getLength() { return length; }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("file", file)
+      .add("offset", offset)
+      .add("length", length)
+      .toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/network/common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
----------------------------------------------------------------------
diff --git 
a/network/common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
 
b/network/common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
new file mode 100644
index 0000000..a415db5
--- /dev/null
+++ 
b/network/common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * This abstract class provides an immutable view for data in the form of bytes. 
The implementation
+ * should specify how the data is provided:
+ *
+ * - {@link FileSegmentManagedBuffer}: data backed by part of a file
+ * - {@link NioManagedBuffer}: data backed by a NIO ByteBuffer
+ * - {@link NettyManagedBuffer}: data backed by a Netty ByteBuf
+ *
+ * The concrete buffer implementation might be managed outside the JVM garbage 
collector.
+ * For example, in the case of {@link NettyManagedBuffer}, the buffers are 
reference counted.
+ * In that case, if the buffer is going to be passed around to a different 
thread, retain/release
+ * should be called.
+ */
+public abstract class ManagedBuffer {
+
+  /** Number of bytes of the data. */
+  public abstract long size();
+
+  /**
+   * Exposes this buffer's data as an NIO ByteBuffer. Changing the position 
and limit of the
+   * returned ByteBuffer should not affect the content of this buffer.
+   */
+  // TODO: Deprecate this, usage may require expensive memory mapping or 
allocation.
+  public abstract ByteBuffer nioByteBuffer() throws IOException;
+
+  /**
+   * Exposes this buffer's data as an InputStream. The underlying 
implementation does not
+   * necessarily check for the length of bytes read, so the caller is 
responsible for making sure
+   * it does not go over the limit.
+   */
+  public abstract InputStream createInputStream() throws IOException;
+
+  /**
+   * Increment the reference count by one if applicable.
+   */
+  public abstract ManagedBuffer retain();
+
+  /**
+   * If applicable, decrement the reference count by one and deallocate the 
buffer if the
+   * reference count reaches zero.
+   */
+  public abstract ManagedBuffer release();
+
+  /**
+   * Convert the buffer into a Netty object, used to write the data out.
+   */
+  public abstract Object convertToNetty() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/network/common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java
----------------------------------------------------------------------
diff --git 
a/network/common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java
 
b/network/common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java
new file mode 100644
index 0000000..c806bfa
--- /dev/null
+++ 
b/network/common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+
+/**
+ * A {@link ManagedBuffer} backed by a Netty {@link ByteBuf}.
+ */
+public final class NettyManagedBuffer extends ManagedBuffer {
+  private final ByteBuf buf;
+
+  public NettyManagedBuffer(ByteBuf buf) {
+    this.buf = buf;
+  }
+
+  @Override
+  public long size() {
+    return buf.readableBytes();
+  }
+
+  @Override
+  public ByteBuffer nioByteBuffer() throws IOException {
+    return buf.nioBuffer();
+  }
+
+  @Override
+  public InputStream createInputStream() throws IOException {
+    return new ByteBufInputStream(buf);
+  }
+
+  @Override
+  public ManagedBuffer retain() {
+    buf.retain();
+    return this;
+  }
+
+  @Override
+  public ManagedBuffer release() {
+    buf.release();
+    return this;
+  }
+
+  @Override
+  public Object convertToNetty() throws IOException {
+    return buf.duplicate();
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("buf", buf)
+      .toString();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to