Fix Chill serialization of Range objects, which used to write out each
element, and register user and Spark classes before Chill's serializers
to let them override Chill's behavior in general.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c84c2052
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c84c2052
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c84c2052

Branch: refs/heads/scala-2.10
Commit: c84c2052898cb055012b8a6da00b8990cd8302c4
Parents: 7b3ae04
Author: Matei Zaharia <ma...@eecs.berkeley.edu>
Authored: Wed Oct 9 14:38:38 2013 -0700
Committer: Matei Zaharia <ma...@eecs.berkeley.edu>
Committed: Wed Oct 9 16:23:40 2013 -0700

----------------------------------------------------------------------
 .../spark/serializer/KryoSerializer.scala       | 14 ++++++++++---
 .../spark/serializer/KryoSerializerSuite.scala  | 21 ++++++++++++++++++++
 2 files changed, 32 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c84c2052/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 6c500ba..e936b1c 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -23,7 +23,7 @@ import java.io.{EOFException, InputStream, OutputStream}
 import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
 import com.esotericsoftware.kryo.{KryoException, Kryo}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
-import com.twitter.chill.ScalaKryoInstantiator
+import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar}
 
 import org.apache.spark.{SerializableWritable, Logging}
 import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock, StorageLevel}
@@ -39,7 +39,7 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
   def newKryoOutput() = new KryoOutput(bufferSize)
 
   def newKryo(): Kryo = {
-    val instantiator = new ScalaKryoInstantiator
+    val instantiator = new EmptyScalaKryoInstantiator
     val kryo = instantiator.newKryo()
     val classLoader = Thread.currentThread.getContextClassLoader
 
@@ -49,7 +49,11 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
       StorageLevel.MEMORY_ONLY,
       PutBlock("1", ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY),
       GotBlock("1", ByteBuffer.allocate(1)),
-      GetBlock("1")
+      GetBlock("1"),
+      1 to 10,
+      1 until 10,
+      1L to 10L,
+      1L until 10L
     )
 
     for (obj <- toRegister) kryo.register(obj.getClass)
@@ -69,6 +73,10 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
       case _: Exception => println("Failed to register spark.kryo.registrator")
     }
 
+    // Register Chill's classes; we do this after our ranges and the user's own classes to let
+    // our code override the generic serializers in Chill for things like Seq
+    new AllScalaRegistrar().apply(kryo)
+
     kryo.setClassLoader(classLoader)
 
     // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c84c2052/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 0164dda..c016c51 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -103,6 +103,27 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
     check(List(mutable.HashMap("one" -> 1, "two" -> 2),mutable.HashMap(1->"one",2->"two",3->"three")))
   }
 
+  test("ranges") {
+    val ser = (new KryoSerializer).newInstance()
+    def check[T](t: T) {
+      assert(ser.deserialize[T](ser.serialize(t)) === t)
+      // Check that very long ranges don't get written one element at a time
+      assert(ser.serialize(t).limit < 100)
+    }
+    check(1 to 1000000)
+    check(1 to 1000000 by 2)
+    check(1 until 1000000)
+    check(1 until 1000000 by 2)
+    check(1L to 1000000L)
+    check(1L to 1000000L by 2L)
+    check(1L until 1000000L)
+    check(1L until 1000000L by 2L)
+    check(1.0 to 1000000.0 by 1.0)
+    check(1.0 to 1000000.0 by 2.0)
+    check(1.0 until 1000000.0 by 1.0)
+    check(1.0 until 1000000.0 by 2.0)
+  }
+
   test("custom registrator") {
     System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
 

Reply via email to