[ https://issues.apache.org/jira/browse/FLINK-21160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dong Lin updated FLINK-21160:
-----------------------------
Description:
The variable {{deserializer}} in class {{ValueDeserializerWrapper}} is not
instantiated until the method {{deserialize()}} is invoked at runtime. When
{{getProducedType()}} is invoked earlier, while the job is being compiled, it
dereferences the still-uninitialized {{deserializer}} field and throws an NPE.
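The ordering problem can be reproduced without Flink or Kafka. The sketch below is a hypothetical, simplified model: {{SimpleDeserializer}}, {{SimpleStringDeserializer}}, {{LazyValueWrapper}} and {{producedClass()}} are illustrative stand-ins, not the real {{ValueDeserializerWrapper}} code. It shows that querying the produced type before the first {{deserialize()}} call dereferences a null field.

{code:java}
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified stand-in for Kafka's Deserializer interface.
interface SimpleDeserializer<T> {
    T deserialize(byte[] data);

    // Stand-in for the type information Flink derives from the deserializer.
    Class<T> producedClass();
}

final class SimpleStringDeserializer implements SimpleDeserializer<String> {
    public SimpleStringDeserializer() {}

    @Override
    public String deserialize(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    @Override
    public Class<String> producedClass() {
        return String.class;
    }
}

// Hypothetical model of the lazy-initialization pattern described in this issue.
final class LazyValueWrapper<T> {
    private final Class<? extends SimpleDeserializer<T>> deserializerClass;
    private SimpleDeserializer<T> deserializer; // stays null until deserialize() runs

    LazyValueWrapper(Class<? extends SimpleDeserializer<T>> deserializerClass) {
        this.deserializerClass = deserializerClass;
    }

    T deserialize(byte[] data) {
        if (deserializer == null) {
            try {
                deserializer = deserializerClass.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot instantiate " + deserializerClass.getName(), e);
            }
        }
        return deserializer.deserialize(data);
    }

    // Queried while the job graph is built, before any record exists:
    // the field is still null here, so this throws NullPointerException.
    Class<T> getProducedType() {
        return deserializer.producedClass();
    }
}

final class LazyInitNpeDemo {
    public static void main(String[] args) {
        LazyValueWrapper<String> wrapper = new LazyValueWrapper<>(SimpleStringDeserializer.class);
        try {
            wrapper.getProducedType(); // same call order as env.fromSource(...)
            System.out.println("no exception");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException before first record");
        }
        wrapper.deserialize("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(wrapper.getProducedType().getSimpleName()); // field is set now
    }
}
{code}

Running {{main}} prints "NullPointerException before first record" and then "String": the type query only succeeds after a record has gone through {{deserialize()}}, which never happens before {{fromSource}} asks for the type.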
A user reported that the new {{KafkaSource}} fails with a
{{NullPointerException}}:
{code}
Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
    at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
{code}
when setting it up like this:
{code}
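import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaSource = buildKafkaSource(params)
val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")

private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
    val builder = KafkaSource.builder<String>()
        .setBootstrapServers(params.get("bootstrapServers"))
        .setGroupId(params.get("groupId"))
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setTopics("topic")
        .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))
    if (params.getBoolean("boundedSource", false)) {
        builder.setBounded(OffsetsInitializer.latest())
    }
    return builder.build()
}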
{code}
The problem seems to be that {{ValueDeserializerWrapper}} does not set the
deserializer until the {{deserialize}} method is called, but {{getProducedType}}
is actually called first, resulting in the {{NullPointerException}}.
https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E
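One possible direction for a fix, sketched with the same hypothetical stand-ins ({{SimpleDeserializer}}, {{SimpleStringDeserializer}}, {{SafeValueWrapper}} and {{getOrCreateDeserializer()}} are illustrative names, and this is not necessarily the change merged for FLINK-21160), is to route every use of the field through a single lazy accessor, so that {{getProducedType()}} no longer depends on {{deserialize()}} having run first. The field is kept {{transient}} on the assumption that the real wrapper defers instantiation because a Kafka {{Deserializer}} instance is not serializable and has to be recreated after the source is shipped to the workers.

{code:java}
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified stand-in for Kafka's Deserializer interface.
interface SimpleDeserializer<T> {
    T deserialize(byte[] data);

    Class<T> producedClass();
}

final class SimpleStringDeserializer implements SimpleDeserializer<String> {
    public SimpleStringDeserializer() {}

    @Override
    public String deserialize(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    @Override
    public Class<String> producedClass() {
        return String.class;
    }
}

// Sketch: both methods share one lazy accessor, so call order no longer matters.
final class SafeValueWrapper<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Class<? extends SimpleDeserializer<T>> deserializerClass;
    // transient: assumed to be recreated after the wrapper is shipped to a worker
    private transient SimpleDeserializer<T> deserializer;

    SafeValueWrapper(Class<? extends SimpleDeserializer<T>> deserializerClass) {
        this.deserializerClass = deserializerClass;
    }

    // Creates the deserializer on first use, whichever method asks first.
    private SimpleDeserializer<T> getOrCreateDeserializer() {
        if (deserializer == null) {
            try {
                deserializer = deserializerClass.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot instantiate " + deserializerClass.getName(), e);
            }
        }
        return deserializer;
    }

    T deserialize(byte[] data) {
        return getOrCreateDeserializer().deserialize(data);
    }

    // Safe to call at job-compile time, before any record has been deserialized.
    Class<T> getProducedType() {
        return getOrCreateDeserializer().producedClass();
    }
}

final class SafeWrapperDemo {
    public static void main(String[] args) {
        SafeValueWrapper<String> wrapper = new SafeValueWrapper<>(SimpleStringDeserializer.class);
        System.out.println(wrapper.getProducedType().getSimpleName()); // String
        System.out.println(wrapper.deserialize("hello".getBytes(StandardCharsets.UTF_8))); // hello
    }
}
{code}

Deriving the type from the deserializer class alone, without creating an instance at all, would be an alternative with the same effect; the shared accessor is used here only because it keeps the sketch small.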
was:
The variable {{deserializer}} in class {{ValueDeserializerWrapper}} is not
instantiated until the method {{deserialize()}} is invoked at runtime. When
{{getProducedType()}} is invoked earlier, while the job is being compiled, it
dereferences the still-uninitialized {{deserializer}} field and throws an NPE.
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=14364&view=logs&j=72d4811f-9f0d-5fd0-014a-0bc26b72b642&t=c1d93a6a-ba91-515d-3196-2ee8019fbda7
A user reported that the new {{KafkaSource}} fails with a
{{NullPointerException}}:
{code}
Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
    at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
{code}
when setting it up like this:
{code}
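import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaSource = buildKafkaSource(params)
val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")

private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
    val builder = KafkaSource.builder<String>()
        .setBootstrapServers(params.get("bootstrapServers"))
        .setGroupId(params.get("groupId"))
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setTopics("topic")
        .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))
    if (params.getBoolean("boundedSource", false)) {
        builder.setBounded(OffsetsInitializer.latest())
    }
    return builder.build()
}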
{code}
The problem seems to be that {{ValueDeserializerWrapper}} does not set the
deserializer until the {{deserialize}} method is called, but {{getProducedType}}
is actually called first, resulting in the {{NullPointerException}}.
https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E
> ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked
> ------------------------------------------------------------------------------------
>
> Key: FLINK-21160
> URL: https://issues.apache.org/jira/browse/FLINK-21160
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Reporter: Qingsheng Ren
> Priority: Major
> Labels: pull-request-available
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)