Hi all!
I have code that serializes an RDD with Kryo and saves it as a sequence file.
It works fine in Spark 1.5.1, but after switching to 2.1.1 it no longer works.

I am trying to serialize an RDD of Tuple2 (obtained from a PairRDD).

   1. The RDD consists of heterogeneous objects (aggregates such as HLL,
   QTree, Min, Max, etc.).
   2. The save is performed inside a streaming job.
   3. The read is performed outside of streaming (in another application).
   4. I suspected the error might be caused by our custom serializers, so I
   turned them off, but the errors persist.
   5. I tried disabling references in Kryo (since the error occurred while
   resolving references) and got a StackOverflowError plus significant
   performance degradation (see the sketch after this list).
   6. Implementing Serializable/Externalizable is, unfortunately, not an
   option.
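
For reference, this is roughly how references were disabled in step 5.
spark.kryo.referenceTracking is the standard Spark flag, and
KryoSerializer.newKryo() reads it from the SparkConf, so the thread-local
Kryo instances in the helper below pick it up (the app name is illustrative):

import org.apache.spark.SparkConf

// Minimal sketch (step 5): disable Kryo reference tracking globally.
// In our case this traded the reference-resolution error for a
// StackOverflowError and a noticeable slowdown.
val conf = new SparkConf()
  .setAppName("kryo-no-references") // illustrative
  .set("spark.kryo.referenceTracking", "false")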

Expected behavior:

saveAsObjectFile/loadObjectFile are symmetric, so it should be possible to
load a previously saved RDD.

The save/load code:

import java.io.ByteArrayOutputStream

import scala.reflect.ClassTag

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Input
import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoSerializer

object KryoFile {

  // One Kryo instance per executor thread; newKryo() is expensive.
  val THREAD_LOCAL_CACHE = new ThreadLocal[Kryo]

  /*
   * Writes the RDD as a sequence file using Kryo serialization.
   */
  def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String): Unit = {
    val kryoSerializer = new KryoSerializer(rdd.context.getConf)

    rdd.context.setJobDescription("Saving to path " + path)
    rdd.mapPartitions(iter => iter.grouped(10)
      .map(_.toArray))
      .map(splitArray => {
        // Initializes Kryo and calls the registrator class, once per thread.
        var kryo = THREAD_LOCAL_CACHE.get()
        if (null == kryo) {
          kryo = kryoSerializer.newKryo()
          THREAD_LOCAL_CACHE.set(kryo)
        }

        // Serialize each group of ten records to a byte array.
        val bao = new ByteArrayOutputStream()
        val output = kryoSerializer.newKryoOutput()
        output.setOutputStream(bao)
        kryo.writeClassAndObject(output, splitArray)
        output.close()
        kryo.reset()

        // The sequence file key is unused; everything goes into the value.
        val byteWritable = new BytesWritable(bao.toByteArray)
        (NullWritable.get(), byteWritable)
      }).saveAsSequenceFile(path)
  }

  /*
   * Reads an object file that was saved in Kryo format.
   */
  def loadObjectFile[T](sc: SparkContext, path: String,
                        minPartitions: Int = 1)(implicit ct: ClassTag[T]): RDD[T] = {
    val kryoSerializer = new KryoSerializer(sc.getConf)

    sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
      .flatMap(x => {
        var kryo = THREAD_LOCAL_CACHE.get()
        if (null == kryo) {
          kryo = kryoSerializer.newKryo()
          THREAD_LOCAL_CACHE.set(kryo)
        }

        // Hadoop reuses the BytesWritable buffer, so getBytes may return a
        // larger array with stale trailing bytes; read only getLength bytes.
        val input = new Input(x._2.getBytes, 0, x._2.getLength)
        val data = kryo.readClassAndObject(input)
        kryo.reset()
        data.asInstanceOf[Array[T]]
      })
  }
}
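
For completeness, a minimal round trip through these helpers looks roughly
like this (the SparkContext setup and the path are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative round trip: save a pair RDD with Kryo, then read it back.
// "/tmp/kryo-rdd" is just a placeholder path.
val sc = new SparkContext(new SparkConf().setAppName("kryo-roundtrip"))
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
KryoFile.saveAsObjectFile(pairs, "/tmp/kryo-rdd")
val restored = KryoFile.loadObjectFile[(String, Int)](sc, "/tmp/kryo-rdd")
assert(restored.collect().toSet == pairs.collect().toSet)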


When trying to deserialize, I get errors like these:
17/06/21 08:19:18 ERROR Executor: Exception in task 14.0 in stage 0.0 (TID 14)
java.lang.ArrayIndexOutOfBoundsException: -2
    at java.util.ArrayList.elementData(ArrayList.java:418)
    at java.util.ArrayList.get(ArrayList.java:431)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:60)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:834)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:706)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:396)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:307)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
    at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$1.apply(KryoFile.scala:75)
    at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$1.apply(KryoFile.scala:62)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)


17/06/21 08:19:18 ERROR Executor: Exception in task 12.0 in stage 0.0 (TID 12)
java.lang.ArrayStoreException: java.util.Collections$EmptyMap
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:396)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:307)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
    at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$1.apply(KryoFile.scala:75)
    at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$1.apply(KryoFile.scala:62)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

--
*Alexander Krasheninnikov*
Head of Data Team
