[
https://issues.apache.org/jira/browse/FLINK-16735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17065809#comment-17065809
]
Shangwen Tang edited comment on FLINK-16735 at 3/24/20, 2:16 PM:
-----------------------------------------------------------------
When kafkaSchema.serialize return a null record, a null pointer exception
occurs when Kafka Client obtains the topic, so we need to check whether the
record is null
{code:java}
// FlinkKafkaProducer.java
public void invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN
next, Context context) throws FlinkKafkaException {
...
record = kafkaSchema.serialize(next, context.timestamp());
} else {
throw new RuntimeException(
"We have neither KafkaSerializationSchema nor KeyedSerializationSchema,
this" +
"is a bug.");
}
pendingRecords.incrementAndGet();
transaction.producer.send(record, callback);
}
{code}
was (Author: tangshangwen):
When kafkaSchema.serialize return a null record, a null pointer exception
occurs when Kafka Client obtains the topic, so we need to check whether the
record is null
{code:java}
// FlinkKafkaProducer.java
public void invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN
next, Context context) throws FlinkKafkaException {
...
record = kafkaSchema.serialize(next, context.timestamp());
} else {
throw new RuntimeException(
"We have neither KafkaSerializationSchema nor KeyedSerializationSchema,
this" +
"is a bug.");
}
pendingRecords.incrementAndGet();
transaction.producer.send(record, callback);
}
{code}
> FlinkKafkaProducer should check that it is not null before sending a record
> ---------------------------------------------------------------------------
>
> Key: FLINK-16735
> URL: https://issues.apache.org/jira/browse/FLINK-16735
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.10.0
> Reporter: Shangwen Tang
> Assignee: Shangwen Tang
> Priority: Major
> Attachments: image-2020-03-24-11-40-22-143.png
>
>
> In our user scenario, some users implemented the KafkaSerializationSchema and
> sometimes returned a null record, resulting in a null pointer exception
> !image-2020-03-24-11-40-22-143.png!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)