[ https://issues.apache.org/jira/browse/SPARK-12222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Davies Liu resolved SPARK-12222. -------------------------------- Resolution: Fixed Fix Version/s: 1.6.0 2.0.0 Issue resolved by pull request 10213 [https://github.com/apache/spark/pull/10213] > deserialize RoaringBitmap using Kryo serializer throw Buffer underflow > exception > -------------------------------------------------------------------------------- > > Key: SPARK-12222 > URL: https://issues.apache.org/jira/browse/SPARK-12222 > Project: Spark > Issue Type: Bug > Components: Spark Core > Affects Versions: 1.6.0 > Reporter: Fei Wang > Fix For: 2.0.0, 1.6.0 > > > here are some problems when deserialize RoaringBitmap. see the examples below: > run this piece of code > ``` > import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} > import java.io.DataInput > class KryoInputDataInputBridge(input: KryoInput) extends DataInput { > override def readLong(): Long = input.readLong() > override def readChar(): Char = input.readChar() > override def readFloat(): Float = input.readFloat() > override def readByte(): Byte = input.readByte() > override def readShort(): Short = input.readShort() > override def readUTF(): String = input.readString() // readString in > kryo does utf8 > override def readInt(): Int = input.readInt() > override def readUnsignedShort(): Int = input.readShortUnsigned() > override def skipBytes(n: Int): Int = input.skip(n.toLong).toInt > override def readFully(b: Array[Byte]): Unit = input.read(b) > override def readFully(b: Array[Byte], off: Int, len: Int): Unit = > input.read(b, off, len) > override def readLine(): String = throw new > UnsupportedOperationException("readLine") > override def readBoolean(): Boolean = input.readBoolean() > override def readUnsignedByte(): Int = input.readByteUnsigned() > override def readDouble(): Double = input.readDouble() > } > class KryoOutputDataOutputBridge(output: KryoOutput) extends DataOutput { > override def writeFloat(v: Float): Unit = output.writeFloat(v) > // There is no "readChars" counterpart, except maybe "readLine", which > is not supported > override def writeChars(s: String): Unit = throw new > UnsupportedOperationException("writeChars") > override def writeDouble(v: Double): Unit = output.writeDouble(v) > override def writeUTF(s: String): Unit = output.writeString(s) // > writeString in kryo does UTF8 > override def writeShort(v: Int): Unit = output.writeShort(v) > override def writeInt(v: Int): Unit = output.writeInt(v) > override def writeBoolean(v: Boolean): Unit = output.writeBoolean(v) > override def write(b: Int): Unit = output.write(b) > override def write(b: Array[Byte]): Unit = output.write(b) > override def write(b: Array[Byte], off: Int, len: Int): Unit = > output.write(b, off, len) > override def writeBytes(s: String): Unit = output.writeString(s) > override def writeChar(v: Int): Unit = output.writeChar(v.toChar) > override def writeLong(v: Long): Unit = output.writeLong(v) > override def writeByte(v: Int): Unit = output.writeByte(v) > } > val outStream = new FileOutputStream("D:\\wfserde") > val output = new KryoOutput(outStream) > val bitmap = new RoaringBitmap > bitmap.add(1) > bitmap.add(3) > bitmap.add(5) > bitmap.serialize(new KryoOutputDataOutputBridge(output)) > output.flush() > output.close() > val inStream = new FileInputStream("D:\\wfserde") > val input = new KryoInput(inStream) > val ret = new RoaringBitmap > ret.deserialize(new KryoInputDataInputBridge(input)) > println(ret) > ``` > this will throw `Buffer underflow` error: > ``` > com.esotericsoftware.kryo.KryoException: Buffer underflow. > at com.esotericsoftware.kryo.io.Input.require(Input.java:156) > at com.esotericsoftware.kryo.io.Input.skip(Input.java:131) > at com.esotericsoftware.kryo.io.Input.skip(Input.java:264) > at > org.apache.spark.sql.SQLQuerySuite$$anonfun$6$KryoInputDataInputBridge$1.skipBytes > ``` > after same investigation, i found this is caused by a bug of kryo's > `Input.skip(long count)`(https://github.com/EsotericSoftware/kryo/issues/119) > and we call this method in `KryoInputDataInputBridge`. > So i think we can fix this issue in this two ways: > 1) upgrade the kryo version to 2.23.0 or 2.24.0, which has fix this bug in > kryo (i am not sure the upgrade is safe in spark, can you check it? @davies ) > 2) we can bypass the kryo's `Input.skip(long count)` by directly call > another `skip` method in kryo's > Input.java(https://github.com/EsotericSoftware/kryo/blob/kryo-2.21/src/com/esotericsoftware/kryo/io/Input.java#L124), > i.e. write the bug-fixed version of `Input.skip(long count)` in > KryoInputDataInputBridge's `skipBytes` method: > ``` > class KryoInputDataInputBridge(input: KryoInput) extends DataInput { > ... > override def skipBytes(n: Int): Int = { > var remaining: Long = n > while (remaining > 0) { > val skip = Math.min(Integer.MAX_VALUE, remaining).asInstanceOf[Int] > input.skip(skip) > remaining -= skip > } > n > } > ... > } > ``` -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org