[ 
https://issues.apache.org/jira/browse/FLINK-3617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15626399#comment-15626399
 ] 

Stephan Ewen commented on FLINK-3617:
-------------------------------------

I think the problem is in the {{CaseClassSerializer}}, not in the {{Option}} 
type.
Case classes simply do not support null types at this point. It just happens 
that the field is of type option.
We may want to change that at some point.

The error message should be better, though, agreed.

Using null for an Option value is by itself a bug ;-) (after all, Option is 
explicitly designed to avoid null)

> NPE from CaseClassSerializer when dealing with null Option field
> ----------------------------------------------------------------
>
>                 Key: FLINK-3617
>                 URL: https://issues.apache.org/jira/browse/FLINK-3617
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.0.0
>            Reporter: Jamie Grier
>
> This error occurs when serializing a Scala case class with an field of 
> Option[] type where the value is not Some or None, but null.
> If this is not supported we should have a good error message.
> java.lang.RuntimeException: ConsumerThread threw an exception: null
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>       at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException
>       at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:81)
>       at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:158)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
> Caused by: java.lang.NullPointerException
>       at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100)
>       at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
>       at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100)
>       at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
>       at 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:107)
>       at 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
>       at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>       at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
>       at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:84)
>       at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
>       at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
>       ... 3 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to