[ https://issues.apache.org/jira/browse/FLINK-29480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625817#comment-17625817 ]
Salva commented on FLINK-29480:
-------------------------------

Can someone provide any guidance on how to fix those errors?

> Skip invalid messages when writing
> ----------------------------------
>
>                 Key: FLINK-29480
>                 URL: https://issues.apache.org/jira/browse/FLINK-29480
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>            Reporter: Salva
>            Assignee: Salva
>            Priority: Minor
>              Labels: pull-request-available
>         Attachments: Screenshot 2022-10-28 at 13.48.12.png
>
>
> As reported in [1], it does not seem possible to skip invalid messages
> when writing. More specifically, if an error occurs while serializing a
> message, there is no option for skipping it, and the Flink job enters a
> crash loop.
> In particular, the `write` method of the `KafkaWriter` looks like this:
> {code:java}
> @Override
> public void write(IN element, Context context) throws IOException {
>     final ProducerRecord<byte[], byte[]> record =
>             recordSerializer.serialize(element, ...);
>     currentProducer.send(record, deliveryCallback); // line 200
>     numRecordsSendCounter.inc();
> }
> {code}
> So, if you make your `serialize` method return `null`, this is what you get
> at runtime:
> {code:java}
> java.lang.NullPointerException
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
>     at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
>     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
> {code}
> What I propose is to modify the KafkaWriter [2, 3] like this:
> {code:java}
> @Override
> public void write(IN element, Context context) throws IOException {
>     final ProducerRecord<byte[], byte[]> record =
>             recordSerializer.serialize(element, ...);
>     if (record != null) { // skip null records (check to be added)
>         currentProducer.send(record, deliveryCallback);
>         numRecordsSendCounter.inc();
>     }
> }
> {code}
> This would at least make it possible to skip those messages and move on to
> the next ones.
> Obviously, one could prepend the sink with a flatMap operator for filtering
> out invalid messages, but
> # It looks weird that one has to prepend an operator for "making sure" that
> the serializer will not fail right after. Wouldn't it be simpler to skip the
> null records directly in order to avoid this pre-check? [4]
> # It's such a simple change (apparently)
> # It brings consistency/symmetry with the reading case [4, 5]
> To expand on point 3, by looking at `KafkaDeserializationSchema`:
> {code:java}
> T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
>
> default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
>         throws Exception {
>     T deserialized = deserialize(message);
>     if (deserialized != null) { // skip null records (check already exists)
>         out.collect(deserialized);
>     }
> }
> {code}
> one can simply return `null` in the overridden `deserialize` method in order
> to skip any message that fails to be deserialized. Similarly, if one uses the
> `KafkaRecordDeserializationSchema` interface instead:
> {code:java}
> void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out)
>         throws IOException
> {code}
> then it's also possible not to invoke `out.collect(...)` on null records. To
> me, it looks strange that the same flexibility is not given in the writing
> case.
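
To make the proposed behaviour concrete without depending on Flink or Kafka, here is a minimal, self-contained sketch of the "skip null records" write path. Everything in it is a hypothetical stand-in and not the connector's API: `SkippingWriterSketch` plays the role of the writer, a plain `Function<IN, byte[]>` plays the role of the record serializer (returning `null` for an invalid element), and an in-memory list replaces the producer. The skipped-record counter is an extra assumption that is not part of the proposal above; it is only there so that dropped messages stay visible.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Illustrative stand-in for a sink writer that skips records whose
 * serializer returns null. This is NOT Flink's KafkaWriter; all names
 * here are hypothetical.
 */
final class SkippingWriterSketch<IN> {
    // Hypothetical serializer: returns null to signal "invalid, skip me".
    private final Function<IN, byte[]> serializer;
    // In-memory list standing in for the Kafka producer.
    private final List<byte[]> sent = new ArrayList<>();
    private long numRecordsSent = 0;
    // Assumption: a separate counter so skipped records can be observed.
    private long numRecordsSkipped = 0;

    SkippingWriterSketch(Function<IN, byte[]> serializer) {
        this.serializer = serializer;
    }

    void write(IN element) {
        final byte[] record = serializer.apply(element);
        if (record == null) {
            // Skip instead of handing a null record to the producer.
            numRecordsSkipped++;
            return;
        }
        sent.add(record);
        numRecordsSent++;
    }

    long getNumRecordsSent() {
        return numRecordsSent;
    }

    long getNumRecordsSkipped() {
        return numRecordsSkipped;
    }

    List<byte[]> getSent() {
        return sent;
    }

    public static void main(String[] args) {
        // Treat the empty string as an "invalid" element for the demo.
        SkippingWriterSketch<String> writer = new SkippingWriterSketch<>(
                s -> s.isEmpty() ? null : s.getBytes(StandardCharsets.UTF_8));
        writer.write("ok-1");
        writer.write("");
        writer.write("ok-2");
        System.out.println("sent=" + writer.getNumRecordsSent()
                + " skipped=" + writer.getNumRecordsSkipped());
    }
}
```

With this shape, a serializer that cannot handle an element just returns `null`, and the writer moves on to the next element instead of failing, which mirrors the null check that already exists on the reading side.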
> *References*
> [1] [https://lists.apache.org/thread/ykmy4llovrrrzlvz0ng3x5yosskjg70h]
> [2] [https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#port-kafkasink-to-new-unified-sink-api-flip-143]
> [3] [https://github.com/apache/flink/blob/f0fe85a50920da2b7d7da815db0a924940522e28/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197]
> [4] [https://lists.apache.org/thread/pllv5dqq27xkvj6p3lj91vcz409pw38d]
> [5] [https://stackoverflow.com/questions/55538736/how-to-skip-corrupted-messages-in-flink]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)