Repository: spark
Updated Branches:
  refs/heads/master ed8d1531f -> bf25f9bdf


[SPARK-11016] Move RoaringBitmap to explicit Kryo serializer

Fix the serialization of RoaringBitmap with Kyro serializer

This PR came from https://github.com/metamx/spark/pull/1, thanks to drcrallen

Author: Davies Liu <[email protected]>
Author: Charles Allen <[email protected]>

Closes #9748 from davies/SPARK-11016.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf25f9bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf25f9bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf25f9bd

Branch: refs/heads/master
Commit: bf25f9bdfc7bd8533890c7df1b35afa912dc6d3d
Parents: ed8d153
Author: Davies Liu <[email protected]>
Authored: Tue Nov 17 19:39:39 2015 -0800
Committer: Davies Liu <[email protected]>
Committed: Tue Nov 17 19:39:39 2015 -0800

----------------------------------------------------------------------
 .../spark/serializer/KryoSerializer.scala       | 64 +++++++++++++++++---
 1 file changed, 55 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bf25f9bd/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index c5195c1..1bcb317 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.serializer
 
-import java.io.{EOFException, IOException, InputStream, OutputStream}
+import java.io.{EOFException, IOException, InputStream, OutputStream, 
DataInput, DataOutput}
 import java.nio.ByteBuffer
 import javax.annotation.Nullable
 
@@ -25,12 +25,12 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
-import com.esotericsoftware.kryo.{Kryo, KryoException}
+import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => 
KryoClassSerializer}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
 import com.esotericsoftware.kryo.serializers.{JavaSerializer => 
KryoJavaSerializer}
 import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
 import org.apache.avro.generic.{GenericData, GenericRecord}
-import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, 
RoaringBitmap}
+import org.roaringbitmap.RoaringBitmap
 
 import org.apache.spark._
 import org.apache.spark.api.python.PythonBroadcast
@@ -94,6 +94,9 @@ class KryoSerializer(conf: SparkConf)
     for (cls <- KryoSerializer.toRegister) {
       kryo.register(cls)
     }
+    for ((cls, ser) <- KryoSerializer.toRegisterSerializer) {
+      kryo.register(cls, ser)
+    }
 
     // For results returned by asJavaIterable. See 
JavaIterableWrapperSerializer.
     kryo.register(JavaIterableWrapperSerializer.wrapperClass, new 
JavaIterableWrapperSerializer)
@@ -363,12 +366,6 @@ private[serializer] object KryoSerializer {
     classOf[StorageLevel],
     classOf[CompressedMapStatus],
     classOf[HighlyCompressedMapStatus],
-    classOf[RoaringBitmap],
-    classOf[RoaringArray],
-    classOf[RoaringArray.Element],
-    classOf[Array[RoaringArray.Element]],
-    classOf[ArrayContainer],
-    classOf[BitmapContainer],
     classOf[CompactBuffer[_]],
     classOf[BlockManagerId],
     classOf[Array[Byte]],
@@ -377,6 +374,55 @@ private[serializer] object KryoSerializer {
     classOf[BoundedPriorityQueue[_]],
     classOf[SparkConf]
   )
+
+  private val toRegisterSerializer = Map[Class[_], KryoClassSerializer[_]](
+    classOf[RoaringBitmap] -> new KryoClassSerializer[RoaringBitmap]() {
+      override def write(kryo: Kryo, output: KryoOutput, bitmap: 
RoaringBitmap): Unit = {
+        bitmap.serialize(new KryoOutputDataOutputBridge(output))
+      }
+      override def read(kryo: Kryo, input: KryoInput, cls: 
Class[RoaringBitmap]): RoaringBitmap = {
+        val ret = new RoaringBitmap
+        ret.deserialize(new KryoInputDataInputBridge(input))
+        ret
+      }
+    }
+  )
+}
+
+private[serializer] class KryoInputDataInputBridge(input: KryoInput) extends 
DataInput {
+  override def readLong(): Long = input.readLong()
+  override def readChar(): Char = input.readChar()
+  override def readFloat(): Float = input.readFloat()
+  override def readByte(): Byte = input.readByte()
+  override def readShort(): Short = input.readShort()
+  override def readUTF(): String = input.readString() // readString in kryo 
does utf8
+  override def readInt(): Int = input.readInt()
+  override def readUnsignedShort(): Int = input.readShortUnsigned()
+  override def skipBytes(n: Int): Int = input.skip(n.toLong).toInt
+  override def readFully(b: Array[Byte]): Unit = input.read(b)
+  override def readFully(b: Array[Byte], off: Int, len: Int): Unit = 
input.read(b, off, len)
+  override def readLine(): String = throw new 
UnsupportedOperationException("readLine")
+  override def readBoolean(): Boolean = input.readBoolean()
+  override def readUnsignedByte(): Int = input.readByteUnsigned()
+  override def readDouble(): Double = input.readDouble()
+}
+
+private[serializer] class KryoOutputDataOutputBridge(output: KryoOutput) 
extends DataOutput {
+  override def writeFloat(v: Float): Unit = output.writeFloat(v)
+  // There is no "readChars" counterpart, except maybe "readLine", which is 
not supported
+  override def writeChars(s: String): Unit = throw new 
UnsupportedOperationException("writeChars")
+  override def writeDouble(v: Double): Unit = output.writeDouble(v)
+  override def writeUTF(s: String): Unit = output.writeString(s) // 
writeString in kryo does UTF8
+  override def writeShort(v: Int): Unit = output.writeShort(v)
+  override def writeInt(v: Int): Unit = output.writeInt(v)
+  override def writeBoolean(v: Boolean): Unit = output.writeBoolean(v)
+  override def write(b: Int): Unit = output.write(b)
+  override def write(b: Array[Byte]): Unit = output.write(b)
+  override def write(b: Array[Byte], off: Int, len: Int): Unit = 
output.write(b, off, len)
+  override def writeBytes(s: String): Unit = output.writeString(s)
+  override def writeChar(v: Int): Unit = output.writeChar(v.toChar)
+  override def writeLong(v: Long): Unit = output.writeLong(v)
+  override def writeByte(v: Int): Unit = output.writeByte(v)
 }
 
 /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to