Repository: spark
Updated Branches:
  refs/heads/master b3e5af62a -> b5f1ab701


[SPARK-13990] Automatically pick serializer when caching RDDs

Building on the `SerializerManager` introduced in SPARK-13926 / #11755, this 
patch modifies Spark's BlockManager to use RDDs' ClassTags in order to 
select the best serializer to use when caching RDD blocks.

When storing a local block, the BlockManager `put()` methods use implicits to 
record ClassTags and store those tags in the blocks' BlockInfo records. When 
reading a local block, the stored ClassTag is used to pick the appropriate 
serializer. When a block is stored with replication, the class tag is written 
into the block transfer metadata and will also be stored in the remote 
BlockManager.

There are two or three places where we don't properly pass ClassTags, including 
TorrentBroadcast and BlockRDD. I think this happens to work because the missing 
ClassTag always happens to be `ClassTag.Any`, but it might be worth looking 
more carefully at those places to see whether we should be more explicit.

Author: Josh Rosen <[email protected]>

Closes #11801 from JoshRosen/pick-best-serializer-for-caching.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b5f1ab70
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b5f1ab70
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b5f1ab70

Branch: refs/heads/master
Commit: b5f1ab701a167a728bb006e01b392b203da84391
Parents: b3e5af6
Author: Josh Rosen <[email protected]>
Authored: Mon Mar 21 17:19:39 2016 -0700
Committer: Josh Rosen <[email protected]>
Committed: Mon Mar 21 17:19:39 2016 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkEnv.scala  |   2 +-
 .../apache/spark/network/BlockDataManager.scala |   8 +-
 .../spark/network/BlockTransferService.scala    |  10 +-
 .../network/netty/NettyBlockRpcServer.scala     |  14 +-
 .../netty/NettyBlockTransferService.scala       |  12 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala   |   2 +-
 .../apache/spark/storage/BlockInfoManager.scala |   7 +-
 .../org/apache/spark/storage/BlockManager.scala | 146 ++++++++++---------
 .../spark/storage/memory/MemoryStore.scala      |  99 ++++++++-----
 .../org/apache/spark/DistributedSuite.scala     |   7 +-
 .../spark/storage/BlockInfoManagerSuite.scala   |   3 +-
 .../storage/BlockManagerReplicationSuite.scala  |   8 +-
 .../spark/storage/BlockManagerSuite.scala       |  49 ++++---
 project/MimaExcludes.scala                      |   3 +
 .../rdd/WriteAheadLogBackedBlockRDD.scala       |   3 +-
 .../streaming/ReceivedBlockHandlerSuite.scala   |   8 +-
 16 files changed, 226 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b5f1ab70/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 459fab8..e2c47ce 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -331,7 +331,7 @@ object SparkEnv extends Logging {
 
     // NB: blockManager is not valid until initialize() is called later.
     val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
-      serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
+      serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
       blockTransferService, securityManager, numUsableCores)
 
     val broadcastManager = new BroadcastManager(isDriver, conf, 
securityManager)

http://git-wip-us.apache.org/repos/asf/spark/blob/b5f1ab70/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala 
b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
index cc5e851..8f83668 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.network
 
+import scala.reflect.ClassTag
+
 import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.storage.{BlockId, StorageLevel}
 
@@ -35,7 +37,11 @@ trait BlockDataManager {
    * Returns true if the block was stored and false if the put operation 
failed or the block
    * already existed.
    */
-  def putBlockData(blockId: BlockId, data: ManagedBuffer, level: 
StorageLevel): Boolean
+  def putBlockData(
+      blockId: BlockId,
+      data: ManagedBuffer,
+      level: StorageLevel,
+      classTag: ClassTag[_]): Boolean
 
   /**
    * Release locks acquired by [[putBlockData()]] and [[getBlockData()]].

http://git-wip-us.apache.org/repos/asf/spark/blob/b5f1ab70/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala 
b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index 2de0f20..e43e3a2 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
 
 import scala.concurrent.{Await, Future, Promise}
 import scala.concurrent.duration.Duration
+import scala.reflect.ClassTag
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
@@ -76,7 +77,8 @@ abstract class BlockTransferService extends ShuffleClient 
with Closeable with Lo
       execId: String,
       blockId: BlockId,
       blockData: ManagedBuffer,
-      level: StorageLevel): Future[Unit]
+      level: StorageLevel,
+      classTag: ClassTag[_]): Future[Unit]
 
   /**
    * A special case of [[fetchBlocks]], as it fetches only one block and is 
blocking.
@@ -114,7 +116,9 @@ abstract class BlockTransferService extends ShuffleClient 
with Closeable with Lo
       execId: String,
       blockId: BlockId,
       blockData: ManagedBuffer,
-      level: StorageLevel): Unit = {
-    Await.result(uploadBlock(hostname, port, execId, blockId, blockData, 
level), Duration.Inf)
+      level: StorageLevel,
+      classTag: ClassTag[_]): Unit = {
+    val future = uploadBlock(hostname, port, execId, blockId, blockData, 
level, classTag)
+    Await.result(future, Duration.Inf)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b5f1ab70/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala 
b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index c1dbca5..2ed8a00 100644
--- 
a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ 
b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -20,6 +20,8 @@ package org.apache.spark.network.netty
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConverters._
+import scala.language.existentials
+import scala.reflect.ClassTag
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.BlockDataManager
@@ -61,12 +63,16 @@ class NettyBlockRpcServer(
         responseContext.onSuccess(new StreamHandle(streamId, 
blocks.size).toByteBuffer)
 
       case uploadBlock: UploadBlock =>
-        // StorageLevel is serialized as bytes using our JavaSerializer.
-        val level: StorageLevel =
-          
serializer.newInstance().deserialize(ByteBuffer.wrap(uploadBlock.metadata))
+        // StorageLevel and ClassTag are serialized as bytes using our 
JavaSerializer.
+        val (level: StorageLevel, classTag: ClassTag[_]) = {
+          serializer
+            .newInstance()
+            .deserialize(ByteBuffer.wrap(uploadBlock.metadata))
+            .asInstanceOf[(StorageLevel, ClassTag[_])]
+        }
         val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
         val blockId = BlockId(uploadBlock.blockId)
-        blockManager.putBlockData(blockId, data, level)
+        blockManager.putBlockData(blockId, data, level, classTag)
         responseContext.onSuccess(ByteBuffer.allocate(0))
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/b5f1ab70/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
 
b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index f588a28..5f3d453 100644
--- 
a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ 
b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
 
 import scala.collection.JavaConverters._
 import scala.concurrent.{Future, Promise}
+import scala.reflect.ClassTag
 
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.network._
@@ -118,18 +119,19 @@ class NettyBlockTransferService(conf: SparkConf, 
securityManager: SecurityManage
       execId: String,
       blockId: BlockId,
       blockData: ManagedBuffer,
-      level: StorageLevel): Future[Unit] = {
+      level: StorageLevel,
+      classTag: ClassTag[_]): Future[Unit] = {
     val result = Promise[Unit]()
     val client = clientFactory.createClient(hostname, port)
 
-    // StorageLevel is serialized as bytes using our JavaSerializer. 
Everything else is encoded
-    // using our binary protocol.
-    val levelBytes = 
JavaUtils.bufferToArray(serializer.newInstance().serialize(level))
+    // StorageLevel and ClassTag are serialized as bytes using our 
JavaSerializer.
+    // Everything else is encoded using our binary protocol.
+    val metadata = 
JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag)))
 
     // Convert or copy nio buffer into array in order to serialize it.
     val array = JavaUtils.bufferToArray(blockData.nioByteBuffer())
 
-    client.sendRpc(new UploadBlock(appId, execId, blockId.toString, 
levelBytes, array).toByteBuffer,
+    client.sendRpc(new UploadBlock(appId, execId, blockId.toString, metadata, 
array).toByteBuffer,
       new RpcResponseCallback {
         override def onSuccess(response: ByteBuffer): Unit = {
           logTrace(s"Successfully uploaded block $blockId")

http://git-wip-us.apache.org/repos/asf/spark/blob/b5f1ab70/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 8a577c8..f96551c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -326,7 +326,7 @@ abstract class RDD[T: ClassTag](
     val blockId = RDDBlockId(id, partition.index)
     var readCachedBlock = true
     // This method is called on executors, so we need call SparkEnv.get 
instead of sc.env.
-    SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, () => {
+    SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, 
elementClassTag, () => {
       readCachedBlock = false
       computeOrReadCheckpoint(partition, context)
     }) match {

http://git-wip-us.apache.org/repos/asf/spark/blob/b5f1ab70/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 46fab7a..94d11c5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -21,6 +21,7 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.reflect.ClassTag
 
 import com.google.common.collect.ConcurrentHashMultiset
 
@@ -37,10 +38,14 @@ import org.apache.spark.internal.Logging
  * @param level the block's storage level. This is the requested persistence 
level, not the
  *              effective storage level of the block (i.e. if this is 
MEMORY_AND_DISK, then this
  *              does not imply that the block is actually resident in memory).
+ * @param classTag the block's [[ClassTag]], used to select the serializer
  * @param tellMaster whether state changes for this block should be reported 
to the master. This
  *                   is true for most blocks, but is false for broadcast 
blocks.
  */
-private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: 
Boolean) {
+private[storage] class BlockInfo(
+    val level: StorageLevel,
+    val classTag: ClassTag[_],
+    val tellMaster: Boolean) {
 
   /**
    * The size of the block (in bytes)

http://git-wip-us.apache.org/repos/asf/spark/blob/b5f1ab70/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index aa2561d..83f8c5c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration._
+import scala.reflect.ClassTag
 import scala.util.Random
 import scala.util.control.NonFatal
 
@@ -37,7 +38,7 @@ import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.ExternalShuffleClient
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.serializer.{Serializer, SerializerInstance}
+import org.apache.spark.serializer.{Serializer, SerializerInstance, 
SerializerManager}
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage.memory._
 import org.apache.spark.util._
@@ -59,7 +60,7 @@ private[spark] class BlockManager(
     executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
-    defaultSerializer: Serializer,
+    serializerManager: SerializerManager,
     val conf: SparkConf,
     memoryManager: MemoryManager,
     mapOutputTracker: MapOutputTracker,
@@ -294,8 +295,12 @@ private[spark] class BlockManager(
   /**
    * Put the block locally, using the given storage level.
    */
-  override def putBlockData(blockId: BlockId, data: ManagedBuffer, level: 
StorageLevel): Boolean = {
-    putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)
+  override def putBlockData(
+      blockId: BlockId,
+      data: ManagedBuffer,
+      level: StorageLevel,
+      classTag: ClassTag[_]): Boolean = {
+    putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), 
level)(classTag)
   }
 
   /**
@@ -417,7 +422,7 @@ private[spark] class BlockManager(
           val iter: Iterator[Any] = if (level.deserialized) {
             memoryStore.getValues(blockId).get
           } else {
-            dataDeserialize(blockId, memoryStore.getBytes(blockId).get)
+            dataDeserialize(blockId, 
memoryStore.getBytes(blockId).get)(info.classTag)
           }
           val ci = CompletionIterator[Any, Iterator[Any]](iter, 
releaseLock(blockId))
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
@@ -425,10 +430,11 @@ private[spark] class BlockManager(
           val iterToReturn: Iterator[Any] = {
             val diskBytes = diskStore.getBytes(blockId)
             if (level.deserialized) {
-              val diskValues = dataDeserialize(blockId, diskBytes)
+              val diskValues = dataDeserialize(blockId, 
diskBytes)(info.classTag)
               maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
             } else {
-              dataDeserialize(blockId, maybeCacheDiskBytesInMemory(info, 
blockId, level, diskBytes))
+              val bytes = maybeCacheDiskBytesInMemory(info, blockId, level, 
diskBytes)
+              dataDeserialize(blockId, bytes)(info.classTag)
             }
           }
           val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, 
releaseLock(blockId))
@@ -502,7 +508,7 @@ private[spark] class BlockManager(
    *
    * This does not acquire a lock on this block in this JVM.
    */
-  def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
+  private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
     getRemoteBytes(blockId).map { data =>
       new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, 
data.size)
     }
@@ -633,12 +639,13 @@ private[spark] class BlockManager(
    * @return either a BlockResult if the block was successfully cached, or an 
iterator if the block
    *         could not be cached.
    */
-  def getOrElseUpdate(
+  def getOrElseUpdate[T](
       blockId: BlockId,
       level: StorageLevel,
-      makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = 
{
+      classTag: ClassTag[T],
+      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
     // Initially we hold no locks on this block.
-    doPutIterator(blockId, makeIterator, level, keepReadLock = true) match {
+    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) 
match {
       case None =>
         // doPut() didn't hand work back to us, so the block already existed 
or was successfully
         // stored. Therefore, we now hold a read lock on the block.
@@ -664,13 +671,13 @@ private[spark] class BlockManager(
   /**
    * @return true if the block was stored or false if an error occurred.
    */
-  def putIterator(
+  def putIterator[T: ClassTag](
       blockId: BlockId,
-      values: Iterator[Any],
+      values: Iterator[T],
       level: StorageLevel,
       tellMaster: Boolean = true): Boolean = {
     require(values != null, "Values is null")
-    doPutIterator(blockId, () => values, level, tellMaster) match {
+    doPutIterator(blockId, () => values, level, implicitly[ClassTag[T]], 
tellMaster) match {
       case None =>
         true
       case Some(iter) =>
@@ -703,13 +710,13 @@ private[spark] class BlockManager(
    *
    * @return true if the block was stored or false if an error occurred.
    */
-  def putBytes(
+  def putBytes[T: ClassTag](
       blockId: BlockId,
       bytes: ChunkedByteBuffer,
       level: StorageLevel,
       tellMaster: Boolean = true): Boolean = {
     require(bytes != null, "Bytes is null")
-    doPutBytes(blockId, bytes, level, tellMaster)
+    doPutBytes(blockId, bytes, level, implicitly[ClassTag[T]], tellMaster)
   }
 
   /**
@@ -723,13 +730,14 @@ private[spark] class BlockManager(
    *                     returns.
    * @return true if the block was already present or if the put succeeded, 
false otherwise.
    */
-  private def doPutBytes(
+  private def doPutBytes[T](
       blockId: BlockId,
       bytes: ChunkedByteBuffer,
       level: StorageLevel,
+      classTag: ClassTag[T],
       tellMaster: Boolean = true,
       keepReadLock: Boolean = false): Boolean = {
-    doPut(blockId, level, tellMaster = tellMaster, keepReadLock = 
keepReadLock) { putBlockInfo =>
+    doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = 
keepReadLock) { info =>
       val startTimeMs = System.currentTimeMillis
       // Since we're storing bytes, initiate the replication before storing 
them locally.
       // This is faster as data is already serialized and ready to send.
@@ -737,7 +745,7 @@ private[spark] class BlockManager(
         Future {
           // This is a blocking action and should run in 
futureExecutionContext which is a cached
           // thread pool
-          replicate(blockId, bytes, level)
+          replicate(blockId, bytes, level, classTag)
         }(futureExecutionContext)
       } else {
         null
@@ -749,8 +757,8 @@ private[spark] class BlockManager(
         // Put it in memory first, even if it also has useDisk set to true;
         // We will drop it to disk later if the memory store can't hold it.
         val putSucceeded = if (level.deserialized) {
-          val values = dataDeserialize(blockId, bytes)
-          memoryStore.putIterator(blockId, values, level) match {
+          val values = dataDeserialize(blockId, bytes)(classTag)
+          memoryStore.putIterator(blockId, values, level, classTag) match {
             case Right(_) => true
             case Left(iter) =>
               // If putting deserialized values in memory failed, we will put 
the bytes directly to
@@ -769,14 +777,14 @@ private[spark] class BlockManager(
         diskStore.putBytes(blockId, bytes)
       }
 
-      val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
+      val putBlockStatus = getCurrentBlockStatus(blockId, info)
       val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
       if (blockWasSuccessfullyStored) {
         // Now that the block is in either the memory, externalBlockStore, or 
disk store,
         // tell the master about it.
-        putBlockInfo.size = size
+        info.size = size
         if (tellMaster) {
-          reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
+          reportBlockStatus(blockId, info, putBlockStatus)
         }
         Option(TaskContext.get()).foreach { c =>
           c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, 
putBlockStatus)))
@@ -804,6 +812,7 @@ private[spark] class BlockManager(
   private def doPut[T](
       blockId: BlockId,
       level: StorageLevel,
+      classTag: ClassTag[_],
       tellMaster: Boolean,
       keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T] = {
 
@@ -811,7 +820,7 @@ private[spark] class BlockManager(
     require(level != null && level.isValid, "StorageLevel is null or invalid")
 
     val putBlockInfo = {
-      val newInfo = new BlockInfo(level, tellMaster)
+      val newInfo = new BlockInfo(level, classTag, tellMaster)
       if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
         newInfo
       } else {
@@ -864,21 +873,22 @@ private[spark] class BlockManager(
    * @return None if the block was already present or if the put succeeded, or 
Some(iterator)
    *         if the put failed.
    */
-  private def doPutIterator(
+  private def doPutIterator[T](
       blockId: BlockId,
-      iterator: () => Iterator[Any],
+      iterator: () => Iterator[T],
       level: StorageLevel,
+      classTag: ClassTag[T],
       tellMaster: Boolean = true,
-      keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator] = {
-    doPut(blockId, level, tellMaster = tellMaster, keepReadLock = 
keepReadLock) { putBlockInfo =>
+      keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
+    doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = 
keepReadLock) { info =>
       val startTimeMs = System.currentTimeMillis
-      var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator] 
= None
+      var iteratorFromFailedMemoryStorePut: 
Option[PartiallyUnrolledIterator[T]] = None
       // Size of the block in bytes
       var size = 0L
       if (level.useMemory) {
         // Put it in memory first, even if it also has useDisk set to true;
         // We will drop it to disk later if the memory store can't hold it.
-        memoryStore.putIterator(blockId, iterator(), level) match {
+        memoryStore.putIterator(blockId, iterator(), level, classTag) match {
           case Right(s) =>
             size = s
           case Left(iter) =>
@@ -886,7 +896,7 @@ private[spark] class BlockManager(
             if (level.useDisk) {
               logWarning(s"Persisting block $blockId to disk instead.")
               diskStore.put(blockId) { fileOutputStream =>
-                dataSerializeStream(blockId, fileOutputStream, iter)
+                dataSerializeStream(blockId, fileOutputStream, iter)(classTag)
               }
               size = diskStore.getSize(blockId)
             } else {
@@ -895,19 +905,19 @@ private[spark] class BlockManager(
         }
       } else if (level.useDisk) {
         diskStore.put(blockId) { fileOutputStream =>
-          dataSerializeStream(blockId, fileOutputStream, iterator())
+          dataSerializeStream(blockId, fileOutputStream, iterator())(classTag)
         }
         size = diskStore.getSize(blockId)
       }
 
-      val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
+      val putBlockStatus = getCurrentBlockStatus(blockId, info)
       val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
       if (blockWasSuccessfullyStored) {
         // Now that the block is in either the memory, externalBlockStore, or 
disk store,
         // tell the master about it.
-        putBlockInfo.size = size
+        info.size = size
         if (tellMaster) {
-          reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
+          reportBlockStatus(blockId, info, putBlockStatus)
         }
         Option(TaskContext.get()).foreach { c =>
           c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, 
putBlockStatus)))
@@ -915,9 +925,9 @@ private[spark] class BlockManager(
         logDebug("Put block %s locally took %s".format(blockId, 
Utils.getUsedTimeMs(startTimeMs)))
         if (level.replication > 1) {
           val remoteStartTime = System.currentTimeMillis
-          val bytesToReplicate = doGetLocalBytes(blockId, putBlockInfo)
+          val bytesToReplicate = doGetLocalBytes(blockId, info)
           try {
-            replicate(blockId, bytesToReplicate, level)
+            replicate(blockId, bytesToReplicate, level, classTag)
           } finally {
             bytesToReplicate.dispose()
           }
@@ -978,12 +988,13 @@ private[spark] class BlockManager(
    * @return a copy of the iterator. The original iterator passed this method 
should no longer
    *         be used after this method returns.
    */
-  private def maybeCacheDiskValuesInMemory(
+  private def maybeCacheDiskValuesInMemory[T](
       blockInfo: BlockInfo,
       blockId: BlockId,
       level: StorageLevel,
-      diskIterator: Iterator[Any]): Iterator[Any] = {
+      diskIterator: Iterator[T]): Iterator[T] = {
     require(level.deserialized)
+    val classTag = blockInfo.classTag.asInstanceOf[ClassTag[T]]
     if (level.useMemory) {
       // Synchronize on blockInfo to guard against a race condition where two 
readers both try to
       // put values read from disk into the MemoryStore.
@@ -992,7 +1003,7 @@ private[spark] class BlockManager(
           // Note: if we had a means to discard the disk iterator, we would do 
that here.
           memoryStore.getValues(blockId).get
         } else {
-          memoryStore.putIterator(blockId, diskIterator, level) match {
+          memoryStore.putIterator(blockId, diskIterator, level, classTag) 
match {
             case Left(iter) =>
               // The memory store put() failed, so it returned the iterator 
back to us:
               iter
@@ -1001,7 +1012,7 @@ private[spark] class BlockManager(
               memoryStore.getValues(blockId).get
           }
         }
-      }
+      }.asInstanceOf[Iterator[T]]
     } else {
       diskIterator
     }
@@ -1027,7 +1038,11 @@ private[spark] class BlockManager(
    * Replicate block to another node. Not that this is a blocking call that 
returns after
    * the block has been replicated.
    */
-  private def replicate(blockId: BlockId, data: ChunkedByteBuffer, level: 
StorageLevel): Unit = {
+  private def replicate(
+      blockId: BlockId,
+      data: ChunkedByteBuffer,
+      level: StorageLevel,
+      classTag: ClassTag[_]): Unit = {
     val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
     val numPeersToReplicateTo = level.replication - 1
     val peersForReplication = new ArrayBuffer[BlockManagerId]
@@ -1087,7 +1102,8 @@ private[spark] class BlockManager(
               peer.executorId,
               blockId,
               new NettyManagedBuffer(data.toNetty),
-              tLevel)
+              tLevel,
+              classTag)
             logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in 
%s ms"
               .format(System.currentTimeMillis - onePeerStartTime))
             peersReplicatedTo += peer
@@ -1132,9 +1148,9 @@ private[spark] class BlockManager(
    * @return true if the block was stored or false if the block was already 
stored or an
    *         error occurred.
    */
-  def putSingle(
+  def putSingle[T: ClassTag](
       blockId: BlockId,
-      value: Any,
+      value: T,
       level: StorageLevel,
       tellMaster: Boolean = true): Boolean = {
     putIterator(blockId, Iterator(value), level, tellMaster)
@@ -1151,9 +1167,9 @@ private[spark] class BlockManager(
    *
    * @return the block's new effective StorageLevel.
    */
-  def dropFromMemory(
+  private[storage] def dropFromMemory[T: ClassTag](
       blockId: BlockId,
-      data: () => Either[Array[Any], ChunkedByteBuffer]): StorageLevel = {
+      data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
     logInfo(s"Dropping block $blockId from memory")
     val info = blockInfoManager.assertBlockIsLockedForWriting(blockId)
     var blockIsUpdated = false
@@ -1165,7 +1181,10 @@ private[spark] class BlockManager(
       data() match {
         case Left(elements) =>
           diskStore.put(blockId) { fileOutputStream =>
-            dataSerializeStream(blockId, fileOutputStream, elements.toIterator)
+            dataSerializeStream(
+              blockId,
+              fileOutputStream,
+              elements.toIterator)(info.classTag.asInstanceOf[ClassTag[T]])
           }
         case Right(bytes) =>
           diskStore.putBytes(blockId, bytes)
@@ -1271,17 +1290,17 @@ private[spark] class BlockManager(
   }
 
   /** Serializes into a stream. */
-  def dataSerializeStream(
+  def dataSerializeStream[T: ClassTag](
       blockId: BlockId,
       outputStream: OutputStream,
-      values: Iterator[Any]): Unit = {
+      values: Iterator[T]): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
-    val ser = defaultSerializer.newInstance()
+    val ser = 
serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
     ser.serializeStream(wrapForCompression(blockId, 
byteStream)).writeAll(values).close()
   }
 
   /** Serializes into a chunked byte buffer. */
-  def dataSerialize(blockId: BlockId, values: Iterator[Any]): 
ChunkedByteBuffer = {
+  def dataSerialize[T: ClassTag](blockId: BlockId, values: Iterator[T]): 
ChunkedByteBuffer = {
     val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(1024 * 
1024 * 4)
     dataSerializeStream(blockId, byteArrayChunkOutputStream, values)
     new 
ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap))
@@ -1291,29 +1310,22 @@ private[spark] class BlockManager(
    * Deserializes a ByteBuffer into an iterator of values and disposes of it 
when the end of
    * the iterator is reached.
    */
-  def dataDeserialize(blockId: BlockId, bytes: ByteBuffer): Iterator[Any] = {
-    bytes.rewind()
-    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true))
-  }
-
-  /**
-   * Deserializes a ByteBuffer into an iterator of values and disposes of it 
when the end of
-   * the iterator is reached.
-   */
-  def dataDeserialize(blockId: BlockId, bytes: ChunkedByteBuffer): 
Iterator[Any] = {
-    dataDeserializeStream(blockId, bytes.toInputStream(dispose = true))
+  def dataDeserialize[T: ClassTag](blockId: BlockId, bytes: ChunkedByteBuffer): Iterator[T] = {
+    dataDeserializeStream[T](blockId, bytes.toInputStream(dispose = true))
   }
 
   /**
    * Deserializes a InputStream into an iterator of values and disposes of it 
when the end of
    * the iterator is reached.
    */
-  def dataDeserializeStream(blockId: BlockId, inputStream: InputStream): 
Iterator[Any] = {
+  def dataDeserializeStream[T: ClassTag](
+      blockId: BlockId,
+      inputStream: InputStream): Iterator[T] = {
     val stream = new BufferedInputStream(inputStream)
-    defaultSerializer
+    serializerManager.getSerializer(implicitly[ClassTag[T]])
       .newInstance()
       .deserializeStream(wrapForCompression(blockId, stream))
-      .asIterator
+      .asIterator.asInstanceOf[Iterator[T]]
   }
 
   def stop(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/b5f1ab70/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala 
b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 9417132..d370ee9 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -21,6 +21,7 @@ import java.util.LinkedHashMap
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
 
 import org.apache.spark.{SparkConf, TaskContext}
 import org.apache.spark.internal.Logging
@@ -30,11 +31,18 @@ import org.apache.spark.util.{CompletionIterator, 
SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
 import org.apache.spark.util.io.ChunkedByteBuffer
 
-private sealed trait MemoryEntry {
-  val size: Long
+private sealed trait MemoryEntry[T] {
+  def size: Long
+  def classTag: ClassTag[T]
 }
-private case class DeserializedMemoryEntry(value: Array[Any], size: Long) 
extends MemoryEntry
-private case class SerializedMemoryEntry(buffer: ChunkedByteBuffer, size: 
Long) extends MemoryEntry
+private case class DeserializedMemoryEntry[T](
+    value: Array[T],
+    size: Long,
+    classTag: ClassTag[T]) extends MemoryEntry[T]
+private case class SerializedMemoryEntry[T](
+    buffer: ChunkedByteBuffer,
+    size: Long,
+    classTag: ClassTag[T]) extends MemoryEntry[T]
 
 /**
  * Stores blocks in memory, either as Arrays of deserialized Java objects or as
@@ -49,7 +57,7 @@ private[spark] class MemoryStore(
   // Note: all changes to memory allocations, notably putting blocks, evicting 
blocks, and
   // acquiring or releasing unroll memory, must be synchronized on 
`memoryManager`!
 
-  private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, 
true)
+  private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)
 
   // A mapping from taskAttemptId to amount of memory used for unrolling a 
block (in bytes)
   // All accesses of this map are assumed to have manually synchronized on 
`memoryManager`
@@ -95,13 +103,16 @@ private[spark] class MemoryStore(
    *
    * @return true if the put() succeeded, false otherwise.
    */
-  def putBytes(blockId: BlockId, size: Long, _bytes: () => ChunkedByteBuffer): 
Boolean = {
+  def putBytes[T: ClassTag](
+      blockId: BlockId,
+      size: Long,
+      _bytes: () => ChunkedByteBuffer): Boolean = {
     require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
     if (memoryManager.acquireStorageMemory(blockId, size)) {
       // We acquired enough memory for the block, so go ahead and put it
       val bytes = _bytes()
       assert(bytes.size == size)
-      val entry = new SerializedMemoryEntry(bytes, size)
+      val entry = new SerializedMemoryEntry[T](bytes, size, implicitly[ClassTag[T]])
       entries.synchronized {
         entries.put(blockId, entry)
       }
@@ -129,10 +140,11 @@ private[spark] class MemoryStore(
    *         iterator or call `close()` on it in order to free the storage 
memory consumed by the
    *         partially-unrolled block.
    */
-  private[storage] def putIterator(
+  private[storage] def putIterator[T](
       blockId: BlockId,
-      values: Iterator[Any],
-      level: StorageLevel): Either[PartiallyUnrolledIterator, Long] = {
+      values: Iterator[T],
+      level: StorageLevel,
+      classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
 
     require(!contains(blockId), s"Block $blockId is already present in the 
MemoryStore")
 
@@ -151,7 +163,7 @@ private[spark] class MemoryStore(
     // Keep track of unroll memory used by this particular block / 
putIterator() operation
     var unrollMemoryUsedByThisBlock = 0L
     // Underlying vector for unrolling the block
-    var vector = new SizeTrackingVector[Any]
+    var vector = new SizeTrackingVector[T]()(classTag)
 
     // Request enough memory to begin unrolling
     keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
initialMemoryThreshold)
@@ -187,10 +199,10 @@ private[spark] class MemoryStore(
       val arrayValues = vector.toArray
       vector = null
       val entry = if (level.deserialized) {
-        new DeserializedMemoryEntry(arrayValues, 
SizeEstimator.estimate(arrayValues))
+        new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
       } else {
-        val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
-        new SerializedMemoryEntry(bytes, bytes.size)
+        val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)(classTag)
+        new SerializedMemoryEntry[T](bytes, bytes.size, classTag)
       }
       val size = entry.size
       def transferUnrollToStorage(amount: Long): Unit = {
@@ -248,19 +260,21 @@ private[spark] class MemoryStore(
     val entry = entries.synchronized { entries.get(blockId) }
     entry match {
       case null => None
-      case e: DeserializedMemoryEntry =>
+      case e: DeserializedMemoryEntry[_] =>
         throw new IllegalArgumentException("should only call getBytes on 
serialized blocks")
-      case SerializedMemoryEntry(bytes, _) => Some(bytes)
+      case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
     }
   }
 
-  def getValues(blockId: BlockId): Option[Iterator[Any]] = {
+  def getValues(blockId: BlockId): Option[Iterator[_]] = {
     val entry = entries.synchronized { entries.get(blockId) }
     entry match {
       case null => None
-      case e: SerializedMemoryEntry =>
+      case e: SerializedMemoryEntry[_] =>
         throw new IllegalArgumentException("should only call getValues on 
deserialized blocks")
-      case DeserializedMemoryEntry(values, _) => Some(values.iterator)
+      case DeserializedMemoryEntry(values, _, _) =>
+        val x = Some(values)
+        x.map(_.iterator)
     }
   }
 
@@ -333,6 +347,24 @@ private[spark] class MemoryStore(
         }
       }
 
+      def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
+        val data = entry match {
+          case DeserializedMemoryEntry(values, _, _) => Left(values)
+          case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
+        }
+        val newEffectiveStorageLevel =
+          blockManager.dropFromMemory(blockId, () => data)(entry.classTag)
+        if (newEffectiveStorageLevel.isValid) {
+          // The block is still present in at least one store, so release the 
lock
+          // but don't delete the block info
+          blockManager.releaseLock(blockId)
+        } else {
+          // The block isn't present in any store, so delete the block info so 
that the
+          // block can be stored again
+          blockManager.blockInfoManager.removeBlock(blockId)
+        }
+      }
+
       if (freedMemory >= space) {
         logInfo(s"${selectedBlocks.size} blocks selected for dropping")
         for (blockId <- selectedBlocks) {
@@ -341,20 +373,7 @@ private[spark] class MemoryStore(
           // blocks and removing entries. However the check is still here for
           // future safety.
           if (entry != null) {
-            val data = entry match {
-              case DeserializedMemoryEntry(values, _) => Left(values)
-              case SerializedMemoryEntry(buffer, _) => Right(buffer)
-            }
-            val newEffectiveStorageLevel = 
blockManager.dropFromMemory(blockId, () => data)
-            if (newEffectiveStorageLevel.isValid) {
-              // The block is still present in at least one store, so release 
the lock
-              // but don't delete the block info
-              blockManager.releaseLock(blockId)
-            } else {
-              // The block isn't present in any store, so delete the block 
info so that the
-              // block can be stored again
-              blockManager.blockInfoManager.removeBlock(blockId)
-            }
+            dropBlock(blockId, entry)
           }
         }
         freedMemory
@@ -470,16 +489,16 @@ private[spark] class MemoryStore(
  * @param unrolled an iterator for the partially-unrolled values.
  * @param rest the rest of the original iterator passed to 
[[MemoryStore.putIterator()]].
  */
-private[storage] class PartiallyUnrolledIterator(
+private[storage] class PartiallyUnrolledIterator[T](
     memoryStore: MemoryStore,
     unrollMemory: Long,
-    unrolled: Iterator[Any],
-    rest: Iterator[Any])
-  extends Iterator[Any] {
+    unrolled: Iterator[T],
+    rest: Iterator[T])
+  extends Iterator[T] {
 
   private[this] var unrolledIteratorIsConsumed: Boolean = false
-  private[this] var iter: Iterator[Any] = {
-    val completionIterator = CompletionIterator[Any, Iterator[Any]](unrolled, {
+  private[this] var iter: Iterator[T] = {
+    val completionIterator = CompletionIterator[T, Iterator[T]](unrolled, {
       unrolledIteratorIsConsumed = true
       memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
     })
@@ -487,7 +506,7 @@ private[storage] class PartiallyUnrolledIterator(
   }
 
   override def hasNext: Boolean = iter.hasNext
-  override def next(): Any = iter.next()
+  override def next(): T = iter.next()
 
   /**
    * Called to dispose of this iterator and free its memory.

http://git-wip-us.apache.org/repos/asf/spark/blob/b5f1ab70/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala 
b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 1c3f2bc..2732cd6 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.Matchers
 import org.scalatest.time.{Millis, Span}
 
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
+import org.apache.spark.util.io.ChunkedByteBuffer
 
 class NotSerializableClass
 class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() 
{}
@@ -196,8 +197,8 @@ class DistributedSuite extends SparkFunSuite with Matchers 
with LocalSparkContex
     blockManager.master.getLocations(blockId).foreach { cmId =>
       val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, 
cmId.executorId,
         blockId.toString)
-      val deserialized = blockManager.dataDeserialize(blockId, 
bytes.nioByteBuffer())
-        .asInstanceOf[Iterator[Int]].toList
+      val deserialized = blockManager.dataDeserialize[Int](blockId,
+        new ChunkedByteBuffer(bytes.nioByteBuffer())).toList
       assert(deserialized === (1 to 100).toList)
     }
   }
@@ -222,7 +223,7 @@ class DistributedSuite extends SparkFunSuite with Matchers 
with LocalSparkContex
     val numPartitions = 10
     val conf = new SparkConf()
       .set("spark.storage.unrollMemoryThreshold", "1024")
-      .set("spark.testing.memory", (size * numPartitions).toString)
+      .set("spark.testing.memory", size.toString)
     sc = new SparkContext(clusterUrl, "test", conf)
     val data = sc.parallelize(1 to size, 
numPartitions).persist(StorageLevel.MEMORY_ONLY)
     assert(data.count() === size)

http://git-wip-us.apache.org/repos/asf/spark/blob/b5f1ab70/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index fe83fc7..7ee76aa 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.language.implicitConversions
+import scala.reflect.ClassTag
 
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.time.SpanSugar._
@@ -52,7 +53,7 @@ class BlockInfoManagerSuite extends SparkFunSuite with 
BeforeAndAfterEach {
   }
 
   private def newBlockInfo(): BlockInfo = {
-    new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false)
+    new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, tellMaster = false)
   }
 
   private def withTaskId[T](taskAttemptId: Long)(block: => T): T = {

http://git-wip-us.apache.org/repos/asf/spark/blob/b5f1ab70/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index b78a364..98e8450 100644
--- 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
-import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
 import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.storage.StorageLevel._
 
@@ -62,7 +62,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite with 
Matchers with Befo
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 
1)
     val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, 
numCores = 1)
-    val store = new BlockManager(name, rpcEnv, master, serializer, conf,
+    val serializerManager = new SerializerManager(serializer, conf)
+    val store = new BlockManager(name, rpcEnv, master, serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
     memManager.setMemoryStore(store.memoryStore)
     store.initialize("app-id")
@@ -262,7 +263,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite 
with Matchers with Befo
     when(failableTransfer.hostName).thenReturn("some-hostname")
     when(failableTransfer.port).thenReturn(1000)
     val memManager = new StaticMemoryManager(conf, Long.MaxValue, 10000, 
numCores = 1)
-    val failableStore = new BlockManager("failable-store", rpcEnv, master, 
serializer, conf,
+    val serializerManager = new SerializerManager(serializer, conf)
+    val failableStore = new BlockManager("failable-store", rpcEnv, master, 
serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, failableTransfer, 
securityMgr, 0)
     memManager.setMemoryStore(failableStore.memoryStore)
     failableStore.initialize("app-id")

http://git-wip-us.apache.org/repos/asf/spark/blob/b5f1ab70/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index edf5cd3..9419dfa 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -24,6 +24,7 @@ import scala.concurrent.duration._
 import scala.concurrent.Future
 import scala.language.implicitConversions
 import scala.language.postfixOps
+import scala.reflect.ClassTag
 
 import org.mockito.{Matchers => mc}
 import org.mockito.Mockito.{mock, times, verify, when}
@@ -40,7 +41,7 @@ import 
org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.network.shuffle.BlockFetchingListener
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
-import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
+import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, 
SerializerManager}
 import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
 import org.apache.spark.util._
@@ -77,7 +78,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
     val transfer = transferService
       .getOrElse(new NettyBlockTransferService(conf, securityMgr, numCores = 
1))
     val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, 
numCores = 1)
-    val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf,
+    val serializerManager = new SerializerManager(serializer, conf)
+    val blockManager = new BlockManager(name, rpcEnv, master, 
serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
     memManager.setMemoryStore(blockManager.memoryStore)
     blockManager.initialize("app-id")
@@ -821,8 +823,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
       maxOnHeapExecutionMemory = Long.MaxValue,
       maxStorageMemory = 1200,
       numCores = 1)
+    val serializerManager = new SerializerManager(new JavaSerializer(conf), 
conf)
     store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
-      new JavaSerializer(conf), conf, memoryManager, mapOutputTracker,
+      serializerManager, conf, memoryManager, mapOutputTracker,
       shuffleManager, transfer, securityMgr, 0)
     memoryManager.setMemoryStore(store.memoryStore)
 
@@ -1074,7 +1077,8 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Unroll with all the space in the world. This should succeed.
-    var putResult = memoryStore.putIterator("unroll", smallList.iterator, 
StorageLevel.MEMORY_ONLY)
+    var putResult =
+      memoryStore.putIterator("unroll", smallList.iterator, 
StorageLevel.MEMORY_ONLY, ClassTag.Any)
     assert(putResult.isRight)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
     smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case 
(e, a) =>
@@ -1085,7 +1089,8 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     // Unroll with not enough space. This should succeed after kicking out 
someBlock1.
     assert(store.putIterator("someBlock1", smallList.iterator, 
StorageLevel.MEMORY_ONLY))
     assert(store.putIterator("someBlock2", smallList.iterator, 
StorageLevel.MEMORY_ONLY))
-    putResult = memoryStore.putIterator("unroll", smallList.iterator, 
StorageLevel.MEMORY_ONLY)
+    putResult =
+      memoryStore.putIterator("unroll", smallList.iterator, 
StorageLevel.MEMORY_ONLY, ClassTag.Any)
     assert(putResult.isRight)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
     assert(memoryStore.contains("someBlock2"))
@@ -1099,7 +1104,8 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     // 4800 bytes, there is still not enough room to unroll this block. This 
returns an iterator.
     // In the mean time, however, we kicked out someBlock2 before giving up.
     assert(store.putIterator("someBlock3", smallList.iterator, 
StorageLevel.MEMORY_ONLY))
-    putResult = memoryStore.putIterator("unroll", bigList.iterator, 
StorageLevel.MEMORY_ONLY)
+    putResult =
+      memoryStore.putIterator("unroll", bigList.iterator, 
StorageLevel.MEMORY_ONLY, ClassTag.Any)
     assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an 
iterator
     assert(!memoryStore.contains("someBlock2"))
     assert(putResult.isLeft)
@@ -1121,8 +1127,8 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Unroll with plenty of space. This should succeed and cache both blocks.
-    val result1 = memoryStore.putIterator("b1", smallIterator, memOnly)
-    val result2 = memoryStore.putIterator("b2", smallIterator, memOnly)
+    val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, 
ClassTag.Any)
+    val result2 = memoryStore.putIterator("b2", smallIterator, memOnly, 
ClassTag.Any)
     assert(memoryStore.contains("b1"))
     assert(memoryStore.contains("b2"))
     assert(result1.isRight) // unroll was successful
@@ -1137,7 +1143,7 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     store.putIterator("b2", smallIterator, memOnly)
 
     // Unroll with not enough space. This should succeed but kick out b1 in 
the process.
-    val result3 = memoryStore.putIterator("b3", smallIterator, memOnly)
+    val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, 
ClassTag.Any)
     assert(result3.isRight)
     assert(!memoryStore.contains("b1"))
     assert(memoryStore.contains("b2"))
@@ -1147,7 +1153,7 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     store.putIterator("b3", smallIterator, memOnly)
 
     // Unroll huge block with not enough space. This should fail and kick out 
b2 in the process.
-    val result4 = memoryStore.putIterator("b4", bigIterator, memOnly)
+    val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, 
ClassTag.Any)
     assert(result4.isLeft) // unroll was unsuccessful
     assert(!memoryStore.contains("b1"))
     assert(!memoryStore.contains("b2"))
@@ -1175,7 +1181,7 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 
     // Unroll with not enough space. This should succeed but kick out b1 in 
the process.
     // Memory store should contain b2 and b3, while disk store should contain 
only b1
-    val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk)
+    val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, 
ClassTag.Any)
     assert(result3.isRight)
     assert(!memoryStore.contains("b1"))
     assert(memoryStore.contains("b2"))
@@ -1191,7 +1197,7 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     // the block may be stored to disk. During the unrolling process, block 
"b2" should be kicked
     // out, so the memory store should contain only b3, while the disk store 
should contain
     // b1, b2 and b4.
-    val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk)
+    val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, 
ClassTag.Any)
     assert(result4.isLeft)
     assert(!memoryStore.contains("b1"))
     assert(!memoryStore.contains("b2"))
@@ -1211,28 +1217,28 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // All unroll memory used is released because putIterator did not return 
an iterator
-    assert(memoryStore.putIterator("b1", smallIterator, memOnly).isRight)
+    assert(memoryStore.putIterator("b1", smallIterator, memOnly, 
ClassTag.Any).isRight)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
-    assert(memoryStore.putIterator("b2", smallIterator, memOnly).isRight)
+    assert(memoryStore.putIterator("b2", smallIterator, memOnly, 
ClassTag.Any).isRight)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Unroll memory is not released because putIterator returned an iterator
     // that still depends on the underlying vector used in the process
-    assert(memoryStore.putIterator("b3", smallIterator, memOnly).isLeft)
+    assert(memoryStore.putIterator("b3", smallIterator, memOnly, 
ClassTag.Any).isLeft)
     val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask
     assert(unrollMemoryAfterB3 > 0)
 
     // The unroll memory owned by this thread builds on top of its value after 
the previous unrolls
-    assert(memoryStore.putIterator("b4", smallIterator, memOnly).isLeft)
+    assert(memoryStore.putIterator("b4", smallIterator, memOnly, 
ClassTag.Any).isLeft)
     val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask
     assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
 
     // ... but only to a certain extent (until we run out of free space to 
grant new unroll memory)
-    assert(memoryStore.putIterator("b5", smallIterator, memOnly).isLeft)
+    assert(memoryStore.putIterator("b5", smallIterator, memOnly, 
ClassTag.Any).isLeft)
     val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask
-    assert(memoryStore.putIterator("b6", smallIterator, memOnly).isLeft)
+    assert(memoryStore.putIterator("b6", smallIterator, memOnly, 
ClassTag.Any).isLeft)
     val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask
-    assert(memoryStore.putIterator("b7", smallIterator, memOnly).isLeft)
+    assert(memoryStore.putIterator("b7", smallIterator, memOnly, 
ClassTag.Any).isLeft)
     val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask
     assert(unrollMemoryAfterB5 === unrollMemoryAfterB4)
     assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
@@ -1244,7 +1250,7 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     val memoryStore = store.memoryStore
     val blockId = BlockId("rdd_3_10")
     store.blockInfoManager.lockNewBlockForWriting(
-      blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false))
+      blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, 
tellMaster = false))
     memoryStore.putBytes(blockId, 13000, () => {
       fail("A big ByteBuffer that cannot be put into MemoryStore should not be 
created")
     })
@@ -1340,7 +1346,8 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
         port: Int, execId: String,
         blockId: BlockId,
         blockData: ManagedBuffer,
-        level: StorageLevel): Future[Unit] = {
+        level: StorageLevel,
+        classTag: ClassTag[_]): Future[Unit] = {
       import scala.concurrent.ExecutionContext.Implicits.global
       Future {}
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/b5f1ab70/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 13caa54..68e9c50 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -569,6 +569,9 @@ object MimaExcludes {
             if 
missing.map(_.fullName).sameElements(Seq("org.apache.spark.Logging")) => false
           case _ => true
         }
+      ) ++ Seq(
+        // [SPARK-13990] Automatically pick serializer when caching RDDs
+        ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockTransferService.uploadBlock")
       )
     case v if v.startsWith("1.6") =>
       Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/b5f1ab70/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 8625882..ace67a6 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -161,7 +161,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
         logDebug(s"Stored partition data of $this into block manager with 
level $storageLevel")
         dataRead.rewind()
       }
-      blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
+      blockManager.dataDeserialize(blockId, new ChunkedByteBuffer(dataRead))
+        .asInstanceOf[Iterator[T]]
     }
 
     if (partition.isBlockIdValid) {

http://git-wip-us.apache.org/repos/asf/spark/blob/b5f1ab70/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 76f67ed..122ca06 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -34,12 +34,13 @@ import org.apache.spark.memory.StaticMemoryManager
 import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
-import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
 import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.storage._
 import org.apache.spark.streaming.receiver._
 import org.apache.spark.streaming.util._
 import org.apache.spark.util.{ManualClock, Utils}
+import org.apache.spark.util.io.ChunkedByteBuffer
 
 class ReceivedBlockHandlerSuite
   extends SparkFunSuite
@@ -155,7 +156,7 @@ class ReceivedBlockHandlerSuite
           val reader = new 
FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf)
           val bytes = reader.read(fileSegment)
           reader.close()
-          blockManager.dataDeserialize(generateBlockId(), bytes).toList
+          blockManager.dataDeserialize(generateBlockId(), new 
ChunkedByteBuffer(bytes)).toList
         }
         loggedData shouldEqual data
       }
@@ -264,7 +265,8 @@ class ReceivedBlockHandlerSuite
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, 
numCores = 1)
     val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 
1)
-    val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, 
serializer, conf,
+    val serializerManager = new SerializerManager(serializer, conf)
+    val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, 
serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
     memManager.setMemoryStore(blockManager.memoryStore)
     blockManager.initialize("app-id")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to