This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 3d216f963ec0ce921af97143faed2cf484152753
Author: Salva Alcántara <salcantara...@gmail.com>
AuthorDate: Mon Oct 31 16:57:20 2022 +0100

    [FLINK-29480][kafka] Skip null ProduceRecord when writing out in KafkaWriter
    
    This closes #21186.
    
    Co-authored-by: Leonard Xu <leon...@apache.org>
---
 .../kafka/sink/KafkaRecordSerializationSchema.java          |  5 ++++-
 .../org/apache/flink/connector/kafka/sink/KafkaWriter.java  |  8 +++++---
 .../flink/connector/kafka/sink/KafkaWriterITCase.java       | 13 +++++++++++++
 3 files changed, 22 insertions(+), 4 deletions(-)

diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java
index efbfe88..9d081c7 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java
@@ -23,6 +23,8 @@ import 
org.apache.flink.api.common.serialization.SerializationSchema;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 
 /**
@@ -54,8 +56,9 @@ public interface KafkaRecordSerializationSchema<T> extends 
Serializable {
      * @param element element to be serialized
      * @param context context to possibly determine target partition
      * @param timestamp timestamp
-     * @return Kafka {@link ProducerRecord}
+     * @return Kafka {@link ProducerRecord} or null if the given element cannot be serialized
      */
+    @Nullable
     ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp);
 
     /** Context providing information of the kafka record target location. */
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 53e48d9..ba2cb4e 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -191,11 +191,13 @@ class KafkaWriter<IN>
     }
 
     @Override
-    public void write(IN element, Context context) throws IOException {
+    public void write(@Nullable IN element, Context context) throws IOException {
         final ProducerRecord<byte[], byte[]> record =
                 recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
-        currentProducer.send(record, deliveryCallback);
-        numRecordsOutCounter.inc();
+        if (record != null) {
+            currentProducer.send(record, deliveryCallback);
+            numRecordsOutCounter.inc();
+        }
     }
 
     @Override
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index 01f708c..8dfa5f6 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -148,6 +148,15 @@ public class KafkaWriterITCase {
             assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
             assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
 
+            // elements for which the serializer returns null should be silently skipped
+            writer.write(null, SINK_WRITER_CONTEXT);
+            timeService.trigger();
+            assertThat(numBytesOut.getCount()).isEqualTo(0L);
+            assertThat(numRecordsOut.getCount()).isEqualTo(0);
+            assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
+            assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
+
+            // but elements for which a non-null producer record is returned should count
             writer.write(1, SINK_WRITER_CONTEXT);
             timeService.trigger();
             assertThat(numRecordsOut.getCount()).isEqualTo(1);
@@ -491,6 +500,10 @@ public class KafkaWriterITCase {
         @Override
         public ProducerRecord<byte[], byte[]> serialize(
                 Integer element, KafkaSinkContext context, Long timestamp) {
+            if (element == null) {
+                // in general, serializers should be allowed to skip invalid elements
+                return null;
+            }
             return new ProducerRecord<>(topic, ByteBuffer.allocate(4).putInt(element).array());
         }
     }

Reply via email to