[ 
https://issues.apache.org/jira/browse/FLINK-21691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17298091#comment-17298091
 ] 

Till Rohrmann commented on FLINK-21691:
---------------------------------------

cc [~jqin]

> KafkaSource fails with NPE when setting it up
> ---------------------------------------------
>
>                 Key: FLINK-21691
>                 URL: https://issues.apache.org/jira/browse/FLINK-21691
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.12.2, 1.13.0
>            Reporter: Till Rohrmann
>            Priority: Critical
>             Fix For: 1.13.0, 1.12.3
>
>
> A user reported that the new {{KafkaSource}} fails with a 
> {{NullPointerException}}:
> {code}
> Exception in thread "main" java.lang.NullPointerException
> at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
> at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
> {code}
> when setting it up like this:
> {code}
> val kafkaSource = buildKafkaSource(params)
> val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")
> private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
>     val builder = KafkaSource.builder<String>()
>         .setBootstrapServers(params.get("bootstrapServers"))
>         .setGroupId(params.get("groupId"))
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setTopics("topic")
>         .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))
>     if (params.getBoolean("boundedSource", false)) {
>         builder.setBounded(OffsetsInitializer.latest())
>     }
>     return builder.build()
> }
> {code}
> The problem seems to be that the {{ValueDeserializerWrapper}} does not set 
> the deserializer until the {{deserialize}} method is called, but 
> {{getProducedType}} is actually called first, resulting in the 
> {{NullPointerException}}.
> https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E
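
The suspected failure mode can be reproduced without Flink. The following is a minimal sketch, assuming the wrapper creates its delegate lazily inside the deserialize call as the description suggests. The names LazyInitSketch, Delegate, Utf8Delegate and LazyWrapper are hypothetical and do not mirror the actual ValueDeserializerWrapper code.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

public class LazyInitSketch {

    /** Hypothetical delegate; stands in for a Kafka-style deserializer. */
    interface Delegate<T> {
        T deserialize(byte[] bytes);
        Class<T> type();
    }

    /** Hypothetical delegate that decodes UTF-8 strings. */
    static class Utf8Delegate implements Delegate<String> {
        public String deserialize(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
        public Class<String> type() {
            return String.class;
        }
    }

    /** Hypothetical wrapper that only creates its delegate on first use. */
    static class LazyWrapper<T> {
        private final Supplier<Delegate<T>> factory;
        private Delegate<T> delegate; // null until deserialize() has run once

        LazyWrapper(Supplier<Delegate<T>> factory) {
            this.factory = factory;
        }

        T deserialize(byte[] bytes) {
            if (delegate == null) {
                delegate = factory.get(); // lazy initialization happens here only
            }
            return delegate.deserialize(bytes);
        }

        Class<T> getProducedType() {
            // Dereferences the delegate without initializing it first.
            return delegate.type();
        }
    }

    /** Returns true if asking for the type before any record throws an NPE. */
    static boolean failsBeforeFirstRecord() {
        LazyWrapper<String> wrapper = new LazyWrapper<>(Utf8Delegate::new);
        try {
            wrapper.getProducedType();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    /** Returns the produced type once a record has been deserialized. */
    static Class<String> typeAfterFirstRecord() {
        LazyWrapper<String> wrapper = new LazyWrapper<>(Utf8Delegate::new);
        wrapper.deserialize("hello".getBytes(StandardCharsets.UTF_8));
        return wrapper.getProducedType();
    }

    public static void main(String[] args) {
        System.out.println("NPE before first record: " + failsBeforeFirstRecord());
        System.out.println("Type after first record: " + typeAfterFirstRecord().getName());
    }
}
```

In this sketch the type query only succeeds after a record has been deserialized. Since fromSource asks for the produced type while the job is still being built, a wrapper of this shape would fail before any record is read, which matches the stack trace above.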



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
