[
https://issues.apache.org/jira/browse/FLINK-29480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Salva updated FLINK-29480:
--------------------------
Description:
As reported in [1], it seems that it is not possible to skip invalid messages
when writing. More specifically, if a message fails to be serialized, there is
no option for skipping it, and the Flink job enters a crash loop. In
particular, the `write` method of the `KafkaWriter` looks like this:
{code:java}
@Override
public void write(IN element, Context context) throws IOException {
    final ProducerRecord<byte[], byte[]> record =
            recordSerializer.serialize(element, ...);
    currentProducer.send(record, deliveryCallback); // line 200
    numRecordsSendCounter.inc();
}
{code}
So, if you make your `serialize` method return `null`, this is what you get at
runtime:
{code:java}
java.lang.NullPointerException
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
    at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
{code}
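For illustration, a sink-side schema along these lines (the class name, topic and
validity check are made up) is enough to hit the NPE above, because the `null`
that is meant to say "skip this element" is handed straight to `send`:
{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical schema, for illustration only.
public class NullOnInvalidSchema implements KafkaRecordSerializationSchema<String> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            String element, KafkaSinkContext context, Long timestamp) {
        if (element == null || element.isEmpty()) {
            // Intended as "skip this element", but the KafkaWriter passes it to send() as-is.
            return null;
        }
        return new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8));
    }
}
{code}
In other words, returning `null` is the natural way to express "skip", but today it
crashes the job.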
What I propose is to modify the KafkaWriter [2, 3] like this:
{code:java}
@Override
public void write(IN element, Context context) throws IOException {
    final ProducerRecord<byte[], byte[]> record =
            recordSerializer.serialize(element, ...);
    if (record != null) { // skip null records (check to be added)
        currentProducer.send(record, deliveryCallback);
        numRecordsSendCounter.inc();
    }
}
{code}
This would at least give users a chance to skip those messages and move on to
the next ones.
Obviously, one could precede the sink with a flatMap operator that filters out
invalid messages (see the sketch after this list), but:
# It looks weird that one has to prepend an operator just to "make sure" that
the serializer will not fail right after. Wouldn't it be simpler to skip the
null records directly and avoid this pre-check? [4]
# It's such a simple change (apparently).
# It brings consistency/symmetry with the reading case [4, 5].
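As for the workaround in question, here is a minimal sketch (the
`isSerializable` pre-check and the `input`/`kafkaSink` names are made up); it
duplicates the serializer's validity logic just to keep invalid elements away
from the sink:
{code:java}
// Workaround sketch, for illustration only: isSerializable(...) is a hypothetical
// helper that mirrors the sink serializer's validity logic, and input / kafkaSink
// are assumed to be defined elsewhere in the job.
DataStream<String> valid =
        input.flatMap(
                new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String element, Collector<String> out) {
                        if (isSerializable(element)) { // drop elements that would fail
                            out.collect(element);
                        }
                    }
                });
valid.sinkTo(kafkaSink);
{code}
This works, but it duplicates the validity check in front of the sink, which is
exactly the awkwardness described in point 1.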
To expand on point 3, consider `KafkaDeserializationSchema`:
{code:java}
T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;

default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
        throws Exception {
    T deserialized = deserialize(message);
    if (deserialized != null) { // skip null records (check already exists)
        out.collect(deserialized);
    }
}
{code}
one can simply return `null` from the overridden `deserialize` method in order to
skip any message that fails to be deserialized. Similarly, if one uses the
`KafkaRecordDeserializationSchema` interface instead:
{code:java}
void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out)
        throws IOException;
{code}
then it's also possible to simply not invoke `out.collect(...)` for records that
fail to deserialize. To me, it looks strange that the same flexibility is not
offered in the writing case.
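For example (the class name and the "skip on null value" rule are just
illustrative), a source-side schema can drop a corrupted record simply by
returning without collecting anything:
{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Illustration only: the reading side can already skip records it cannot handle.
public class SkippingDeserializationSchema implements KafkaRecordDeserializationSchema<String> {

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out)
            throws IOException {
        byte[] value = record.value();
        if (value == null) {
            return; // skip: nothing is collected and the job keeps running
        }
        out.collect(new String(value, StandardCharsets.UTF_8));
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
{code}
With the proposed `null` check in `KafkaWriter.write`, the writing side would
behave analogously.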
*References*
[1] [https://lists.apache.org/thread/ykmy4llovrrrzlvz0ng3x5yosskjg70h]
[2] [https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#port-kafkasink-to-new-unified-sink-api-flip-143]
[3] [https://github.com/apache/flink/blob/f0fe85a50920da2b7d7da815db0a924940522e28/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197]
[4] [https://lists.apache.org/thread/pllv5dqq27xkvj6p3lj91vcz409pw38d]
[5] [https://stackoverflow.com/questions/55538736/how-to-skip-corrupted-messages-in-flink]
> Skip invalid messages when writing
> ----------------------------------
>
> Key: FLINK-29480
> URL: https://issues.apache.org/jira/browse/FLINK-29480
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Reporter: Salva
> Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)