Re: Spark Streaming Kafka Avro NPE on deserialization of payload
There was a similar discussion over here: http://mail-archives.us.apache.org/mod_mbox/spark-user/201411.mbox/%3ccakz4c0s_cuo90q2jxudvx9wc4fwu033kx3-fjujytxxhr7p...@mail.gmail.com%3E

Thanks
Best Regards

On Fri, May 1, 2015 at 7:12 PM, Todd Nist tsind...@gmail.com wrote:
I’m very perplexed with the following. I have a set of Avro-generated objects that are sent to a Spark Streaming job via Kafka. The Spark Streaming job follows the receiver-based approach. I am encountering the below error when I attempt to deserialize the payload:

15/04/30 17:49:25 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 9 to sparkExecutor@192.168.1.3:61051
15/04/30 17:49:25 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 9 is 140 bytes
15/04/30 17:49:25 ERROR TaskResultGetter: Exception while getting task result
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
relations (com.opsdatastore.model.ObjectDetails)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
    at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
    at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
    at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.avro.generic.GenericData$Array.add(GenericData.java:200)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    ... 17 more
15/04/30 17:49:25 INFO TaskSchedulerImpl: Removed TaskSet 20.0, whose tasks have all completed, from pool

Basic code looks like this.

Register the classes with Kryo as follows:

    val sc = new SparkConf(true)
      .set("spark.streaming.unpersist", "true")
      .setAppName("StreamingKafkaConsumer")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    // register all related Avro generated classes
    sc.registerKryoClasses(Array(
      classOf[ConfigurationProperty],
      classOf[Event],
      classOf[Identifier],
      classOf[Metric],
      classOf[ObjectDetails],
      classOf[Relation],
      classOf[RelationProperty]
    ))

Use the receiver-based approach to consume messages from Kafka:

    val messages = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
      ssc, kafkaParams, topics, storageLevel)

Now process the received messages:

    val raw = messages.map(_._2)
    val dStream = raw.map(byte => {
      // Avro Decoder
      println("Byte length: " + byte.length)
      val decoder = new AvroDecoder[ObjectDetails](schema = ObjectDetails.getClassSchema)
      val message = decoder.fromBytes(byte)
      println(s"AvroMessage : Type : ${message.getType}, Payload : $message")
      message
    })

When I look in the logs of the workers, in standard out I can see the messages being printed; in fact, I’m even able to access the Type field without issue:

Byte length: 315
AvroMessage : Type : Storage, Payload : {name: Storage 1, type: Storage, vendor: 6274g51cbkmkqisk, model: lk95hqk9m10btaot, timestamp: 1430428565141, identifiers: {ID: {name: ID, value: Storage-1}}, configuration: null, metrics: {Disk Space Usage (GB): {name: Disk Space Usage (GB), source: Generated, values: {1430428565356: {timestamp: 1430428565356, value: 42.55948347907833}}}, Disk Space Capacity (GB): {name: Disk Space Capacity (GB), source: Generated, values: {1430428565356: {timestamp: 1430428565356, value: 38.980024705429095}}}}, relations: [{type: parent, object_type: Virtual Machine, properties: {ID: {name: ID, value: Virtual Machine-1}}}], events: [], components: []}

The ObjectDetails class, which is generated from Avro, has a relations field of type java.util.List:
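The serialization trace points at the relations list: Kryo's default CollectionSerializer rebuilds it as an org.apache.avro.generic.GenericData.Array, but without the internal state (notably the Schema) that its constructor normally sets up, so add() hits a NullPointerException. A workaround that is often suggested for this class of problem is to stop Kryo from field-serializing Avro-generated records altogether and instead delegate to Avro's own binary encoding via a custom Kryo Serializer, registered through a KryoRegistrator. The sketch below is untested and illustrative only; AvroKryoSerializer and AvroRegistrator are made-up names, and the registrator class name set in spark.kryo.registrator must match wherever you actually place it.

```scala
import java.io.ByteArrayOutputStream

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.avro.Schema
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter, SpecificRecordBase}
import org.apache.spark.serializer.KryoRegistrator

// Round-trip an Avro SpecificRecord through Avro's binary encoding, so Kryo
// never touches GenericData.Array internals.
class AvroKryoSerializer[T <: SpecificRecordBase](schema: Schema) extends Serializer[T] {
  override def write(kryo: Kryo, output: Output, record: T): Unit = {
    val bos = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(bos, null)
    new SpecificDatumWriter[T](schema).write(record, encoder)
    encoder.flush()
    val bytes = bos.toByteArray
    output.writeInt(bytes.length) // length-prefix so read() knows how much to consume
    output.writeBytes(bytes)
  }

  override def read(kryo: Kryo, input: Input, clazz: Class[T]): T = {
    val bytes = input.readBytes(input.readInt())
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    new SpecificDatumReader[T](schema).read(null.asInstanceOf[T], decoder)
  }
}

class AvroRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[ObjectDetails],
      new AvroKryoSerializer[ObjectDetails](ObjectDetails.getClassSchema))
    // ...repeat for Relation, Metric, and the other generated classes
  }
}
```

With something like this in place, the SparkConf would set spark.kryo.registrator to the registrator's fully qualified name instead of (or in addition to) registerKryoClasses. Twitter's chill-avro project packages essentially this idea if you'd rather not hand-roll it.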