Updated Branches:
  refs/heads/branch-0.8 5ce6c7567 -> 30786c650

Merge pull request #157 from rxin/kryo

Three Kryo-related changes:

1. Call Kryo setReferences before invoking the user-specified Kryo registrator,
so that the user-specified registrator can override the default reference-tracking setting.

2. Register more internal classes (MapStatus, BlockManagerId).

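3. Slightly refactor the internal class registration to allocate less memory: register the classes directly from a shared list instead of allocating sample instances on every call.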

(cherry picked from commit 58d4f6c8a5d9739dc2a3f26f116528457336f0d3)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/30786c65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/30786c65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/30786c65

Branch: refs/heads/branch-0.8
Commit: 30786c650f5eb9d0f215b0ec9933f967d9bb264c
Parents: 5ce6c75
Author: Matei Zaharia <[email protected]>
Authored: Sun Nov 10 09:23:56 2013 -0800
Committer: Reynold Xin <[email protected]>
Committed: Sun Nov 10 11:58:58 2013 -0800

----------------------------------------------------------------------
 .../spark/serializer/KryoSerializer.scala       | 52 +++++++++++---------
 1 file changed, 30 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/30786c65/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 55b25f1..e748c22 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -27,13 +27,17 @@ import com.twitter.chill.{EmptyScalaKryoInstantiator, 
AllScalaRegistrar}
 
 import org.apache.spark.{SerializableWritable, Logging}
 import org.apache.spark.broadcast.HttpBroadcast
-import org.apache.spark.storage.{GetBlock,GotBlock, PutBlock, StorageLevel, 
TestBlockId}
+import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.storage._
 
 /**
- * A Spark serializer that uses the 
[[http://code.google.com/p/kryo/wiki/V1Documentation Kryo 1.x library]].
+ * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo 
serialization library]].
  */
 class KryoSerializer extends org.apache.spark.serializer.Serializer with 
Logging {
-  private val bufferSize = 
System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
+
+  private val bufferSize = {
+    System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 
1024
+  }
 
   def newKryoOutput() = new KryoOutput(bufferSize)
 
@@ -42,21 +46,11 @@ class KryoSerializer extends 
org.apache.spark.serializer.Serializer with Logging
     val kryo = instantiator.newKryo()
     val classLoader = Thread.currentThread.getContextClassLoader
 
-    val blockId = TestBlockId("1")
-    // Register some commonly used classes
-    val toRegister: Seq[AnyRef] = Seq(
-      ByteBuffer.allocate(1),
-      StorageLevel.MEMORY_ONLY,
-      PutBlock(blockId, ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY),
-      GotBlock(blockId, ByteBuffer.allocate(1)),
-      GetBlock(blockId),
-      1 to 10,
-      1 until 10,
-      1L to 10L,
-      1L until 10L
-    )
-
-    for (obj <- toRegister) kryo.register(obj.getClass)
+    // Allow disabling Kryo reference tracking if user knows their object 
graphs don't have loops.
+    // Do this before we invoke the user registrator so the user registrator 
can override this.
+    kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", 
"true").toBoolean)
+
+    for (cls <- KryoSerializer.toRegister) kryo.register(cls)
 
     // Allow sending SerializableWritable
     kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
@@ -78,10 +72,6 @@ class KryoSerializer extends 
org.apache.spark.serializer.Serializer with Logging
     new AllScalaRegistrar().apply(kryo)
 
     kryo.setClassLoader(classLoader)
-
-    // Allow disabling Kryo reference tracking if user knows their object 
graphs don't have loops
-    kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", 
"true").toBoolean)
-
     kryo
   }
 
@@ -165,3 +155,21 @@ private[spark] class KryoSerializerInstance(ks: 
KryoSerializer) extends Serializ
 trait KryoRegistrator {
   def registerClasses(kryo: Kryo)
 }
+
+private[serializer] object KryoSerializer {
+  // Commonly used classes.
+  private val toRegister: Seq[Class[_]] = Seq(
+    ByteBuffer.allocate(1).getClass,
+    classOf[StorageLevel],
+    classOf[PutBlock],
+    classOf[GotBlock],
+    classOf[GetBlock],
+    classOf[MapStatus],
+    classOf[BlockManagerId],
+    classOf[Array[Byte]],
+    (1 to 10).getClass,
+    (1 until 10).getClass,
+    (1L to 10L).getClass,
+    (1L until 10L).getClass
+  )
+}

Reply via email to