[ https://issues.apache.org/jira/browse/FLINK-29480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625817#comment-17625817 ]

Salva commented on FLINK-29480:
-------------------------------

Can someone provide any guidance on how to fix those errors?

> Skip invalid messages when writing
> ----------------------------------
>
>                 Key: FLINK-29480
>                 URL: https://issues.apache.org/jira/browse/FLINK-29480
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>            Reporter: Salva
>            Assignee: Salva
>            Priority: Minor
>              Labels: pull-request-available
>         Attachments: Screenshot 2022-10-28 at 13.48.12.png
>
>
> As reported in [1], it seems that it's not possible to skip invalid messages
> when writing. More specifically, if there is an error serializing messages,
> there is no option to skip them, and the Flink job enters a crash loop.
> In particular, the `write` method of the `KafkaWriter` looks like this:
> {code:java}
> @Override
> public void write(IN element, Context context) throws IOException {
>   final ProducerRecord<byte[], byte[]> record =
>       recordSerializer.serialize(element, ...);
>   currentProducer.send(record, deliveryCallback); // line 200
>   numRecordsSendCounter.inc();
> }
> {code}
> So, if your `serialize` method returns `null`, this is what you get at
> runtime:
> {code:java}
> java.lang.NullPointerException
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
>     at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
>     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
> {code}
> What I propose is to modify the KafkaWriter [2, 3] like this:
> {code:java}
> @Override
> public void write(IN element, Context context) throws IOException {
>   final ProducerRecord<byte[], byte[]> record =
>       recordSerializer.serialize(element, ...);
>   if (record != null) { // skip null records (check to be added)
>     currentProducer.send(record, deliveryCallback);
>     numRecordsSendCounter.inc();
>   }
> }
> {code}
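The proposed null check can be tried outside Flink with a small, dependency-free sketch. `RecordSerializer`, `NullSkippingWriter` and `WriterDemo` below are hypothetical stand-ins (not Flink or Kafka APIs) for the record serializer, `KafkaWriter` and `KafkaProducer.send`. The sketch assumes only that a serializer signals "skip" by returning `null`, and it counts skipped records separately so they remain observable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a record serialization schema; returning null means "skip".
interface RecordSerializer<IN, R> {
    R serialize(IN element);
}

// Hypothetical stand-in for KafkaWriter with the proposed null check.
class NullSkippingWriter<IN, R> {
    private final RecordSerializer<IN, R> serializer;
    private final Consumer<R> send; // stands in for currentProducer.send(record, deliveryCallback)
    private long numRecordsSent = 0;
    private long numRecordsSkipped = 0;

    NullSkippingWriter(RecordSerializer<IN, R> serializer, Consumer<R> send) {
        this.serializer = serializer;
        this.send = send;
    }

    void write(IN element) {
        R record = serializer.serialize(element);
        if (record == null) {
            // Skip instead of handing null to the producer (which would throw an NPE).
            numRecordsSkipped++;
            return;
        }
        send.accept(record);
        numRecordsSent++;
    }

    long numRecordsSent() { return numRecordsSent; }

    long numRecordsSkipped() { return numRecordsSkipped; }
}

class WriterDemo {
    // Illustrative serializer: accepts non-negative integers, returns null otherwise.
    static String serializeOrNull(Integer value) {
        return value != null && value >= 0 ? "value=" + value : null;
    }

    // Writes all inputs and returns the records that reached the fake producer.
    static List<String> writeAll(List<Integer> inputs) {
        List<String> sent = new ArrayList<>();
        NullSkippingWriter<Integer, String> writer =
                new NullSkippingWriter<>(WriterDemo::serializeOrNull, sent::add);
        for (Integer input : inputs) {
            writer.write(input);
        }
        return sent;
    }
}
```

For example, `WriterDemo.writeAll(List.of(1, -2, 3))` sends only the records for `1` and `3`; the `-2` element is skipped rather than reaching the producer as `null`.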
> This would at least give users a way to skip those messages and move on to
> the next ones.
> Of course, one could prepend a flatMap operator to the sink to filter out
> invalid messages, but:
>  # It seems odd to have to prepend an operator just to make sure that the
> serializer will not fail right after it. Wouldn't it be simpler to skip null
> records directly and avoid this pre-check? [4]
>  # It's a (seemingly) simple change.
>  # It brings consistency/symmetry with the reading case [4, 5].
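For comparison, the flatMap workaround mentioned above amounts to running a validity check upstream and emitting zero or one element per input. The sketch below uses `java.util.stream.Stream.flatMap` as a stand-in for Flink's `flatMap` operator, and `isSerializable` is a hypothetical check; in a real job it would have to duplicate whatever the serializer accepts, which is the redundancy point 1 objects to.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PreFilterDemo {
    // Hypothetical validity check that must mirror what the serializer accepts.
    static boolean isSerializable(String message) {
        return message != null && !message.isBlank();
    }

    // Emits the message if valid and nothing otherwise, similar to a Flink
    // FlatMapFunction calling out.collect(...) zero or one times.
    static Stream<String> dropInvalid(String message) {
        return isSerializable(message) ? Stream.of(message) : Stream.empty();
    }

    // Applies the pre-filter to a batch of messages before they reach the "sink".
    static List<String> preFilter(List<String> messages) {
        return messages.stream()
                .flatMap(PreFilterDemo::dropInvalid)
                .collect(Collectors.toList());
    }
}
```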
> To expand on point 3, consider `KafkaDeserializationSchema`:
> {code:java}
> T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
> default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
>     throws Exception {
>   T deserialized = deserialize(message);
>   if (deserialized != null) { // skip null records (check already exists)
>     out.collect(deserialized);
>   }
> }
> {code}
> one can simply return `null` from the overridden `deserialize` method in order
> to skip any message that fails to be deserialized. Similarly, if one uses the
> `KafkaRecordDeserializationSchema` interface instead:
> {code:java}
> void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out)
>     throws IOException;
> {code}
> then one can also simply not call `out.collect(...)` for records that fail to
> be deserialized. To me, it seems strange that the same flexibility is not
> offered in the writing case.
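The reading-side behavior described above can be mimicked in plain Java. `Collector` here is a minimal stand-in for Flink's `org.apache.flink.util.Collector`, and `NullSkippingDeserializer` and `IntDeserializer` are hypothetical names for illustration. The default two-argument method mirrors the null check quoted from `KafkaDeserializationSchema`, but it takes a raw `byte[]` instead of a `ConsumerRecord` to stay dependency-free.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for org.apache.flink.util.Collector.
interface Collector<T> {
    void collect(T record);
}

// Hypothetical schema mirroring the null-skipping default method quoted above.
interface NullSkippingDeserializer<T> {
    T deserialize(byte[] payload);

    default void deserialize(byte[] payload, Collector<T> out) {
        T deserialized = deserialize(payload);
        if (deserialized != null) { // skip records that failed to deserialize
            out.collect(deserialized);
        }
    }
}

// Illustrative schema: parses UTF-8 integers and returns null for malformed payloads.
class IntDeserializer implements NullSkippingDeserializer<Integer> {
    @Override
    public Integer deserialize(byte[] payload) {
        try {
            return Integer.parseInt(new String(payload, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException e) {
            return null; // signal "skip" instead of failing the job
        }
    }

    // Runs every payload through the collector-based method and gathers the output.
    static List<Integer> deserializeAll(List<byte[]> payloads) {
        List<Integer> out = new ArrayList<>();
        IntDeserializer schema = new IntDeserializer();
        for (byte[] payload : payloads) {
            schema.deserialize(payload, out::add);
        }
        return out;
    }
}
```

With the payloads `"1"`, `"oops"` and `" 3 "`, `deserializeAll` collects only `1` and `3`; the malformed payload is dropped without an exception, which is the symmetry the issue asks for on the writing side.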
> *References*
> [1] [https://lists.apache.org/thread/ykmy4llovrrrzlvz0ng3x5yosskjg70h]
> [2] [https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#port-kafkasink-to-new-unified-sink-api-flip-143]
> [3] [https://github.com/apache/flink/blob/f0fe85a50920da2b7d7da815db0a924940522e28/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197]
> [4] [https://lists.apache.org/thread/pllv5dqq27xkvj6p3lj91vcz409pw38d]
> [5] [https://stackoverflow.com/questions/55538736/how-to-skip-corrupted-messages-in-flink]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
