*Resending, as I do not see that this made it to the mailing list; sorry if
it in fact did and is just not reflected online yet.*

I’m very perplexed by the following. I have a set of AVRO generated
objects that are sent to a SparkStreaming job via Kafka. The SparkStreaming
job follows the receiver-based approach. I am encountering the error below
when I attempt to deserialize the payload:

15/04/30 17:49:25 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 9 to sparkExecutor@192.168.1.3:61051
15/04/30 17:49:25 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 9 is 140 bytes
15/04/30 17:49:25 ERROR TaskResultGetter: Exception while getting task result
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
relations (com.opsdatastore.model.ObjectDetails)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
    at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
    at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
    at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.avro.generic.GenericData$Array.add(GenericData.java:200)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    ... 17 more
15/04/30 17:49:25 INFO TaskSchedulerImpl: Removed TaskSet 20.0, whose tasks have all completed, from pool

Basic code looks like this.

Register the class with Kryo as follows:

val sc = new SparkConf(true)
  .set("spark.streaming.unpersist", "true")
  .setAppName("StreamingKafkaConsumer")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// register all related AVRO generated classes
sc.registerKryoClasses(Array(
    classOf[ConfigurationProperty],
    classOf[Event],
    classOf[Identifier],
    classOf[Metric],
    classOf[ObjectDetails],
    classOf[Relation],
    classOf[RelationProperty]
    ))

Use the receiver-based approach to consume messages from Kafka:

val messages = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
  ssc, kafkaParams, topics, storageLevel)

Now process the received messages:

val raw = messages.map(_._2)
val dStream = raw.map(
  byte => {
    // Avro Decoder
    println("Byte length: " + byte.length)
    val decoder = new AvroDecoder[ObjectDetails](schema = ObjectDetails.getClassSchema)
    val message = decoder.fromBytes(byte)
    println(s"AvroMessage : Type : ${message.getType}, Payload : $message")
    message
  }
)

When I look at the workers' logs, I can see the messages being printed to
standard out; in fact, I’m even able to access the Type field without
issue:

Byte length: 315
AvroMessage : Type : Storage, Payload : {"name": "Storage 1", "type":
"Storage", "vendor": "6274g51cbkmkqisk", "model": "lk95hqk9m10btaot",
"timestamp": 1430428565141, "identifiers": {"ID": {"name": "ID",
"value": "Storage-1"}}, "configuration": null, "metrics": {"Disk Space
Usage (GB)": {"name": "Disk Space Usage (GB)", "source": "Generated",
"values": {"1430428565356": {"timestamp": 1430428565356, "value":
42.55948347907833}}}, "Disk Space Capacity (GB)": {"name": "Disk Space
Capacity (GB)", "source": "Generated", "values": {"1430428565356":
{"timestamp": 1430428565356, "value": 38.980024705429095}}}},
"relations": [{"type": "parent", "object_type": "Virtual Machine",
"properties": {"ID": {"name": "ID", "value": "Virtual Machine-1"}}}],
"events": [], "components": []}

ObjectDetails, which is generated from AVRO, has a relations field of
type java.util.List:

 /**
   * Gets the value of the 'relations' field.
   */
  public java.util.List<com.opsdatastore.model.Relation> getRelations() {
    return relations;
  }

So I’m at a loss at the moment as to what is causing the above NPE and
exactly how to address this. I’m even more confused as I appear to be able
to access the deserialized message within the raw.map(...) as shown above.
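
The trace above fails inside KryoSerializerInstance.deserialize, called from
DirectTaskResult.value, so one way to narrow this down might be to take the
scheduler out of the picture and push a decoded message through the same Kryo
serializer by hand. Below is a minimal sketch of that round trip, assuming
Spark 1.x's KryoSerializer and SerializerInstance API. The KryoRoundTrip
object and its roundTrip helper are hypothetical names used only for
illustration; they are not part of the job above.

```scala
import scala.reflect.ClassTag

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// Hypothetical helper (not part of the original job): serializes a value with
// Spark's Kryo serializer and immediately reads it back, without a cluster.
object KryoRoundTrip {
  def roundTrip[T: ClassTag](conf: SparkConf, value: T): T = {
    // Build a serializer instance from the same SparkConf the job uses,
    // so that the registered classes and Kryo settings match.
    val instance = new KryoSerializer(conf).newInstance()
    val bytes = instance.serialize(value)
    // If Kryo cannot rebuild one of the fields, the exception should
    // surface here rather than in the TaskResultGetter thread.
    instance.deserialize[T](bytes)
  }
}
```

With the SparkConf from above (named sc in the snippet), this could be called
as KryoRoundTrip.roundTrip(sc, message) on a decoded ObjectDetails. Whether it
reproduces the same NPE is an assumption that would still need to be checked.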

Is there something special I need to do for the List? Am I missing
something obvious here? If so, an example would be appreciated.

TIA for the assistance as I am stumped at the moment.

-Todd
