This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit 3d216f963ec0ce921af97143faed2cf484152753 Author: Salva Alcántara <salcantara...@gmail.com> AuthorDate: Mon Oct 31 16:57:20 2022 +0100 [FLINK-29480][kafka] Skip null ProducerRecord when writing out in KafkaWriter This closes #21186. Co-authored-by: Leonard Xu <leon...@apache.org> --- .../kafka/sink/KafkaRecordSerializationSchema.java | 5 ++++- .../org/apache/flink/connector/kafka/sink/KafkaWriter.java | 8 +++++--- .../flink/connector/kafka/sink/KafkaWriterITCase.java | 13 +++++++++++++ 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java index efbfe88..9d081c7 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java @@ -23,6 +23,8 @@ import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.kafka.clients.producer.ProducerRecord; +import javax.annotation.Nullable; + import java.io.Serializable; /** @@ -54,8 +56,9 @@ public interface KafkaRecordSerializationSchema<T> extends Serializable { * @param element element to be serialized * @param context context to possibly determine target partition * @param timestamp timestamp - * @return Kafka {@link ProducerRecord} + * @return Kafka {@link ProducerRecord} or null if the given element cannot be serialized */ + @Nullable ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp); /** Context providing information of the kafka record target location. 
*/ diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java index 53e48d9..ba2cb4e 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java @@ -191,11 +191,13 @@ class KafkaWriter<IN> } @Override - public void write(IN element, Context context) throws IOException { + public void write(@Nullable IN element, Context context) throws IOException { final ProducerRecord<byte[], byte[]> record = recordSerializer.serialize(element, kafkaSinkContext, context.timestamp()); - currentProducer.send(record, deliveryCallback); - numRecordsOutCounter.inc(); + if (record != null) { + currentProducer.send(record, deliveryCallback); + numRecordsOutCounter.inc(); + } } @Override diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java index 01f708c..8dfa5f6 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java @@ -148,6 +148,15 @@ public class KafkaWriterITCase { assertThat(numRecordsOutErrors.getCount()).isEqualTo(0); assertThat(numRecordsSendErrors.getCount()).isEqualTo(0); + // elements for which the serializer returns null should be silently skipped + writer.write(null, SINK_WRITER_CONTEXT); + timeService.trigger(); + assertThat(numBytesOut.getCount()).isEqualTo(0L); + assertThat(numRecordsOut.getCount()).isEqualTo(0); + assertThat(numRecordsOutErrors.getCount()).isEqualTo(0); + assertThat(numRecordsSendErrors.getCount()).isEqualTo(0); + + // but elements for which a non-null producer record is 
returned should count writer.write(1, SINK_WRITER_CONTEXT); timeService.trigger(); assertThat(numRecordsOut.getCount()).isEqualTo(1); @@ -491,6 +500,10 @@ public class KafkaWriterITCase { @Override public ProducerRecord<byte[], byte[]> serialize( Integer element, KafkaSinkContext context, Long timestamp) { + if (element == null) { + // in general, serializers should be allowed to skip invalid elements + return null; + } return new ProducerRecord<>(topic, ByteBuffer.allocate(4).putInt(element).array()); } }