Fix Chill serialization of Range objects, which used to write out each element, and register user and Spark classes before Chill's serializers to let them override Chill's behavior in general.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c84c2052 Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c84c2052 Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c84c2052 Branch: refs/heads/scala-2.10 Commit: c84c2052898cb055012b8a6da00b8990cd8302c4 Parents: 7b3ae04 Author: Matei Zaharia <ma...@eecs.berkeley.edu> Authored: Wed Oct 9 14:38:38 2013 -0700 Committer: Matei Zaharia <ma...@eecs.berkeley.edu> Committed: Wed Oct 9 16:23:40 2013 -0700 ---------------------------------------------------------------------- .../spark/serializer/KryoSerializer.scala | 14 ++++++++++--- .../spark/serializer/KryoSerializerSuite.scala | 21 ++++++++++++++++++++ 2 files changed, 32 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c84c2052/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 6c500ba..e936b1c 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -23,7 +23,7 @@ import java.io.{EOFException, InputStream, OutputStream} import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} import com.esotericsoftware.kryo.{KryoException, Kryo} import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} -import com.twitter.chill.ScalaKryoInstantiator +import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar} import org.apache.spark.{SerializableWritable, Logging} import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock, StorageLevel} @@ -39,7 
+39,7 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging def newKryoOutput() = new KryoOutput(bufferSize) def newKryo(): Kryo = { - val instantiator = new ScalaKryoInstantiator + val instantiator = new EmptyScalaKryoInstantiator val kryo = instantiator.newKryo() val classLoader = Thread.currentThread.getContextClassLoader @@ -49,7 +49,11 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging StorageLevel.MEMORY_ONLY, PutBlock("1", ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY), GotBlock("1", ByteBuffer.allocate(1)), - GetBlock("1") + GetBlock("1"), + 1 to 10, + 1 until 10, + 1L to 10L, + 1L until 10L ) for (obj <- toRegister) kryo.register(obj.getClass) @@ -69,6 +73,10 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging case _: Exception => println("Failed to register spark.kryo.registrator") } + // Register Chill's classes; we do this after our ranges and the user's own classes to let + // our code override the generic serializers in Chill for things like Seq + new AllScalaRegistrar().apply(kryo) + kryo.setClassLoader(classLoader) // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c84c2052/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 0164dda..c016c51 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -103,6 +103,27 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { check(List(mutable.HashMap("one" -> 1, "two" -> 2),mutable.HashMap(1->"one",2->"two",3->"three"))) } + 
test("ranges") { + val ser = (new KryoSerializer).newInstance() + def check[T](t: T) { + assert(ser.deserialize[T](ser.serialize(t)) === t) + // Check that very long ranges don't get written one element at a time + assert(ser.serialize(t).limit < 100) + } + check(1 to 1000000) + check(1 to 1000000 by 2) + check(1 until 1000000) + check(1 until 1000000 by 2) + check(1L to 1000000L) + check(1L to 1000000L by 2L) + check(1L until 1000000L) + check(1L until 1000000L by 2L) + check(1.0 to 1000000.0 by 1.0) + check(1.0 to 1000000.0 by 2.0) + check(1.0 until 1000000.0 by 1.0) + check(1.0 until 1000000.0 by 2.0) + } + test("custom registrator") { System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)