Moved the Spark internal class registration for Kryo into an object, and added more classes (e.g. MapStatus, BlockManagerId) to the registration.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c845611f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c845611f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c845611f

Branch: refs/heads/master
Commit: c845611fc387207a51a063d0809770f23fb7b8cc
Parents: 7c5f70d
Author: Reynold Xin <[email protected]>
Authored: Sat Nov 9 23:00:08 2013 -0800
Committer: Reynold Xin <[email protected]>
Committed: Sat Nov 9 23:00:08 2013 -0800

----------------------------------------------------------------------
 .../spark/serializer/KryoSerializer.scala | 40 +++++++++++---------
 1 file changed, 22 insertions(+), 18 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c845611f/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 4f60f0b..e748c22 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -27,11 +27,11 @@ import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar}
 
 import org.apache.spark.{SerializableWritable, Logging}
 import org.apache.spark.broadcast.HttpBroadcast
-import org.apache.spark.storage.{GetBlock,GotBlock, PutBlock, StorageLevel, TestBlockId}
+import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.storage._
 
 /**
- * A Spark serializer that uses the
- * [[http://code.google.com/p/kryo/wiki/V1Documentation Kryo 1.x library]].
+ * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
  */
 class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging {
 
@@ -50,21 +50,7 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
     // Do this before we invoke the user registrator so the user registrator can override this.
     kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", "true").toBoolean)
 
-    val blockId = TestBlockId("1")
-    // Register some commonly used classes
-    val toRegister: Seq[AnyRef] = Seq(
-      ByteBuffer.allocate(1),
-      StorageLevel.MEMORY_ONLY,
-      PutBlock(blockId, ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY),
-      GotBlock(blockId, ByteBuffer.allocate(1)),
-      GetBlock(blockId),
-      1 to 10,
-      1 until 10,
-      1L to 10L,
-      1L until 10L
-    )
-
-    for (obj <- toRegister) kryo.register(obj.getClass)
+    for (cls <- KryoSerializer.toRegister) kryo.register(cls)
 
     // Allow sending SerializableWritable
     kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
@@ -169,3 +155,21 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
 trait KryoRegistrator {
   def registerClasses(kryo: Kryo)
 }
+
+private[serializer] object KryoSerializer {
+  // Commonly used classes.
+  private val toRegister: Seq[Class[_]] = Seq(
+    ByteBuffer.allocate(1).getClass,
+    classOf[StorageLevel],
+    classOf[PutBlock],
+    classOf[GotBlock],
+    classOf[GetBlock],
+    classOf[MapStatus],
+    classOf[BlockManagerId],
+    classOf[Array[Byte]],
+    (1 to 10).getClass,
+    (1 until 10).getClass,
+    (1L to 10L).getClass,
+    (1L until 10L).getClass
+  )
+}
