Repository: spark
Updated Branches:
  refs/heads/master bd7b91cef -> e41acb757


[SPARK-13992] Add support for off-heap caching

This patch adds support for caching blocks in the executor processes using 
direct / off-heap memory.

## User-facing changes

**Updated semantics of `OFF_HEAP` storage level**: In Spark 1.x, the `OFF_HEAP` 
storage level indicated that an RDD should be cached in Tachyon. Spark 2.x 
removed the external block store API that Tachyon caching was based on (see 
#10752 / SPARK-12667), so `OFF_HEAP` became an alias for `MEMORY_ONLY_SER`. As 
of this patch, `OFF_HEAP` means "serialized and cached in off-heap memory or on 
disk". Via the `StorageLevel` constructor, `useOffHeap` can be set if 
`serialized == true` and can be used to construct custom storage levels which 
support replication.

**Storage UI reporting**: the storage UI will now report whether in-memory 
blocks are stored on- or off-heap.

**Only supported by UnifiedMemoryManager**: for simplicity, this feature is 
only supported when the default UnifiedMemoryManager is used; applications 
which use the legacy memory manager (`spark.memory.useLegacyMode=true`) are not 
currently able to allocate off-heap storage memory, so using off-heap caching 
will fail with an error when legacy memory management is enabled. Given that we 
plan to eventually remove the legacy memory manager, this is not a significant 
restriction.

**Memory management policies:** the policies for dividing available memory 
between execution and storage are the same for both on- and off-heap memory. 
For off-heap memory, the total amount of memory available for use by Spark is 
controlled by `spark.memory.offHeap.size`, which is an absolute size. Off-heap 
storage memory obeys `spark.memory.storageFraction` in order to control the 
amount of unevictable storage memory. For example, if 
`spark.memory.offHeap.size` is 1 gigabyte and Spark uses the default 
`storageFraction` of 0.5, then up to 500 megabytes of off-heap cached blocks 
will be protected from eviction due to execution memory pressure. If necessary, 
we can split `spark.memory.storageFraction` into separate on- and off-heap 
configurations, but this doesn't seem necessary now and can be done later 
without any breaking changes.

**Use of off-heap memory does not imply use of off-heap execution (or 
vice-versa)**: for now, the settings controlling the use of off-heap execution 
memory (`spark.memory.offHeap.enabled`) and off-heap caching are completely 
independent, so Spark SQL can be configured to use off-heap memory for 
execution while continuing to cache blocks on-heap. If desired, we can change 
this in a followup patch so that `spark.memory.offHeap.enabled` affect the 
default storage level for cached SQL tables.

## Internal changes

- Rename `ByteArrayChunkOutputStream` to `ChunkedByteBufferOutputStream`
  - It now returns a `ChunkedByteBuffer` instead of an array of byte arrays.
  - Its constructor now accept an `allocator` function which is called to 
allocate `ByteBuffer`s. This allows us to control whether it allocates regular 
ByteBuffers or off-heap DirectByteBuffers.
  - Because block serialization is now performed during the unroll process, a 
`ChunkedByteBufferOutputStream` which is configured with a `DirectByteBuffer` 
allocator will use off-heap memory for both unroll and storage memory.
- The `MemoryStore`'s MemoryEntries now tracks whether blocks are stored on- or 
off-heap.
  - `evictBlocksToFreeSpace()` now accepts a `MemoryMode` parameter so that we 
don't try to evict off-heap blocks in response to on-heap memory pressure (or 
vice-versa).
- Make sure that off-heap buffers are properly de-allocated during MemoryStore 
eviction.
- The JVM limits the total size of allocated direct byte buffers using the 
`-XX:MaxDirectMemorySize` flag and the default tends to be fairly low (< 512 
megabytes in some JVMs). To work around this limitation, this patch adds a 
custom DirectByteBuffer allocator which ignores this memory limit.

Author: Josh Rosen <[email protected]>

Closes #11805 from JoshRosen/off-heap-caching.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e41acb75
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e41acb75
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e41acb75

Branch: refs/heads/master
Commit: e41acb757327e3226ffe312766ec759c16616588
Parents: bd7b91c
Author: Josh Rosen <[email protected]>
Authored: Fri Apr 1 14:34:59 2016 -0700
Committer: Josh Rosen <[email protected]>
Committed: Fri Apr 1 14:34:59 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/spark/unsafe/Platform.java  |  32 ++++
 .../spark/broadcast/TorrentBroadcast.scala      |   8 +-
 .../apache/spark/memory/StorageMemoryPool.scala |  22 ++-
 .../scala/org/apache/spark/scheduler/Task.scala |   5 +-
 .../spark/serializer/SerializerManager.scala    |  16 +-
 .../org/apache/spark/storage/BlockManager.scala |  70 ++++++---
 .../storage/BlockManagerMasterEndpoint.scala    |   2 +-
 .../org/apache/spark/storage/StorageLevel.scala |  21 ++-
 .../spark/storage/memory/MemoryStore.scala      | 153 +++++++++++++------
 .../util/io/ByteArrayChunkOutputStream.scala    |  99 ------------
 .../spark/util/io/ChunkedByteBuffer.scala       |  14 +-
 .../util/io/ChunkedByteBufferOutputStream.scala | 113 ++++++++++++++
 .../org/apache/spark/DistributedSuite.scala     |   4 +-
 .../spark/io/ChunkedByteBufferSuite.scala       |   2 +-
 .../spark/memory/MemoryManagerSuite.scala       |   3 +-
 .../storage/BlockManagerReplicationSuite.scala  |  24 ++-
 .../spark/storage/BlockManagerSuite.scala       |  32 +++-
 .../apache/spark/storage/MemoryStoreSuite.scala |  22 +--
 .../io/ByteArrayChunkOutputStreamSuite.scala    | 109 -------------
 .../io/ChunkedByteBufferOutputStreamSuite.scala | 114 ++++++++++++++
 .../rdd/WriteAheadLogBackedBlockRDD.scala       |   3 +-
 .../streaming/ReceivedBlockHandlerSuite.scala   |   3 +-
 22 files changed, 520 insertions(+), 351 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index 672552c..bdf52f3 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -17,9 +17,12 @@
 
 package org.apache.spark.unsafe;
 
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
 
+import sun.misc.Cleaner;
 import sun.misc.Unsafe;
 
 public final class Platform {
@@ -144,6 +147,35 @@ public final class Platform {
     return newMemory;
   }
 
+  /**
+   * Uses internal JDK APIs to allocate a DirectByteBuffer while ignoring the 
JVM's
+   * MaxDirectMemorySize limit (the default limit is too low and we do not 
want to require users
+   * to increase it).
+   */
+  @SuppressWarnings("unchecked")
+  public static ByteBuffer allocateDirectBuffer(int size) {
+    try {
+      Class cls = Class.forName("java.nio.DirectByteBuffer");
+      Constructor constructor = cls.getDeclaredConstructor(Long.TYPE, 
Integer.TYPE);
+      constructor.setAccessible(true);
+      Field cleanerField = cls.getDeclaredField("cleaner");
+      cleanerField.setAccessible(true);
+      final long memory = allocateMemory(size);
+      ByteBuffer buffer = (ByteBuffer) constructor.newInstance(memory, size);
+      Cleaner cleaner = Cleaner.create(buffer, new Runnable() {
+        @Override
+        public void run() {
+          freeMemory(memory);
+        }
+      });
+      cleanerField.set(buffer, cleaner);
+      return buffer;
+    } catch (Exception e) {
+      throwException(e);
+    }
+    throw new IllegalStateException("unreachable");
+  }
+
   public static void setMemory(long address, byte value, long size) {
     _UNSAFE.setMemory(address, size, value);
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala 
b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index e5e6a9e..632b0ae 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -30,7 +30,7 @@ import org.apache.spark.io.CompressionCodec
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{BlockId, BroadcastBlockId, StorageLevel}
 import org.apache.spark.util.{ByteBufferInputStream, Utils}
-import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
+import org.apache.spark.util.io.{ChunkedByteBuffer, 
ChunkedByteBufferOutputStream}
 
 /**
  * A BitTorrent-like implementation of 
[[org.apache.spark.broadcast.Broadcast]].
@@ -228,12 +228,12 @@ private object TorrentBroadcast extends Logging {
       blockSize: Int,
       serializer: Serializer,
       compressionCodec: Option[CompressionCodec]): Array[ByteBuffer] = {
-    val bos = new ByteArrayChunkOutputStream(blockSize)
-    val out: OutputStream = compressionCodec.map(c => 
c.compressedOutputStream(bos)).getOrElse(bos)
+    val cbbos = new ChunkedByteBufferOutputStream(blockSize, 
ByteBuffer.allocate)
+    val out = compressionCodec.map(c => 
c.compressedOutputStream(cbbos)).getOrElse(cbbos)
     val ser = serializer.newInstance()
     val serOut = ser.serializeStream(out)
     serOut.writeObject[T](obj).close()
-    bos.toArrays.map(ByteBuffer.wrap)
+    cbbos.toChunkedByteBuffer.getChunks()
   }
 
   def unBlockifyObject[T: ClassTag](

http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala 
b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
index a67e8da..0b552ca 100644
--- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -35,6 +35,11 @@ private[memory] class StorageMemoryPool(
     memoryMode: MemoryMode
   ) extends MemoryPool(lock) with Logging {
 
+  private[this] val poolName: String = memoryMode match {
+    case MemoryMode.ON_HEAP => "on-heap storage"
+    case MemoryMode.OFF_HEAP => "off-heap storage"
+  }
+
   @GuardedBy("lock")
   private[this] var _memoryUsed: Long = 0L
 
@@ -60,7 +65,7 @@ private[memory] class StorageMemoryPool(
 
   /**
    * Acquire N bytes of memory to cache the given block, evicting existing 
ones if necessary.
- *
+   *
    * @return whether all N bytes were successfully granted.
    */
   def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = 
lock.synchronized {
@@ -83,9 +88,8 @@ private[memory] class StorageMemoryPool(
     assert(numBytesToAcquire >= 0)
     assert(numBytesToFree >= 0)
     assert(memoryUsed <= poolSize)
-    // Once we support off-heap caching, this will need to change:
-    if (numBytesToFree > 0 && memoryMode == MemoryMode.ON_HEAP) {
-      memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree)
+    if (numBytesToFree > 0) {
+      memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, 
memoryMode)
     }
     // NOTE: If the memory store evicts blocks, then those evictions will 
synchronously call
     // back into this StorageMemoryPool in order to free memory. Therefore, 
these variables
@@ -122,14 +126,8 @@ private[memory] class StorageMemoryPool(
     val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
     if (remainingSpaceToFree > 0) {
       // If reclaiming free memory did not adequately shrink the pool, begin 
evicting blocks:
-      val spaceFreedByEviction = {
-        // Once we support off-heap caching, this will need to change:
-        if (memoryMode == MemoryMode.ON_HEAP) {
-          memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree)
-        } else {
-          0
-        }
-      }
+      val spaceFreedByEviction =
+        memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, 
memoryMode)
       // When a block is released, BlockManager.dropFromMemory() calls 
releaseMemory(), so we do
       // not need to decrement _memoryUsed here. However, we do need to 
decrement the pool size.
       decrementPoolSize(spaceFreedByEviction)

http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala 
b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index d2b8ca9..46c64f6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable.HashMap
 
 import org.apache.spark.{Accumulator, SparkEnv, TaskContext, TaskContextImpl}
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, 
Utils}
@@ -90,7 +90,8 @@ private[spark] abstract class Task[T](
       try {
         Utils.tryLogNonFatalError {
           // Release memory used by this thread for unrolling blocks
-          
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
+          
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
+          
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
           // Notify any tasks waiting for execution memory to be freed to wake 
up and try to
           // acquire memory again. This makes impossible the scenario where a 
task sleeps forever
           // because there are no other tasks left to notify it. Since this is 
safe to do but may

http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala 
b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 27e5fa4..745ef12 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -25,7 +25,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.SparkConf
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.storage._
-import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
+import org.apache.spark.util.io.{ChunkedByteBuffer, 
ChunkedByteBufferOutputStream}
 
 /**
  * Component which configures serialization and compression for various Spark 
components, including
@@ -128,17 +128,9 @@ private[spark] class SerializerManager(defaultSerializer: 
Serializer, conf: Spar
 
   /** Serializes into a chunked byte buffer. */
   def dataSerialize[T: ClassTag](blockId: BlockId, values: Iterator[T]): 
ChunkedByteBuffer = {
-    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(1024 * 
1024 * 4)
-    dataSerializeStream(blockId, byteArrayChunkOutputStream, values)
-    new 
ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap))
-  }
-
-  /**
-   * Deserializes a ByteBuffer into an iterator of values and disposes of it 
when the end of
-   * the iterator is reached.
-   */
-  def dataDeserialize[T: ClassTag](blockId: BlockId, bytes: 
ChunkedByteBuffer): Iterator[T] = {
-    dataDeserializeStream[T](blockId, bytes.toInputStream(dispose = true))
+    val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, 
ByteBuffer.allocate)
+    dataSerializeStream(blockId, bbos, values)
+    bbos.toChunkedByteBuffer
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 3014caf..9608418 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.storage
 
 import java.io._
+import java.nio.ByteBuffer
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.{Await, ExecutionContext, Future}
@@ -39,6 +40,7 @@ import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage.memory._
+import org.apache.spark.unsafe.Platform
 import org.apache.spark.util._
 import org.apache.spark.util.io.ChunkedByteBuffer
 
@@ -372,8 +374,12 @@ private[spark] class BlockManager(
           val onDisk = level.useDisk && diskStore.contains(blockId)
           val deserialized = if (inMem) level.deserialized else false
           val replication = if (inMem  || onDisk) level.replication else 1
-          val storageLevel =
-            StorageLevel(onDisk, inMem, deserialized, replication)
+          val storageLevel = StorageLevel(
+            useDisk = onDisk,
+            useMemory = inMem,
+            useOffHeap = level.useOffHeap,
+            deserialized = deserialized,
+            replication = replication)
           val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
           val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
           BlockStatus(storageLevel, memSize, diskSize)
@@ -407,8 +413,8 @@ private[spark] class BlockManager(
           val iter: Iterator[Any] = if (level.deserialized) {
             memoryStore.getValues(blockId).get
           } else {
-            serializerManager.dataDeserialize(
-              blockId, memoryStore.getBytes(blockId).get)(info.classTag)
+            serializerManager.dataDeserializeStream(
+              blockId, 
memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
           }
           val ci = CompletionIterator[Any, Iterator[Any]](iter, 
releaseLock(blockId))
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
@@ -416,11 +422,15 @@ private[spark] class BlockManager(
           val iterToReturn: Iterator[Any] = {
             val diskBytes = diskStore.getBytes(blockId)
             if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserialize(blockId, 
diskBytes)(info.classTag)
+              val diskValues = serializerManager.dataDeserializeStream(
+                blockId,
+                diskBytes.toInputStream(dispose = true))(info.classTag)
               maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
             } else {
-              val bytes = maybeCacheDiskBytesInMemory(info, blockId, level, 
diskBytes)
-              serializerManager.dataDeserialize(blockId, bytes)(info.classTag)
+              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, 
diskBytes)
+                .map {_.toInputStream(dispose = false)}
+                .getOrElse { diskBytes.toInputStream(dispose = true) }
+              serializerManager.dataDeserializeStream(blockId, 
stream)(info.classTag)
             }
           }
           val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, 
releaseLock(blockId))
@@ -481,7 +491,8 @@ private[spark] class BlockManager(
       if (level.useMemory && memoryStore.contains(blockId)) {
         memoryStore.getBytes(blockId).get
       } else if (level.useDisk && diskStore.contains(blockId)) {
-        maybeCacheDiskBytesInMemory(info, blockId, level, 
diskStore.getBytes(blockId))
+        val diskBytes = diskStore.getBytes(blockId)
+        maybeCacheDiskBytesInMemory(info, blockId, level, 
diskBytes).getOrElse(diskBytes)
       } else {
         releaseLock(blockId)
         throw new SparkException(s"Block $blockId was not found even though 
it's read-locked")
@@ -496,8 +507,9 @@ private[spark] class BlockManager(
    */
   private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
     getRemoteBytes(blockId).map { data =>
-      new BlockResult(
-        serializerManager.dataDeserialize(blockId, data), 
DataReadMethod.Network, data.size)
+      val values =
+        serializerManager.dataDeserializeStream(blockId, 
data.toInputStream(dispose = true))
+      new BlockResult(values, DataReadMethod.Network, data.size)
     }
   }
 
@@ -745,7 +757,8 @@ private[spark] class BlockManager(
         // Put it in memory first, even if it also has useDisk set to true;
         // We will drop it to disk later if the memory store can't hold it.
         val putSucceeded = if (level.deserialized) {
-          val values = serializerManager.dataDeserialize(blockId, 
bytes)(classTag)
+          val values =
+            serializerManager.dataDeserializeStream(blockId, 
bytes.toInputStream())(classTag)
           memoryStore.putIteratorAsValues(blockId, values, classTag) match {
             case Right(_) => true
             case Left(iter) =>
@@ -755,7 +768,7 @@ private[spark] class BlockManager(
               false
           }
         } else {
-          memoryStore.putBytes(blockId, size, () => bytes)
+          memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
         }
         if (!putSucceeded && level.useDisk) {
           logWarning(s"Persisting block $blockId to disk instead.")
@@ -893,7 +906,7 @@ private[spark] class BlockManager(
               }
           }
         } else { // !level.deserialized
-          memoryStore.putIteratorAsBytes(blockId, iterator(), classTag) match {
+          memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, 
level.memoryMode) match {
             case Right(s) =>
               size = s
             case Left(partiallySerializedValues) =>
@@ -951,14 +964,16 @@ private[spark] class BlockManager(
    * Attempts to cache spilled bytes read from disk into the MemoryStore in 
order to speed up
    * subsequent reads. This method requires the caller to hold a read lock on 
the block.
    *
-   * @return a copy of the bytes. The original bytes passed this method should 
no longer
-   *         be used after this method returns.
+   * @return a copy of the bytes from the memory store if the put succeeded, 
otherwise None.
+   *         If this returns bytes from the memory store then the original 
disk store bytes will
+   *         automatically be disposed and the caller should not continue to 
use them. Otherwise,
+   *         if this returns None then the original disk store bytes will be 
unaffected.
    */
   private def maybeCacheDiskBytesInMemory(
       blockInfo: BlockInfo,
       blockId: BlockId,
       level: StorageLevel,
-      diskBytes: ChunkedByteBuffer): ChunkedByteBuffer = {
+      diskBytes: ChunkedByteBuffer): Option[ChunkedByteBuffer] = {
     require(!level.deserialized)
     if (level.useMemory) {
       // Synchronize on blockInfo to guard against a race condition where two 
readers both try to
@@ -966,25 +981,29 @@ private[spark] class BlockManager(
       blockInfo.synchronized {
         if (memoryStore.contains(blockId)) {
           diskBytes.dispose()
-          memoryStore.getBytes(blockId).get
+          Some(memoryStore.getBytes(blockId).get)
         } else {
-          val putSucceeded = memoryStore.putBytes(blockId, diskBytes.size, () 
=> {
+          val allocator = level.memoryMode match {
+            case MemoryMode.ON_HEAP => ByteBuffer.allocate _
+            case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
+          }
+          val putSucceeded = memoryStore.putBytes(blockId, diskBytes.size, 
level.memoryMode, () => {
             // https://issues.apache.org/jira/browse/SPARK-6076
             // If the file size is bigger than the free memory, OOM will 
happen. So if we
             // cannot put it into MemoryStore, copyForMemory should not be 
created. That's why
             // this action is put into a `() => ChunkedByteBuffer` and created 
lazily.
-            diskBytes.copy()
+            diskBytes.copy(allocator)
           })
           if (putSucceeded) {
             diskBytes.dispose()
-            memoryStore.getBytes(blockId).get
+            Some(memoryStore.getBytes(blockId).get)
           } else {
-            diskBytes
+            None
           }
         }
       }
     } else {
-      diskBytes
+      None
     }
   }
 
@@ -1055,7 +1074,12 @@ private[spark] class BlockManager(
     val peersForReplication = new ArrayBuffer[BlockManagerId]
     val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
     val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
-    val tLevel = StorageLevel(level.useDisk, level.useMemory, 
level.deserialized, 1)
+    val tLevel = StorageLevel(
+      useDisk = level.useDisk,
+      useMemory = level.useMemory,
+      useOffHeap = level.useOffHeap,
+      deserialized = level.deserialized,
+      replication = 1)
     val startTime = System.currentTimeMillis
     val random = new Random(blockId.hashCode)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index d2a5c69..8fa1215 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -453,7 +453,7 @@ private[spark] class BlockManagerInfo(
     }
 
     if (storageLevel.isValid) {
-      /* isValid means it is either stored in-memory, on-disk or 
on-externalBlockStore.
+      /* isValid means it is either stored in-memory or on-disk.
        * The memSize here indicates the data size in or dropped from memory,
        * externalBlockStoreSize here indicates the data size in or dropped 
from externalBlockStore,
        * and the diskSize here indicates the data size in or dropped to disk.

http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala 
b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 7d23295..216ec07 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -60,10 +60,7 @@ class StorageLevel private(
   assert(replication < 40, "Replication restricted to be less than 40 for 
calculating hash codes")
 
   if (useOffHeap) {
-    require(!useDisk, "Off-heap storage level does not support using disk")
-    require(!useMemory, "Off-heap storage level does not support using heap 
memory")
     require(!deserialized, "Off-heap storage level does not support 
deserialized storage")
-    require(replication == 1, "Off-heap storage level does not support 
multiple replication")
   }
 
   private[spark] def memoryMode: MemoryMode = {
@@ -86,7 +83,7 @@ class StorageLevel private(
       false
   }
 
-  def isValid: Boolean = (useMemory || useDisk || useOffHeap) && (replication 
> 0)
+  def isValid: Boolean = (useMemory || useDisk) && (replication > 0)
 
   def toInt: Int = {
     var ret = 0
@@ -123,7 +120,8 @@ class StorageLevel private(
   private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)
 
   override def toString: String = {
-    s"StorageLevel($useDisk, $useMemory, $useOffHeap, $deserialized, 
$replication)"
+    s"StorageLevel(disk=$useDisk, memory=$useMemory, offheap=$useOffHeap, " +
+      s"deserialized=$deserialized, replication=$replication)"
   }
 
   override def hashCode(): Int = toInt * 41 + replication
@@ -131,8 +129,9 @@ class StorageLevel private(
   def description: String = {
     var result = ""
     result += (if (useDisk) "Disk " else "")
-    result += (if (useMemory) "Memory " else "")
-    result += (if (useOffHeap) "ExternalBlockStore " else "")
+    if (useMemory) {
+      result += (if (useOffHeap) "Memory (off heap) " else "Memory ")
+    }
     result += (if (deserialized) "Deserialized " else "Serialized ")
     result += s"${replication}x Replicated"
     result
@@ -156,9 +155,7 @@ object StorageLevel {
   val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
   val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
   val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
-
-  // Redirect to MEMORY_ONLY_SER for now.
-  val OFF_HEAP = MEMORY_ONLY_SER
+  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
 
   /**
    * :: DeveloperApi ::
@@ -183,7 +180,7 @@ object StorageLevel {
 
   /**
    * :: DeveloperApi ::
-   * Create a new StorageLevel object without setting useOffHeap.
+   * Create a new StorageLevel object.
    */
   @DeveloperApi
   def apply(
@@ -198,7 +195,7 @@ object StorageLevel {
 
   /**
    * :: DeveloperApi ::
-   * Create a new StorageLevel object.
+   * Create a new StorageLevel object without setting useOffHeap.
    */
   @DeveloperApi
   def apply(

http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala 
b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 3ca41f3..df38d11 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -32,20 +32,25 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
 import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
+import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
-import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
+import org.apache.spark.util.io.{ChunkedByteBuffer, 
ChunkedByteBufferOutputStream}
 
 private sealed trait MemoryEntry[T] {
   def size: Long
+  def memoryMode: MemoryMode
   def classTag: ClassTag[T]
 }
 private case class DeserializedMemoryEntry[T](
     value: Array[T],
     size: Long,
-    classTag: ClassTag[T]) extends MemoryEntry[T]
+    classTag: ClassTag[T]) extends MemoryEntry[T] {
+  val memoryMode: MemoryMode = MemoryMode.ON_HEAP
+}
 private case class SerializedMemoryEntry[T](
     buffer: ChunkedByteBuffer,
+    memoryMode: MemoryMode,
     classTag: ClassTag[T]) extends MemoryEntry[T] {
   def size: Long = buffer.size
 }
@@ -86,7 +91,10 @@ private[spark] class MemoryStore(
 
   // A mapping from taskAttemptId to amount of memory used for unrolling a 
block (in bytes)
   // All accesses of this map are assumed to have manually synchronized on 
`memoryManager`
-  private val unrollMemoryMap = mutable.HashMap[Long, Long]()
+  private val onHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()
+  // Note: off-heap unroll memory is only used in putIteratorAsBytes() because 
off-heap caching
+  // always stores serialized values.
+  private val offHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()
 
   // Initial memory to request before unrolling any block
   private val unrollMemoryThreshold: Long =
@@ -131,13 +139,14 @@ private[spark] class MemoryStore(
   def putBytes[T: ClassTag](
       blockId: BlockId,
       size: Long,
+      memoryMode: MemoryMode,
       _bytes: () => ChunkedByteBuffer): Boolean = {
     require(!contains(blockId), s"Block $blockId is already present in the 
MemoryStore")
-    if (memoryManager.acquireStorageMemory(blockId, size, MemoryMode.ON_HEAP)) 
{
+    if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {
       // We acquired enough memory for the block, so go ahead and put it
       val bytes = _bytes()
       assert(bytes.size == size)
-      val entry = new SerializedMemoryEntry[T](bytes, implicitly[ClassTag[T]])
+      val entry = new SerializedMemoryEntry[T](bytes, memoryMode, 
implicitly[ClassTag[T]])
       entries.synchronized {
         entries.put(blockId, entry)
       }
@@ -190,7 +199,8 @@ private[spark] class MemoryStore(
     var vector = new SizeTrackingVector[T]()(classTag)
 
     // Request enough memory to begin unrolling
-    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
initialMemoryThreshold)
+    keepUnrolling =
+      reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, 
MemoryMode.ON_HEAP)
 
     if (!keepUnrolling) {
       logWarning(s"Failed to reserve initial memory threshold of " +
@@ -207,7 +217,8 @@ private[spark] class MemoryStore(
         val currentSize = vector.estimateSize()
         if (currentSize >= memoryThreshold) {
           val amountToRequest = (currentSize * memoryGrowthFactor - 
memoryThreshold).toLong
-          keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest)
+          keepUnrolling =
+            reserveUnrollMemoryForThisTask(blockId, amountToRequest, 
MemoryMode.ON_HEAP)
           if (keepUnrolling) {
             unrollMemoryUsedByThisBlock += amountToRequest
           }
@@ -228,7 +239,7 @@ private[spark] class MemoryStore(
       def transferUnrollToStorage(amount: Long): Unit = {
         // Synchronize so that transfer is atomic
         memoryManager.synchronized {
-          releaseUnrollMemoryForThisTask(amount)
+          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
           val success = memoryManager.acquireStorageMemory(blockId, amount, 
MemoryMode.ON_HEAP)
           assert(success, "transferring unroll memory to storage memory 
failed")
         }
@@ -247,7 +258,7 @@ private[spark] class MemoryStore(
           // If this task attempt already owns more unroll memory than is 
necessary to store the
           // block, then release the extra memory that will not be used.
           val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
-          releaseUnrollMemoryForThisTask(excessUnrollMemory)
+          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 
excessUnrollMemory)
           transferUnrollToStorage(size)
           true
         }
@@ -295,10 +306,16 @@ private[spark] class MemoryStore(
   private[storage] def putIteratorAsBytes[T](
       blockId: BlockId,
       values: Iterator[T],
-      classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = {
+      classTag: ClassTag[T],
+      memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long] = {
 
     require(!contains(blockId), s"Block $blockId is already present in the 
MemoryStore")
 
+    val allocator = memoryMode match {
+      case MemoryMode.ON_HEAP => ByteBuffer.allocate _
+      case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
+    }
+
     // Whether there is still enough memory for us to continue unrolling this 
block
     var keepUnrolling = true
     // Initial per-task memory to request for unrolling blocks (bytes).
@@ -307,15 +324,15 @@ private[spark] class MemoryStore(
     var unrollMemoryUsedByThisBlock = 0L
     // Underlying buffer for unrolling the block
     val redirectableStream = new RedirectableOutputStream
-    val byteArrayChunkOutputStream = new 
ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
-    redirectableStream.setOutputStream(byteArrayChunkOutputStream)
+    val bbos = new ChunkedByteBufferOutputStream(initialMemoryThreshold.toInt, 
allocator)
+    redirectableStream.setOutputStream(bbos)
     val serializationStream: SerializationStream = {
       val ser = serializerManager.getSerializer(classTag).newInstance()
       ser.serializeStream(serializerManager.wrapForCompression(blockId, 
redirectableStream))
     }
 
     // Request enough memory to begin unrolling
-    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
initialMemoryThreshold)
+    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
initialMemoryThreshold, memoryMode)
 
     if (!keepUnrolling) {
       logWarning(s"Failed to reserve initial memory threshold of " +
@@ -325,9 +342,9 @@ private[spark] class MemoryStore(
     }
 
     def reserveAdditionalMemoryIfNecessary(): Unit = {
-      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
-        val amountToRequest = byteArrayChunkOutputStream.size - 
unrollMemoryUsedByThisBlock
-        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest)
+      if (bbos.size > unrollMemoryUsedByThisBlock) {
+        val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
+        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest, memoryMode)
         if (keepUnrolling) {
           unrollMemoryUsedByThisBlock += amountToRequest
         }
@@ -349,12 +366,11 @@ private[spark] class MemoryStore(
     }
 
     if (keepUnrolling) {
-      val entry = SerializedMemoryEntry[T](
-        new 
ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap)), 
classTag)
+      val entry = SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, 
memoryMode, classTag)
       // Synchronize so that transfer is atomic
       memoryManager.synchronized {
-        releaseUnrollMemoryForThisTask(unrollMemoryUsedByThisBlock)
-        val success = memoryManager.acquireStorageMemory(blockId, entry.size, 
MemoryMode.ON_HEAP)
+        releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
+        val success = memoryManager.acquireStorageMemory(blockId, entry.size, 
memoryMode)
         assert(success, "transferring unroll memory to storage memory failed")
       }
       entries.synchronized {
@@ -365,7 +381,7 @@ private[spark] class MemoryStore(
       Right(entry.size)
     } else {
       // We ran out of space while unrolling the values for this block
-      logUnrollFailureMessage(blockId, byteArrayChunkOutputStream.size)
+      logUnrollFailureMessage(blockId, bbos.size)
       Left(
         new PartiallySerializedBlock(
           this,
@@ -374,7 +390,8 @@ private[spark] class MemoryStore(
           serializationStream,
           redirectableStream,
           unrollMemoryUsedByThisBlock,
-          new 
ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap)),
+          memoryMode,
+          bbos.toChunkedByteBuffer,
           values,
           classTag))
     }
@@ -386,7 +403,7 @@ private[spark] class MemoryStore(
       case null => None
       case e: DeserializedMemoryEntry[_] =>
         throw new IllegalArgumentException("should only call getBytes on 
serialized blocks")
-      case SerializedMemoryEntry(bytes, _) => Some(bytes)
+      case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
     }
   }
 
@@ -407,8 +424,12 @@ private[spark] class MemoryStore(
       entries.remove(blockId)
     }
     if (entry != null) {
-      memoryManager.releaseStorageMemory(entry.size, MemoryMode.ON_HEAP)
-      logInfo(s"Block $blockId of size ${entry.size} dropped " +
+      entry match {
+        case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+        case _ =>
+      }
+      memoryManager.releaseStorageMemory(entry.size, entry.memoryMode)
+      logDebug(s"Block $blockId of size ${entry.size} dropped " +
         s"from memory (free ${maxMemory - blocksMemoryUsed})")
       true
     } else {
@@ -420,7 +441,8 @@ private[spark] class MemoryStore(
     entries.synchronized {
       entries.clear()
     }
-    unrollMemoryMap.clear()
+    onHeapUnrollMemoryMap.clear()
+    offHeapUnrollMemoryMap.clear()
     memoryManager.releaseAllStorageMemory()
     logInfo("MemoryStore cleared")
   }
@@ -440,16 +462,20 @@ private[spark] class MemoryStore(
     *
     * @param blockId the ID of the block we are freeing space for, if any
     * @param space the size of this block
+    * @param memoryMode the type of memory to free (on- or off-heap)
     * @return the amount of memory (in bytes) freed by eviction
     */
-  private[spark] def evictBlocksToFreeSpace(blockId: Option[BlockId], space: 
Long): Long = {
+  private[spark] def evictBlocksToFreeSpace(
+      blockId: Option[BlockId],
+      space: Long,
+      memoryMode: MemoryMode): Long = {
     assert(space > 0)
     memoryManager.synchronized {
       var freedMemory = 0L
       val rddToAdd = blockId.flatMap(getRddId)
       val selectedBlocks = new ArrayBuffer[BlockId]
-      def blockIsEvictable(blockId: BlockId): Boolean = {
-        rddToAdd.isEmpty || rddToAdd != getRddId(blockId)
+      def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = 
{
+        entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != 
getRddId(blockId))
       }
       // This is synchronized to ensure that the set of entries is not changed
       // (because of getValue or getBytes) while traversing the iterator, as 
that
@@ -459,7 +485,8 @@ private[spark] class MemoryStore(
         while (freedMemory < space && iterator.hasNext) {
           val pair = iterator.next()
           val blockId = pair.getKey
-          if (blockIsEvictable(blockId)) {
+          val entry = pair.getValue
+          if (blockIsEvictable(blockId, entry)) {
             // We don't want to evict blocks which are currently being read, 
so we need to obtain
             // an exclusive write lock on blocks which are candidates for 
eviction. We perform a
             // non-blocking "tryLock" here in order to ignore blocks which are 
locked for reading:
@@ -474,7 +501,7 @@ private[spark] class MemoryStore(
       def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
         val data = entry match {
           case DeserializedMemoryEntry(values, _, _) => Left(values)
-          case SerializedMemoryEntry(buffer, _) => Right(buffer)
+          case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
         }
         val newEffectiveStorageLevel =
           blockEvictionHandler.dropFromMemory(blockId, () => 
data)(entry.classTag)
@@ -530,11 +557,18 @@ private[spark] class MemoryStore(
    *
    * @return whether the request is granted.
    */
-  def reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long): Boolean 
= {
+  def reserveUnrollMemoryForThisTask(
+      blockId: BlockId,
+      memory: Long,
+      memoryMode: MemoryMode): Boolean = {
     memoryManager.synchronized {
-      val success = memoryManager.acquireUnrollMemory(blockId, memory, 
MemoryMode.ON_HEAP)
+      val success = memoryManager.acquireUnrollMemory(blockId, memory, 
memoryMode)
       if (success) {
         val taskAttemptId = currentTaskAttemptId()
+        val unrollMemoryMap = memoryMode match {
+          case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
+          case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
+        }
         unrollMemoryMap(taskAttemptId) = 
unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
       }
       success
@@ -545,9 +579,13 @@ private[spark] class MemoryStore(
    * Release memory used by this task for unrolling blocks.
    * If the amount is not specified, remove the current task's allocation 
altogether.
    */
-  def releaseUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = {
+  def releaseUnrollMemoryForThisTask(memoryMode: MemoryMode, memory: Long = 
Long.MaxValue): Unit = {
     val taskAttemptId = currentTaskAttemptId()
     memoryManager.synchronized {
+      val unrollMemoryMap = memoryMode match {
+        case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
+        case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
+      }
       if (unrollMemoryMap.contains(taskAttemptId)) {
         val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
         if (memoryToRelease > 0) {
@@ -555,7 +593,7 @@ private[spark] class MemoryStore(
           if (unrollMemoryMap(taskAttemptId) == 0) {
             unrollMemoryMap.remove(taskAttemptId)
           }
-          memoryManager.releaseUnrollMemory(memoryToRelease, 
MemoryMode.ON_HEAP)
+          memoryManager.releaseUnrollMemory(memoryToRelease, memoryMode)
         }
       }
     }
@@ -565,20 +603,23 @@ private[spark] class MemoryStore(
    * Return the amount of memory currently occupied for unrolling blocks 
across all tasks.
    */
   def currentUnrollMemory: Long = memoryManager.synchronized {
-    unrollMemoryMap.values.sum
+    onHeapUnrollMemoryMap.values.sum + offHeapUnrollMemoryMap.values.sum
   }
 
   /**
    * Return the amount of memory currently occupied for unrolling blocks by 
this task.
    */
   def currentUnrollMemoryForThisTask: Long = memoryManager.synchronized {
-    unrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L)
+    onHeapUnrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L) +
+      offHeapUnrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L)
   }
 
   /**
    * Return the number of tasks currently unrolling blocks.
    */
-  private def numTasksUnrolling: Int = memoryManager.synchronized { 
unrollMemoryMap.keys.size }
+  private def numTasksUnrolling: Int = memoryManager.synchronized {
+    (onHeapUnrollMemoryMap.keys ++ offHeapUnrollMemoryMap.keys).toSet.size
+  }
 
   /**
    * Log information about current memory usage.
@@ -627,7 +668,7 @@ private[storage] class PartiallyUnrolledIterator[T](
   private[this] var iter: Iterator[T] = {
     val completionIterator = CompletionIterator[T, Iterator[T]](unrolled, {
       unrolledIteratorIsConsumed = true
-      memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
+      memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 
unrollMemory)
     })
     completionIterator ++ rest
   }
@@ -640,7 +681,7 @@ private[storage] class PartiallyUnrolledIterator[T](
    */
   def close(): Unit = {
     if (!unrolledIteratorIsConsumed) {
-      memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
+      memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 
unrollMemory)
       unrolledIteratorIsConsumed = true
     }
     iter = null
@@ -669,6 +710,7 @@ private class RedirectableOutputStream extends OutputStream 
{
  * @param serializationStream a serialization stream which writes to 
[[redirectableOutputStream]].
  * @param redirectableOutputStream an OutputStream which can be redirected to 
a different sink.
  * @param unrollMemory the amount of unroll memory used by the values in 
`unrolled`.
+ * @param memoryMode whether the unroll memory is on- or off-heap
  * @param unrolled a byte buffer containing the partially-serialized values.
  * @param rest         the rest of the original iterator passed to
  *                     [[MemoryStore.putIteratorAsValues()]].
@@ -681,18 +723,36 @@ private[storage] class PartiallySerializedBlock[T](
     serializationStream: SerializationStream,
     redirectableOutputStream: RedirectableOutputStream,
     unrollMemory: Long,
+    memoryMode: MemoryMode,
     unrolled: ChunkedByteBuffer,
     rest: Iterator[T],
     classTag: ClassTag[T]) {
 
+  // If the task does not fully consume `valuesIterator` or otherwise fails to 
consume or dispose of
+  // this PartiallySerializedBlock then we risk leaking of direct buffers, so 
we use a task
+  // completion listener here in order to ensure that `unrolled.dispose()` is 
called at least once.
+  // The dispose() method is idempotent, so it's safe to call it 
unconditionally.
+  Option(TaskContext.get()).foreach { taskContext =>
+    taskContext.addTaskCompletionListener { _ =>
+      // When a task completes, its unroll memory will automatically be freed. 
Thus we do not call
+      // releaseUnrollMemoryForThisTask() here because we want to avoid 
double-freeing.
+      unrolled.dispose()
+    }
+  }
+
   /**
    * Called to dispose of this block and free its memory.
    */
   def discard(): Unit = {
     try {
+      // We want to close the output stream in order to free any resources 
associated with the
+      // serializer itself (such as Kryo's internal buffers). close() might 
cause data to be
+      // written, so redirect the output stream to discard that data.
+      redirectableOutputStream.setOutputStream(ByteStreams.nullOutputStream())
       serializationStream.close()
     } finally {
-      memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
+      unrolled.dispose()
+      memoryStore.releaseUnrollMemoryForThisTask(memoryMode, unrollMemory)
     }
   }
 
@@ -701,12 +761,14 @@ private[storage] class PartiallySerializedBlock[T](
    * and then serializing the values from the original input iterator.
    */
   def finishWritingToStream(os: OutputStream): Unit = {
-    ByteStreams.copy(unrolled.toInputStream(), os)
+    // `unrolled`'s underlying buffers will be freed once this input stream is 
fully read:
+    ByteStreams.copy(unrolled.toInputStream(dispose = true), os)
+    memoryStore.releaseUnrollMemoryForThisTask(memoryMode, unrollMemory)
     redirectableOutputStream.setOutputStream(os)
     while (rest.hasNext) {
       serializationStream.writeObject(rest.next())(classTag)
     }
-    discard()
+    serializationStream.close()
   }
 
   /**
@@ -717,10 +779,13 @@ private[storage] class PartiallySerializedBlock[T](
    * `close()` on it to free its resources.
    */
   def valuesIterator: PartiallyUnrolledIterator[T] = {
+    // `unrolled`'s underlying buffers will be freed once this input stream is 
fully read:
+    val unrolledIter = serializerManager.dataDeserializeStream(
+      blockId, unrolled.toInputStream(dispose = true))(classTag)
     new PartiallyUnrolledIterator(
       memoryStore,
       unrollMemory,
-      unrolled = serializerManager.dataDeserialize(blockId, 
unrolled)(classTag),
+      unrolled = CompletionIterator[T, Iterator[T]](unrolledIter, discard()),
       rest = rest)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala 
b/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala
deleted file mode 100644
index 16fe3be..0000000
--- 
a/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util.io
-
-import java.io.OutputStream
-
-import scala.collection.mutable.ArrayBuffer
-
-
-/**
- * An OutputStream that writes to fixed-size chunks of byte arrays.
- *
- * @param chunkSize size of each chunk, in bytes.
- */
-private[spark]
-class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
-
-  private[this] val chunks = new ArrayBuffer[Array[Byte]]
-
-  /** Index of the last chunk. Starting with -1 when the chunks array is 
empty. */
-  private[this] var lastChunkIndex = -1
-
-  /**
-   * Next position to write in the last chunk.
-   *
-   * If this equals chunkSize, it means for next write we need to allocate a 
new chunk.
-   * This can also never be 0.
-   */
-  private[this] var position = chunkSize
-  private[this] var _size = 0
-
-  def size: Long = _size
-
-  override def write(b: Int): Unit = {
-    allocateNewChunkIfNeeded()
-    chunks(lastChunkIndex)(position) = b.toByte
-    position += 1
-    _size += 1
-  }
-
-  override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
-    var written = 0
-    while (written < len) {
-      allocateNewChunkIfNeeded()
-      val thisBatch = math.min(chunkSize - position, len - written)
-      System.arraycopy(bytes, written + off, chunks(lastChunkIndex), position, 
thisBatch)
-      written += thisBatch
-      position += thisBatch
-    }
-    _size += len
-  }
-
-  @inline
-  private def allocateNewChunkIfNeeded(): Unit = {
-    if (position == chunkSize) {
-      chunks += new Array[Byte](chunkSize)
-      lastChunkIndex += 1
-      position = 0
-    }
-  }
-
-  def toArrays: Array[Array[Byte]] = {
-    if (lastChunkIndex == -1) {
-      new Array[Array[Byte]](0)
-    } else {
-      // Copy the first n-1 chunks to the output, and then create an array 
that fits the last chunk.
-      // An alternative would have been returning an array of ByteBuffers, 
with the last buffer
-      // bounded to only the last chunk's position. However, given our use 
case in Spark (to put
-      // the chunks in block manager), only limiting the view bound of the 
buffer would still
-      // require the block manager to store the whole chunk.
-      val ret = new Array[Array[Byte]](chunks.size)
-      for (i <- 0 until chunks.size - 1) {
-        ret(i) = chunks(i)
-      }
-      if (position == chunkSize) {
-        ret(lastChunkIndex) = chunks(lastChunkIndex)
-      } else {
-        ret(lastChunkIndex) = new Array[Byte](position)
-        System.arraycopy(chunks(lastChunkIndex), 0, ret(lastChunkIndex), 0, 
position)
-      }
-      ret
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala 
b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index c643c4b..fb4706e 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -41,6 +41,8 @@ private[spark] class ChunkedByteBuffer(var chunks: 
Array[ByteBuffer]) {
   require(chunks.forall(_.limit() > 0), "chunks must be non-empty")
   require(chunks.forall(_.position() == 0), "chunks' positions must be 0")
 
+  private[this] var disposed: Boolean = false
+
   /**
    * This size of this buffer, in bytes.
    */
@@ -117,11 +119,12 @@ private[spark] class ChunkedByteBuffer(var chunks: 
Array[ByteBuffer]) {
   /**
    * Make a copy of this ChunkedByteBuffer, copying all of the backing data 
into new buffers.
    * The new buffer will share no resources with the original buffer.
+   *
+   * @param allocator a method for allocating byte buffers
    */
-  def copy(): ChunkedByteBuffer = {
+  def copy(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
     val copiedChunks = getChunks().map { chunk =>
-      // TODO: accept an allocator in this copy method to integrate with mem. 
accounting systems
-      val newChunk = ByteBuffer.allocate(chunk.limit())
+      val newChunk = allocator(chunk.limit())
       newChunk.put(chunk)
       newChunk.flip()
       newChunk
@@ -136,7 +139,10 @@ private[spark] class ChunkedByteBuffer(var chunks: 
Array[ByteBuffer]) {
    * unfortunately no standard API to do this.
    */
   def dispose(): Unit = {
-    chunks.foreach(StorageUtils.dispose)
+    if (!disposed) {
+      chunks.foreach(StorageUtils.dispose)
+      disposed = true
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStream.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStream.scala
 
b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStream.scala
new file mode 100644
index 0000000..67b50d1
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStream.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.io
+
+import java.io.OutputStream
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.storage.StorageUtils
+
+/**
+ * An OutputStream that writes to fixed-size chunks of byte arrays.
+ *
+ * @param chunkSize size of each chunk, in bytes.
+ */
+private[spark] class ChunkedByteBufferOutputStream(
+    chunkSize: Int,
+    allocator: Int => ByteBuffer)
+  extends OutputStream {
+
+  private[this] var toChunkedByteBufferWasCalled = false
+
+  private val chunks = new ArrayBuffer[ByteBuffer]
+
+  /** Index of the last chunk. Starting with -1 when the chunks array is 
empty. */
+  private[this] var lastChunkIndex = -1
+
+  /**
+   * Next position to write in the last chunk.
+   *
+   * If this equals chunkSize, it means for next write we need to allocate a 
new chunk.
+   * This can also never be 0.
+   */
+  private[this] var position = chunkSize
+  private[this] var _size = 0
+
+  def size: Long = _size
+
+  override def write(b: Int): Unit = {
+    allocateNewChunkIfNeeded()
+    chunks(lastChunkIndex).put(b.toByte)
+    position += 1
+    _size += 1
+  }
+
+  override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
+    var written = 0
+    while (written < len) {
+      allocateNewChunkIfNeeded()
+      val thisBatch = math.min(chunkSize - position, len - written)
+      chunks(lastChunkIndex).put(bytes, written + off, thisBatch)
+      written += thisBatch
+      position += thisBatch
+    }
+    _size += len
+  }
+
+  @inline
+  private def allocateNewChunkIfNeeded(): Unit = {
+    require(!toChunkedByteBufferWasCalled, "cannot write after 
toChunkedByteBuffer() is called")
+    if (position == chunkSize) {
+      chunks += allocator(chunkSize)
+      lastChunkIndex += 1
+      position = 0
+    }
+  }
+
+  def toChunkedByteBuffer: ChunkedByteBuffer = {
+    require(!toChunkedByteBufferWasCalled, "toChunkedByteBuffer() can only be 
called once")
+    toChunkedByteBufferWasCalled = true
+    if (lastChunkIndex == -1) {
+      new ChunkedByteBuffer(Array.empty[ByteBuffer])
+    } else {
+      // Copy the first n-1 chunks to the output, and then create an array 
that fits the last chunk.
+      // An alternative would have been returning an array of ByteBuffers, 
with the last buffer
+      // bounded to only the last chunk's position. However, given our use 
case in Spark (to put
+      // the chunks in block manager), only limiting the view bound of the 
buffer would still
+      // require the block manager to store the whole chunk.
+      val ret = new Array[ByteBuffer](chunks.size)
+      for (i <- 0 until chunks.size - 1) {
+        ret(i) = chunks(i)
+        ret(i).flip()
+      }
+      if (position == chunkSize) {
+        ret(lastChunkIndex) = chunks(lastChunkIndex)
+        ret(lastChunkIndex).flip()
+      } else {
+        ret(lastChunkIndex) = allocator(position)
+        chunks(lastChunkIndex).flip()
+        ret(lastChunkIndex).put(chunks(lastChunkIndex))
+        ret(lastChunkIndex).flip()
+        StorageUtils.dispose(chunks(lastChunkIndex))
+      }
+      new ChunkedByteBuffer(ret)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala 
b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 3dded4d..67d722c 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -198,8 +198,8 @@ class DistributedSuite extends SparkFunSuite with Matchers 
with LocalSparkContex
     blockManager.master.getLocations(blockId).foreach { cmId =>
       val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, 
cmId.executorId,
         blockId.toString)
-      val deserialized = serializerManager.dataDeserialize[Int](blockId,
-        new ChunkedByteBuffer(bytes.nioByteBuffer())).toList
+      val deserialized = serializerManager.dataDeserializeStream[Int](blockId,
+        new ChunkedByteBuffer(bytes.nioByteBuffer()).toInputStream()).toList
       assert(deserialized === (1 to 100).toList)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala 
b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
index aab70e7..f205d4f 100644
--- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -52,7 +52,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
 
   test("copy() does not affect original buffer's position") {
     val chunkedByteBuffer = new 
ChunkedByteBuffer(Array(ByteBuffer.allocate(8)))
-    chunkedByteBuffer.copy()
+    chunkedByteBuffer.copy(ByteBuffer.allocate)
     assert(chunkedByteBuffer.getChunks().head.position() === 0)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index aaca653..3d1a0e9 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -71,7 +71,8 @@ private[memory] trait MemoryManagerSuite extends 
SparkFunSuite with BeforeAndAft
    */
   protected def makeMemoryStore(mm: MemoryManager): MemoryStore = {
     val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
-    when(ms.evictBlocksToFreeSpace(any(), 
anyLong())).thenAnswer(evictBlocksToFreeSpaceAnswer(mm))
+    when(ms.evictBlocksToFreeSpace(any(), anyLong(), any()))
+      .thenAnswer(evictBlocksToFreeSpaceAnswer(mm))
     mm.setMemoryStore(ms)
     ms
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 98e8450..2ec5319 100644
--- 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.{BeforeAndAfter, Matchers}
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
-import org.apache.spark.memory.StaticMemoryManager
+import org.apache.spark.memory.UnifiedMemoryManager
 import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.rpc.RpcEnv
@@ -60,8 +60,10 @@ class BlockManagerReplicationSuite extends SparkFunSuite 
with Matchers with Befo
   private def makeBlockManager(
       maxMem: Long,
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
+    conf.set("spark.testing.memory", maxMem.toString)
+    conf.set("spark.memory.offHeap.size", maxMem.toString)
     val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 
1)
-    val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, 
numCores = 1)
+    val memManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(serializer, conf)
     val store = new BlockManager(name, rpcEnv, master, serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
@@ -76,6 +78,9 @@ class BlockManagerReplicationSuite extends SparkFunSuite with 
Matchers with Befo
 
     conf.set("spark.authenticate", "false")
     conf.set("spark.driver.port", rpcEnv.address.port.toString)
+    conf.set("spark.testing", "true")
+    conf.set("spark.memory.fraction", "1")
+    conf.set("spark.memory.storageFraction", "1")
     conf.set("spark.storage.unrollFraction", "0.4")
     conf.set("spark.storage.unrollMemoryThreshold", "512")
 
@@ -172,6 +177,10 @@ class BlockManagerReplicationSuite extends SparkFunSuite 
with Matchers with Befo
     testReplication(5, storageLevels)
   }
 
+  test("block replication - off-heap") {
+    testReplication(2, Seq(OFF_HEAP, StorageLevel(true, true, true, false, 2)))
+  }
+
   test("block replication - 2x replication without peers") {
     intercept[org.scalatest.exceptions.TestFailedException] {
       testReplication(1,
@@ -262,7 +271,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite 
with Matchers with Befo
     val failableTransfer = mock(classOf[BlockTransferService]) // this wont 
actually work
     when(failableTransfer.hostName).thenReturn("some-hostname")
     when(failableTransfer.port).thenReturn(1000)
-    val memManager = new StaticMemoryManager(conf, Long.MaxValue, 10000, 
numCores = 1)
+    conf.set("spark.testing.memory", "10000")
+    val memManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(serializer, conf)
     val failableStore = new BlockManager("failable-store", rpcEnv, master, 
serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, failableTransfer, 
securityMgr, 0)
@@ -392,10 +402,14 @@ class BlockManagerReplicationSuite extends SparkFunSuite 
with Matchers with Befo
         // If the block is supposed to be in memory, then drop the copy of the 
block in
         // this store test whether master is updated with zero memory usage 
this store
         if (storageLevel.useMemory) {
+          val sl = if (storageLevel.useOffHeap) {
+            StorageLevel(false, true, true, false, 1)
+          } else {
+            MEMORY_ONLY_SER
+          }
           // Force the block to be dropped by adding a number of dummy blocks
           (1 to 10).foreach {
-            i =>
-              testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), 
MEMORY_ONLY_SER)
+            i => testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), 
sl)
           }
           (1 to 10).foreach {
             i => testStore.removeBlock(s"dummy-block-$i")

http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 9f3a775..32c00ac 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -34,7 +34,7 @@ import org.scalatest.concurrent.Timeouts._
 
 import org.apache.spark._
 import org.apache.spark.executor.DataReadMethod
-import org.apache.spark.memory.{MemoryMode, StaticMemoryManager}
+import org.apache.spark.memory.UnifiedMemoryManager
 import org.apache.spark.network.{BlockDataManager, BlockTransferService}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.netty.NettyBlockTransferService
@@ -74,10 +74,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
       name: String = SparkContext.DRIVER_IDENTIFIER,
       master: BlockManagerMaster = this.master,
       transferService: Option[BlockTransferService] = Option.empty): 
BlockManager = {
+    conf.set("spark.testing.memory", maxMem.toString)
+    conf.set("spark.memory.offHeap.size", maxMem.toString)
     val serializer = new KryoSerializer(conf)
     val transfer = transferService
       .getOrElse(new NettyBlockTransferService(conf, securityMgr, numCores = 
1))
-    val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, 
numCores = 1)
+    val memManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(serializer, conf)
     val blockManager = new BlockManager(name, rpcEnv, master, 
serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
@@ -92,6 +94,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
     System.setProperty("os.arch", "amd64")
     conf = new SparkConf(false)
       .set("spark.app.id", "test")
+      .set("spark.testing", "true")
+      .set("spark.memory.fraction", "1")
+      .set("spark.memory.storageFraction", "1")
       .set("spark.kryoserializer.buffer", "1m")
       .set("spark.test.useCompressedOops", "true")
       .set("spark.storage.unrollFraction", "0.4")
@@ -518,6 +523,14 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     testInMemoryLRUStorage(StorageLevel.MEMORY_ONLY_SER)
   }
 
+  test("in-memory LRU storage with off-heap") {
+    testInMemoryLRUStorage(StorageLevel(
+      useDisk = false,
+      useMemory = true,
+      useOffHeap = true,
+      deserialized = false, replication = 1))
+  }
+
   private def testInMemoryLRUStorage(storageLevel: StorageLevel): Unit = {
     store = makeBlockManager(12000)
     val a1 = new Array[Byte](4000)
@@ -608,6 +621,14 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, getAsBytes = 
true)
   }
 
+  test("disk and off-heap memory storage") {
+    testDiskAndMemoryStorage(StorageLevel.OFF_HEAP, getAsBytes = false)
+  }
+
+  test("disk and off-heap memory storage with getLocalBytes") {
+    testDiskAndMemoryStorage(StorageLevel.OFF_HEAP, getAsBytes = true)
+  }
+
   def testDiskAndMemoryStorage(
       storageLevel: StorageLevel,
       getAsBytes: Boolean): Unit = {
@@ -817,12 +838,9 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 
   test("block store put failure") {
     // Use Java serializer so we can create an unserializable error.
+    conf.set("spark.testing.memory", "1200")
     val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 
1)
-    val memoryManager = new StaticMemoryManager(
-      conf,
-      maxOnHeapExecutionMemory = Long.MaxValue,
-      maxOnHeapStorageMemory = 1200,
-      numCores = 1)
+    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(new JavaSerializer(conf), 
conf)
     store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
       serializerManager, conf, memoryManager, mapOutputTracker,

http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index 43e832d..145d432 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -27,7 +27,7 @@ import scala.reflect.ClassTag
 import org.scalatest._
 
 import org.apache.spark._
-import org.apache.spark.memory.StaticMemoryManager
+import org.apache.spark.memory.{MemoryMode, StaticMemoryManager}
 import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
 import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, 
PartiallySerializedBlock, PartiallyUnrolledIterator}
 import org.apache.spark.util._
@@ -86,7 +86,7 @@ class MemoryStoreSuite
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     def reserveUnrollMemoryForThisTask(memory: Long): Boolean = {
-      memoryStore.reserveUnrollMemoryForThisTask(TestBlockId(""), memory)
+      memoryStore.reserveUnrollMemoryForThisTask(TestBlockId(""), memory, 
MemoryMode.ON_HEAP)
     }
 
     // Reserve
@@ -99,9 +99,9 @@ class MemoryStoreSuite
     assert(!reserveUnrollMemoryForThisTask(1000000))
     assert(memoryStore.currentUnrollMemoryForThisTask === 800) // not granted
     // Release
-    memoryStore.releaseUnrollMemoryForThisTask(100)
+    memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 100)
     assert(memoryStore.currentUnrollMemoryForThisTask === 700)
-    memoryStore.releaseUnrollMemoryForThisTask(100)
+    memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 100)
     assert(memoryStore.currentUnrollMemoryForThisTask === 600)
     // Reserve again
     assert(reserveUnrollMemoryForThisTask(4400))
@@ -109,9 +109,9 @@ class MemoryStoreSuite
     assert(!reserveUnrollMemoryForThisTask(20000))
     assert(memoryStore.currentUnrollMemoryForThisTask === 5000) // not granted
     // Release again
-    memoryStore.releaseUnrollMemoryForThisTask(1000)
+    memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 1000)
     assert(memoryStore.currentUnrollMemoryForThisTask === 4000)
-    memoryStore.releaseUnrollMemoryForThisTask() // release all
+    memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP) // release 
all
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
   }
 
@@ -254,7 +254,7 @@ class MemoryStoreSuite
       assert(blockInfoManager.lockNewBlockForWriting(
         blockId,
         new BlockInfo(StorageLevel.MEMORY_ONLY_SER, classTag, tellMaster = 
false)))
-      val res = memoryStore.putIteratorAsBytes(blockId, iter, classTag)
+      val res = memoryStore.putIteratorAsBytes(blockId, iter, classTag, 
MemoryMode.ON_HEAP)
       blockInfoManager.unlock(blockId)
       res
     }
@@ -312,7 +312,7 @@ class MemoryStoreSuite
     assert(blockInfoManager.lockNewBlockForWriting(
       "b1",
       new BlockInfo(StorageLevel.MEMORY_ONLY_SER, ClassTag.Any, tellMaster = 
false)))
-    val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any)
+    val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any, 
MemoryMode.ON_HEAP)
     blockInfoManager.unlock("b1")
     assert(res.isLeft)
     assert(memoryStore.currentUnrollMemoryForThisTask > 0)
@@ -333,7 +333,7 @@ class MemoryStoreSuite
     assert(blockInfoManager.lockNewBlockForWriting(
       "b1",
       new BlockInfo(StorageLevel.MEMORY_ONLY_SER, ClassTag.Any, tellMaster = 
false)))
-    val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any)
+    val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any, 
MemoryMode.ON_HEAP)
     blockInfoManager.unlock("b1")
     assert(res.isLeft)
     assert(memoryStore.currentUnrollMemoryForThisTask > 0)
@@ -395,7 +395,7 @@ class MemoryStoreSuite
     val blockId = BlockId("rdd_3_10")
     blockInfoManager.lockNewBlockForWriting(
       blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, 
tellMaster = false))
-    memoryStore.putBytes(blockId, 13000, () => {
+    memoryStore.putBytes(blockId, 13000, MemoryMode.ON_HEAP, () => {
       fail("A big ByteBuffer that cannot be put into MemoryStore should not be 
created")
     })
   }
@@ -404,7 +404,7 @@ class MemoryStoreSuite
     val (memoryStore, _) = makeMemoryStore(12000)
     val blockId = BlockId("rdd_3_10")
     var bytes: ChunkedByteBuffer = null
-    memoryStore.putBytes(blockId, 10000, () => {
+    memoryStore.putBytes(blockId, 10000, MemoryMode.ON_HEAP, () => {
       bytes = new ChunkedByteBuffer(ByteBuffer.allocate(10000))
       bytes
     })

http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/core/src/test/scala/org/apache/spark/util/io/ByteArrayChunkOutputStreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/util/io/ByteArrayChunkOutputStreamSuite.scala
 
b/core/src/test/scala/org/apache/spark/util/io/ByteArrayChunkOutputStreamSuite.scala
deleted file mode 100644
index 361ec95..0000000
--- 
a/core/src/test/scala/org/apache/spark/util/io/ByteArrayChunkOutputStreamSuite.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util.io
-
-import scala.util.Random
-
-import org.apache.spark.SparkFunSuite
-
-
-class ByteArrayChunkOutputStreamSuite extends SparkFunSuite {
-
-  test("empty output") {
-    val o = new ByteArrayChunkOutputStream(1024)
-    assert(o.toArrays.length === 0)
-  }
-
-  test("write a single byte") {
-    val o = new ByteArrayChunkOutputStream(1024)
-    o.write(10)
-    assert(o.toArrays.length === 1)
-    assert(o.toArrays.head.toSeq === Seq(10.toByte))
-  }
-
-  test("write a single near boundary") {
-    val o = new ByteArrayChunkOutputStream(10)
-    o.write(new Array[Byte](9))
-    o.write(99)
-    assert(o.toArrays.length === 1)
-    assert(o.toArrays.head(9) === 99.toByte)
-  }
-
-  test("write a single at boundary") {
-    val o = new ByteArrayChunkOutputStream(10)
-    o.write(new Array[Byte](10))
-    o.write(99)
-    assert(o.toArrays.length === 2)
-    assert(o.toArrays(1).length === 1)
-    assert(o.toArrays(1)(0) === 99.toByte)
-  }
-
-  test("single chunk output") {
-    val ref = new Array[Byte](8)
-    Random.nextBytes(ref)
-    val o = new ByteArrayChunkOutputStream(10)
-    o.write(ref)
-    val arrays = o.toArrays
-    assert(arrays.length === 1)
-    assert(arrays.head.length === ref.length)
-    assert(arrays.head.toSeq === ref.toSeq)
-  }
-
-  test("single chunk output at boundary size") {
-    val ref = new Array[Byte](10)
-    Random.nextBytes(ref)
-    val o = new ByteArrayChunkOutputStream(10)
-    o.write(ref)
-    val arrays = o.toArrays
-    assert(arrays.length === 1)
-    assert(arrays.head.length === ref.length)
-    assert(arrays.head.toSeq === ref.toSeq)
-  }
-
-  test("multiple chunk output") {
-    val ref = new Array[Byte](26)
-    Random.nextBytes(ref)
-    val o = new ByteArrayChunkOutputStream(10)
-    o.write(ref)
-    val arrays = o.toArrays
-    assert(arrays.length === 3)
-    assert(arrays(0).length === 10)
-    assert(arrays(1).length === 10)
-    assert(arrays(2).length === 6)
-
-    assert(arrays(0).toSeq === ref.slice(0, 10))
-    assert(arrays(1).toSeq === ref.slice(10, 20))
-    assert(arrays(2).toSeq === ref.slice(20, 26))
-  }
-
-  test("multiple chunk output at boundary size") {
-    val ref = new Array[Byte](30)
-    Random.nextBytes(ref)
-    val o = new ByteArrayChunkOutputStream(10)
-    o.write(ref)
-    val arrays = o.toArrays
-    assert(arrays.length === 3)
-    assert(arrays(0).length === 10)
-    assert(arrays(1).length === 10)
-    assert(arrays(2).length === 10)
-
-    assert(arrays(0).toSeq === ref.slice(0, 10))
-    assert(arrays(1).toSeq === ref.slice(10, 20))
-    assert(arrays(2).toSeq === ref.slice(20, 30))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/core/src/test/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStreamSuite.scala
 
b/core/src/test/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStreamSuite.scala
new file mode 100644
index 0000000..2266220
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStreamSuite.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.io
+
+import java.nio.ByteBuffer
+
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+
+class ChunkedByteBufferOutputStreamSuite extends SparkFunSuite {
+
+  test("empty output") {
+    val o = new ChunkedByteBufferOutputStream(1024, ByteBuffer.allocate)
+    assert(o.toChunkedByteBuffer.size === 0)
+  }
+
+  test("write a single byte") {
+    val o = new ChunkedByteBufferOutputStream(1024, ByteBuffer.allocate)
+    o.write(10)
+    val chunkedByteBuffer = o.toChunkedByteBuffer
+    assert(chunkedByteBuffer.getChunks().length === 1)
+    assert(chunkedByteBuffer.getChunks().head.array().toSeq === Seq(10.toByte))
+  }
+
+  test("write a single near boundary") {
+    val o = new ChunkedByteBufferOutputStream(10, ByteBuffer.allocate)
+    o.write(new Array[Byte](9))
+    o.write(99)
+    val chunkedByteBuffer = o.toChunkedByteBuffer
+    assert(chunkedByteBuffer.getChunks().length === 1)
+    assert(chunkedByteBuffer.getChunks().head.array()(9) === 99.toByte)
+  }
+
+  test("write a single at boundary") {
+    val o = new ChunkedByteBufferOutputStream(10, ByteBuffer.allocate)
+    o.write(new Array[Byte](10))
+    o.write(99)
+    val arrays = o.toChunkedByteBuffer.getChunks().map(_.array())
+    assert(arrays.length === 2)
+    assert(arrays(1).length === 1)
+    assert(arrays(1)(0) === 99.toByte)
+  }
+
+  test("single chunk output") {
+    val ref = new Array[Byte](8)
+    Random.nextBytes(ref)
+    val o = new ChunkedByteBufferOutputStream(10, ByteBuffer.allocate)
+    o.write(ref)
+    val arrays = o.toChunkedByteBuffer.getChunks().map(_.array())
+    assert(arrays.length === 1)
+    assert(arrays.head.length === ref.length)
+    assert(arrays.head.toSeq === ref.toSeq)
+  }
+
+  test("single chunk output at boundary size") {
+    val ref = new Array[Byte](10)
+    Random.nextBytes(ref)
+    val o = new ChunkedByteBufferOutputStream(10, ByteBuffer.allocate)
+    o.write(ref)
+    val arrays = o.toChunkedByteBuffer.getChunks().map(_.array())
+    assert(arrays.length === 1)
+    assert(arrays.head.length === ref.length)
+    assert(arrays.head.toSeq === ref.toSeq)
+  }
+
+  test("multiple chunk output") {
+    val ref = new Array[Byte](26)
+    Random.nextBytes(ref)
+    val o = new ChunkedByteBufferOutputStream(10, ByteBuffer.allocate)
+    o.write(ref)
+    val arrays = o.toChunkedByteBuffer.getChunks().map(_.array())
+    assert(arrays.length === 3)
+    assert(arrays(0).length === 10)
+    assert(arrays(1).length === 10)
+    assert(arrays(2).length === 6)
+
+    assert(arrays(0).toSeq === ref.slice(0, 10))
+    assert(arrays(1).toSeq === ref.slice(10, 20))
+    assert(arrays(2).toSeq === ref.slice(20, 26))
+  }
+
+  test("multiple chunk output at boundary size") {
+    val ref = new Array[Byte](30)
+    Random.nextBytes(ref)
+    val o = new ChunkedByteBufferOutputStream(10, ByteBuffer.allocate)
+    o.write(ref)
+    val arrays = o.toChunkedByteBuffer.getChunks().map(_.array())
+    assert(arrays.length === 3)
+    assert(arrays(0).length === 10)
+    assert(arrays(1).length === 10)
+    assert(arrays(2).length === 10)
+
+    assert(arrays(0).toSeq === ref.slice(0, 10))
+    assert(arrays(1).toSeq === ref.slice(10, 20))
+    assert(arrays(2).toSeq === ref.slice(20, 30))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index c56520b..53fccd8 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -162,7 +162,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
         logDebug(s"Stored partition data of $this into block manager with 
level $storageLevel")
         dataRead.rewind()
       }
-      serializerManager.dataDeserialize(blockId, new 
ChunkedByteBuffer(dataRead))
+      serializerManager
+        .dataDeserializeStream(blockId, new 
ChunkedByteBuffer(dataRead).toInputStream())
         .asInstanceOf[Iterator[T]]
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e41acb75/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 4e77cd6..5fc53bc 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -157,7 +157,8 @@ class ReceivedBlockHandlerSuite
           val reader = new 
FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf)
           val bytes = reader.read(fileSegment)
           reader.close()
-          serializerManager.dataDeserialize(generateBlockId(), new 
ChunkedByteBuffer(bytes)).toList
+          serializerManager.dataDeserializeStream(
+            generateBlockId(), new 
ChunkedByteBuffer(bytes).toInputStream()).toList
         }
         loggedData shouldEqual data
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to