*Resending as I do not see that this made it to the mailing list; sorry if in fact it did and is just not reflected online yet.*
I’m very perplexed by the following. I have a set of Avro-generated objects that are sent to a Spark Streaming job via Kafka. The Spark Streaming job follows the receiver-based approach. I am encountering the below error when I attempt to deserialize the payload:

```
15/04/30 17:49:25 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 9 to sparkExecutor@192.168.1.3:61051
15/04/30 17:49:25 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 9 is 140 bytes
15/04/30 17:49:25 ERROR TaskResultGetter: Exception while getting task result
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
relations (com.opsdatastore.model.ObjectDetails)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
    at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
    at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
    at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.avro.generic.GenericData$Array.add(GenericData.java:200)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    ... 17 more
15/04/30 17:49:25 INFO TaskSchedulerImpl: Removed TaskSet 20.0, whose tasks have all completed, from pool
```

The basic code looks like this. Register the classes with Kryo as follows:

```scala
val sc = new SparkConf(true)
  .set("spark.streaming.unpersist", "true")
  .setAppName("StreamingKafkaConsumer")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// register all related Avro-generated classes
sc.registerKryoClasses(Array(
  classOf[ConfigurationProperty],
  classOf[Event],
  classOf[Identifier],
  classOf[Metric],
  classOf[ObjectDetails],
  classOf[Relation],
  classOf[RelationProperty]
))
```

Use the receiver-based approach to consume messages from Kafka:

```scala
val messages = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
  ssc, kafkaParams, topics, storageLevel)
```

Now process the received messages:

```scala
val raw = messages.map(_._2)
val dStream = raw.map(byte => {
  // Avro decoder
  println("Byte length: " + byte.length)
  val decoder = new AvroDecoder[ObjectDetails](schema = ObjectDetails.getClassSchema)
  val message = decoder.fromBytes(byte)
  println(s"AvroMessage : Type : ${message.getType}, Payload : $message")
  message
})
```

When I look in the logs of the workers, in standard out I can see the messages being printed; in fact, I’m even able to access the Type field without issue:

```
Byte length: 315
AvroMessage : Type : Storage, Payload : {"name": "Storage 1", "type": "Storage", "vendor": "6274g51cbkmkqisk", "model": "lk95hqk9m10btaot", "timestamp": 1430428565141, "identifiers": {"ID": {"name": "ID", "value": "Storage-1"}}, "configuration": null, "metrics": {"Disk Space Usage (GB)": {"name": "Disk Space Usage (GB)", "source": "Generated", "values": {"1430428565356": {"timestamp": 1430428565356, "value": 42.55948347907833}}}, "Disk Space Capacity (GB)": {"name": "Disk Space Capacity (GB)", "source": "Generated", "values": {"1430428565356": {"timestamp": 1430428565356, "value": 38.980024705429095}}}}, "relations": [{"type": "parent", "object_type": "Virtual Machine", "properties": {"ID": {"name": "ID", "value": "Virtual Machine-1"}}}], "events": [], "components": []}
```

The ObjectDetails class, which is generated from Avro, has a relations field of type java.util.List:

```java
/**
 * Gets the value of the 'relations' field.
 */
public java.util.List<com.opsdatastore.model.Relation> getRelations() {
  return relations;
}
```

So I’m at a loss at the moment as to what is causing the above NPE and exactly how to address it. I’m even more confused because I appear to be able to access the deserialized message within the raw.map(...) as shown above. Is there something special I need to do for the List? Am I missing something obvious here? If so, an example would be appreciated.

TIA for the assistance, as I am stumped at the moment.

-Todd
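P.S. One workaround I have been considering, but have not yet tried (a rough sketch, not a confirmed fix): since the NPE comes from GenericData$Array.add during Kryo deserialization of the task result, copying the Avro list into plain JVM types inside the map, before Spark ever serializes the record, might sidestep Kryo having to reconstruct a GenericData.Array at all. The RelationInfo case class and the Relation stand-in below are hypothetical stand-ins of my own, not the real Avro-generated classes:

```scala
import scala.collection.JavaConverters._

// Hypothetical stand-in for the Avro-generated Relation class, for
// illustration only; the real class comes from the Avro schema.
class Relation(val getType: String, val getObjectType: String)

// Plain case class to carry the relation data across serialization
// boundaries; field names here are my own invention.
case class RelationInfo(relationType: String, objectType: String)

// Copy the Avro-backed java.util.List into plain Scala types before the
// record is returned from the map and serialized by Kryo.
def toPlainRelations(relations: java.util.List[Relation]): List[RelationInfo] =
  relations.asScala.map(r => RelationInfo(r.getType, r.getObjectType)).toList
```

If there is a cleaner way, e.g. registering a custom Kryo serializer for the Avro collection types, I would be glad to hear it.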