[ https://issues.apache.org/jira/browse/SPARK-3601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14519005#comment-14519005 ]
Nicolas PHUNG commented on SPARK-3601:
--------------------------------------
For Avro's GenericData.Array, I use the following snippet from
[Flink|https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java]:
{code}
// Works around an issue with Avro array serialization, see
// https://issues.apache.org/jira/browse/FLINK-1391
import java.io.Serializable;
import java.util.Collection;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.serializers.CollectionSerializer;

public class Serializers {
    /**
     * Special serializer for Java collections enforcing certain instance types.
     * Avro serializes collections with the "GenericData.Array" type, which Kryo
     * is not able to handle, so we use ArrayLists instead.
     */
    public static class SpecificInstanceCollectionSerializer<T extends java.util.ArrayList<?>>
            extends CollectionSerializer implements Serializable {

        private Class<T> type;

        public SpecificInstanceCollectionSerializer(Class<T> type) {
            this.type = type;
        }

        @Override
        protected Collection create(Kryo kryo, Input input, Class<Collection> type) {
            // Ignore the GenericData.Array type requested by Kryo and
            // instantiate the enforced collection type instead.
            return kryo.newInstance(this.type);
        }

        @Override
        protected Collection createCopy(Kryo kryo, Collection original) {
            return kryo.newInstance(this.type);
        }
    }
}
{code}
And I registered it with Kryo using the following Scala code:
{code}
kryo.register(classOf[GenericData.Array[_]],
  new SpecificInstanceCollectionSerializer(classOf[java.util.ArrayList[_]]))
{code}
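For completeness, here is a minimal sketch of how that registration can be wired into Spark through a custom KryoRegistrator (the AvroKryoRegistrator name is my own invention; the Serializers class is the snippet above, and the Avro-generated class names are taken from the stack trace below):
{code}
import com.esotericsoftware.kryo.Kryo
import org.apache.avro.generic.GenericData
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical registrator; adjust package and names to your project.
class AvroKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Force GenericData.Array to be (de)serialized as a plain ArrayList.
    kryo.register(classOf[GenericData.Array[_]],
      new Serializers.SpecificInstanceCollectionSerializer(classOf[java.util.ArrayList[_]]))
    // Also register the Avro-generated classes used by the job, e.g.
    // kryo.register(classOf[xyz.Datum]) and kryo.register(classOf[xyz.ResMsg]).
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", classOf[AvroKryoRegistrator].getName)
{code}
Spark instantiates the registrator wherever it creates Kryo instances, so the workaround applies on both the driver and the executors.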
I hope this helps.
> Kryo NPE for output operations on Avro complex Objects even after registering.
> ------------------------------------------------------------------------------
>
> Key: SPARK-3601
> URL: https://issues.apache.org/jira/browse/SPARK-3601
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.0.0
> Environment: local, standalone cluster
> Reporter: mohan gaddam
>
> Kryo serializer works well when Avro objects hold simple data, but when the
> same Avro object has complex data (like unions/arrays), Kryo fails during
> output operations; map operations work fine. Note that I have registered all
> the Avro-generated classes with Kryo. I'm using Java as the programming language.
> When a complex message is used, it throws an NPE; the stack trace follows:
> ==================================================
> ERROR scheduler.JobScheduler: Error running job streaming job 1411043845000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> value (xyz.Datum)
> data (xyz.ResMsg)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> In the above exception, Datum and ResMsg are project-specific classes
> generated by Avro from the AVDL snippet below:
> ======================
> record KeyValueObject {
>     union {boolean, int, long, float, double, bytes, string} name;
>     union {boolean, int, long, float, double, bytes, string,
>            array<union{boolean, int, long, float, double, bytes, string, KeyValueObject}>,
>            KeyValueObject} value;
> }
> record Datum {
>     union {boolean, int, long, float, double, bytes, string,
>            array<union{boolean, int, long, float, double, bytes, string, KeyValueObject}>,
>            KeyValueObject} value;
> }
> record ResMsg {
>     string version;
>     string sequence;
>     string resourceGUID;
>     string GWID;
>     string GWTimestamp;
>     union {Datum, array<Datum>} data;
> }
> Avro message samples are as follows:
> ============================
> 1)
> {"version": "01", "sequence": "00001", "resourceGUID": "001", "GWID": "002", "GWTimestamp": "1409823150737", "data": {"value": "30"}}
> 2)
> {"version": "01", "sequence": "00001", "resource": "sensor-001", "controller": "002", "controllerTimestamp": "1411038710358", "data": {"value": [{"name": "Temperature", "value": "30"}, {"name": "Speed", "value": "60"}, {"name": "Location", "value": ["+401213.1", "-0750015.1"]}, {"name": "Timestamp", "value": "2014-09-09T08:15:25-05:00"}]}}
> Both 1 and 2 adhere to the Avro schema, so the decoder is able to convert them
> into Avro objects in the Spark Streaming API.
> BTW, the messages were pulled from a Kafka source and decoded using a Kafka decoder.
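The Kafka decoder itself is not shown in the issue; for Avro specific records it might look like this minimal sketch (only xyz.ResMsg comes from the report, everything else here is an assumption):
{code}
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader

// Hypothetical decoder for the Avro-generated xyz.ResMsg class. Kafka 0.8
// instantiates decoders reflectively via a VerifiableProperties constructor.
class ResMsgDecoder(props: VerifiableProperties = null) extends Decoder[xyz.ResMsg] {
  private val reader = new SpecificDatumReader[xyz.ResMsg](classOf[xyz.ResMsg])

  override def fromBytes(bytes: Array[Byte]): xyz.ResMsg = {
    // Decode the raw Kafka message bytes as Avro binary into a specific record.
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null.asInstanceOf[xyz.ResMsg], decoder)
  }
}
{code}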