Hi all!

I have code that serializes an RDD with Kryo and saves it as a sequence file. It works fine in Spark 1.5.1, but after switching to 2.1.1 it no longer works.
I am trying to serialize an RDD of Tuple2<> (obtained from a PairRDD).

1. The RDD consists of heterogeneous objects (aggregates such as HLL, QTree, Min, Max, etc.).
2. The save is performed within a streaming job.
3. The read is performed outside of streaming (in another app).
4. I suspected that the error could be caused by custom serializers, so I turned them off, but the errors still occur.
5. I tried disabling references in Kryo (since I saw an error while resolving references) and got a stack overflow and significant performance degradation.
6. Implementing Serializable/Externalizable is not a solution, unfortunately.

Expected behavior: saveAsObjectFile/loadObjectFile are symmetric, and it is possible to load a previously saved RDD.

Code of save/load:

    import java.io.ByteArrayOutputStream

    import scala.reflect.ClassTag

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.Input
    import org.apache.hadoop.io.{BytesWritable, NullWritable}
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.serializer.KryoSerializer

    object KryoFile {

      // One Kryo instance per executor thread
      val THREAD_LOCAL_CACHE = new ThreadLocal[Kryo]

      /*
       * Used to write as Object file using kryo serialization
       */
      def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String): Unit = {
        val kryoSerializer = new KryoSerializer(rdd.context.getConf)
        rdd.context.setJobDescription("Saving to path " + path)
        // Batch records into arrays of up to 10 elements
        rdd.mapPartitions(iter => iter.grouped(10)
          .map(_.toArray))
          .map(splitArray => {
            // initializes Kryo and calls your registrator class
            var kryo = THREAD_LOCAL_CACHE.get()
            if (null == kryo) {
              kryo = kryoSerializer.newKryo()
              THREAD_LOCAL_CACHE.set(kryo)
            }

            // convert data to bytes
            val bao = new ByteArrayOutputStream()
            val output = kryoSerializer.newKryoOutput()
            output.setOutputStream(bao)
            kryo.writeClassAndObject(output, splitArray)
            output.close()
            kryo.reset()

            // We are ignoring key field of sequence file
            val byteWritable = new BytesWritable(bao.toByteArray)
            (NullWritable.get(), byteWritable)
          }).saveAsSequenceFile(path)
      }

      /*
       * Method to read from object file which is saved in kryo format.
       */
      def loadObjectFile[T](sc: SparkContext, path: String, minPartitions: Int = 1)(implicit ct: ClassTag[T]) = {
        val kryoSerializer = new KryoSerializer(sc.getConf)
        sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
          .flatMap(x => {
            var kryo = THREAD_LOCAL_CACHE.get()
            if (null == kryo) {
              kryo = kryoSerializer.newKryo()
              THREAD_LOCAL_CACHE.set(kryo)
            }
            val input = new Input()
            input.setBuffer(x._2.getBytes)
            val data = kryo.readClassAndObject(input)
            kryo.reset()
            val dataObject = data.asInstanceOf[Array[T]]
            dataObject
          })
      }
    }

When trying to deserialize, I get the following errors:

    17/06/21 08:19:18 ERROR Executor: Exception in task 14.0 in stage 0.0 (TID 14)
    java.lang.ArrayIndexOutOfBoundsException: -2
        at java.util.ArrayList.elementData(ArrayList.java:418)
        at java.util.ArrayList.get(ArrayList.java:431)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:60)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:834)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:706)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:396)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:307)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
        at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$1.apply(KryoFile.scala:75)
        at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$1.apply(KryoFile.scala:62)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

    17/06/21 08:19:18 ERROR Executor: Exception in task 12.0 in stage 0.0 (TID 12)
    java.lang.ArrayStoreException: java.util.Collections$EmptyMap
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:396)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:307)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
        at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$1.apply(KryoFile.scala:75)
        at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$1.apply(KryoFile.scala:62)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

--
Best regards,
Крашенинников Александр.
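
P.S. To make the batching step of the save path and its inverse on the load path easier to follow, here is a minimal sketch using plain Scala collections only (no Spark, Kryo, or Hadoop). The object name BatchingSketch and the helpers toBatches/fromBatches are hypothetical names introduced for illustration; the batch size of 10 matches iter.grouped(10) in the code above. It only illustrates how records are grouped into arrays and flattened back, and does not reproduce the deserialization error.

```scala
import scala.reflect.ClassTag

// Hypothetical helper object, for illustration only.
object BatchingSketch {

  // Mirrors the save path: split one partition's records into
  // arrays of at most `batchSize` elements.
  def toBatches[T: ClassTag](records: Iterator[T], batchSize: Int): Iterator[Array[T]] =
    records.grouped(batchSize).map(_.toArray)

  // Mirrors the load path: flatten the arrays back into single records.
  def fromBatches[T](batches: Iterator[Array[T]]): Iterator[T] =
    batches.flatMap(_.iterator)

  def main(args: Array[String]): Unit = {
    // 25 sample key/value pairs standing in for the Tuple2 records.
    val records = (1 to 25).map(i => (s"key$i", i)).toList

    val batches = toBatches(records.iterator, 10).toList
    println(batches.map(_.length)) // List(10, 10, 5)

    val restored = fromBatches(batches.iterator).toList
    println(restored == records)   // true
  }
}
```

Each array produced by toBatches corresponds to one BytesWritable value in the sequence file, so a single record that fails to deserialize affects the whole batch of up to 10 elements.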