Re: Spark Streaming Kafka Avro NPE on deserialization of payload

2015-05-02 Thread Akhil Das
There was a similar discussion over here:
http://mail-archives.us.apache.org/mod_mbox/spark-user/201411.mbox/%3ccakz4c0s_cuo90q2jxudvx9wc4fwu033kx3-fjujytxxhr7p...@mail.gmail.com%3E

Thanks
Best Regards
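
If it is the same Kryo-vs-Avro issue, the workaround that usually comes up is to stop Kryo's FieldSerializer from walking the generated records and instead register Avro-aware serializers for them. The sketch below is an illustration, not code from that thread, and it assumes the twitter chill-avro artifact is on the classpath:

import com.esotericsoftware.kryo.Kryo
import com.opsdatastore.model.{ObjectDetails, Relation}
import com.twitter.chill.avro.AvroSerializer
import org.apache.spark.serializer.KryoRegistrator

// Sketch: have Kryo round-trip the generated records through Avro's own
// binary encoding instead of field-by-field reflection, which is what
// loses the schema behind GenericData.Array.
class AvroKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[ObjectDetails],
      AvroSerializer.SpecificRecordBinarySerializer[ObjectDetails])
    kryo.register(classOf[Relation],
      AvroSerializer.SpecificRecordBinarySerializer[Relation])
    // ...repeat for the other generated classes
  }
}

// wired up via:
//   sparkConf.set("spark.kryo.registrator", "your.package.AvroKryoRegistrator")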

Spark Streaming Kafka Avro NPE on deserialization of payload

2015-04-30 Thread Todd Nist
I’m very perplexed with the following. I have a set of Avro-generated
objects that are sent to a Spark Streaming job via Kafka. The Spark
Streaming job follows the receiver-based approach. I am encountering the
error below when I attempt to deserialize the payload:

15/04/30 17:49:25 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 9 to sparkExecutor@192.168.1.3:61051
15/04/30 17:49:25 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 9 is 140 bytes
15/04/30 17:49:25 ERROR TaskResultGetter: Exception while getting task result
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
relations (com.opsdatastore.model.ObjectDetails)
  at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
  at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
  at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
  at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
  at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
  at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
  at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
  at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
  at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
  at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
  at org.apache.avro.generic.GenericData$Array.add(GenericData.java:200)
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
  at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
  ... 17 more
15/04/30 17:49:25 INFO TaskSchedulerImpl: Removed TaskSet 20.0, whose tasks have all completed, from pool

Basic code looks like this.

Register the classes with Kryo as follows:

val sc = new SparkConf(true)
  .set("spark.streaming.unpersist", "true")
  .setAppName("StreamingKafkaConsumer")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// register all related Avro-generated classes
sc.registerKryoClasses(Array(
  classOf[ConfigurationProperty],
  classOf[Event],
  classOf[Identifier],
  classOf[Metric],
  classOf[ObjectDetails],
  classOf[Relation],
  classOf[RelationProperty]
))
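
(As an aside, and an assumption on my part rather than something from the original setup: Spark's registrationRequired flag makes Kryo fail fast on any unregistered class, which quickly reveals the Avro runtime types, such as org.apache.avro.generic.GenericData$Array backing the relations list, that the array above does not cover.)

// Fail loudly on any class Kryo meets that was not registered, instead of
// silently falling back to FieldSerializer; handy for spotting Avro runtime
// types (e.g. GenericData$Array) hiding inside the object graph.
sc.set("spark.kryo.registrationRequired", "true")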

Use the receiver-based approach to consume messages from Kafka:

val messages = KafkaUtils.createStream[Array[Byte], Array[Byte],
  DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topics, storageLevel)

Now process the received messages:

val raw = messages.map(_._2)
val dStream = raw.map(
  byte => {
    // Avro decoder
    println("Byte length: " + byte.length)
    val decoder = new AvroDecoder[ObjectDetails](schema = ObjectDetails.getClassSchema)
    val message = decoder.fromBytes(byte)
    println(s"AvroMessage : Type : ${message.getType}, Payload : $message")
    message
  }
)
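
AvroDecoder is not a Spark or Kafka class, so presumably it is a small in-house wrapper around Avro's SpecificDatumReader; a hypothetical reconstruction, assuming that is all it does:

import org.apache.avro.Schema
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.{SpecificDatumReader, SpecificRecordBase}

// Hypothetical stand-in for the AvroDecoder helper used above.
class AvroDecoder[T <: SpecificRecordBase](schema: Schema) {
  private val reader = new SpecificDatumReader[T](schema)

  // Decode one record from a raw Avro binary payload (no container file).
  def fromBytes(bytes: Array[Byte]): T = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null.asInstanceOf[T], decoder)
  }
}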

When I look in the logs of the workers, in standard out I can see the
messages being printed; in fact, I’m even able to access the Type field
without issue:

Byte length: 315
AvroMessage : Type : Storage, Payload : {"name": "Storage 1", "type": "Storage",
"vendor": "6274g51cbkmkqisk", "model": "lk95hqk9m10btaot", "timestamp": 1430428565141,
"identifiers": {"ID": {"name": "ID", "value": "Storage-1"}}, "configuration": null,
"metrics": {"Disk Space Usage (GB)": {"name": "Disk Space Usage (GB)", "source": "Generated",
"values": {"1430428565356": {"timestamp": 1430428565356, "value": 42.55948347907833}}},
"Disk Space Capacity (GB)": {"name": "Disk Space Capacity (GB)", "source": "Generated",
"values": {"1430428565356": {"timestamp": 1430428565356, "value": 38.980024705429095}}}},
"relations": [{"type": "parent", "object_type": "Virtual Machine",
"properties": {"ID": {"name": "ID", "value": "Virtual Machine-1"}}}],
"events": [], "components": []}

The ObjectDetails class, which is generated from Avro, has a relations
field of type java.util.List.
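
At runtime that List is backed by org.apache.avro.generic.GenericData$Array, which matches the Caused by frame in the trace: when Kryo's CollectionSerializer rebuilds the collection on the driver, it instantiates the array without its Schema, so GenericData.Array.add hits a null. Besides an Avro-aware registrator (see the sketch in the reply above), a blunt workaround is to copy each record into plain JVM types right after decoding, so no Avro collection ever crosses the wire. The case class and fields below are assumptions based on the printed payload, not on the real schema:

import scala.collection.JavaConverters._

// Hypothetical flat view; only the fields visible in the payload above.
case class ObjectDetailsView(name: String, objectType: String, relationTypes: Seq[String])

val viewStream = dStream.map { od =>
  ObjectDetailsView(
    od.getName.toString,   // Avro strings may arrive as CharSequence/Utf8
    od.getType.toString,
    od.getRelations.asScala.map(_.getType.toString).toSeq)
}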