[
https://issues.apache.org/jira/browse/FLINK-21691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17298091#comment-17298091
]
Till Rohrmann commented on FLINK-21691:
---------------------------------------
cc [~jqin]
> KafkaSource fails with NPE when setting it up
> ---------------------------------------------
>
> Key: FLINK-21691
> URL: https://issues.apache.org/jira/browse/FLINK-21691
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.12.2, 1.13.0
> Reporter: Till Rohrmann
> Priority: Critical
> Fix For: 1.13.0, 1.12.3
>
>
> A user reported that the new {{KafkaSource}} fails with a
> {{NullPointerException}}:
> {code}
> Exception in thread "main" java.lang.NullPointerException
> at
> org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
> at
> org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
> {code}
> when setting it up like this:
> {code}
> val kafkaSource = buildKafkaSource(params)
> val datastream = env.fromSource(kafkaSource,
> WatermarkStrategy.noWatermarks(), "kafka")
> private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
> val builder = KafkaSource.builder<String>()
> .setBootstrapServers(params.get("bootstrapServers"))
> .setGroupId(params.get("groupId"))
> .setStartingOffsets(OffsetsInitializer.earliest())
> .setTopics("topic")
>
> .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))
> if (params.getBoolean("boundedSource", false)) {
> builder.setBounded(OffsetsInitializer.latest())
> }
> return builder.build()
> }
> {code}
> The problem seems to be that the {{ValueDeserializerWrapper}} does not set
> the deserializer the deserialize method is called, but {{getProducedType}} is
> actually called first resulting in the {{NullPointerException}}.
> https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E
--
This message was sent by Atlassian Jira
(v8.3.4#803005)