This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch release-1.15.2.4-acs
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2258c4a668d18e2783a1da047ffe486f8659bac6
Author: Gyula Fora <[email protected]>
AuthorDate: Fri Feb 25 12:33:03 2022 +0100

    [backport][FLINK-26368] Add setProperty method to KafkaSinkBuilder
---
 .../flink/connector/kafka/sink/KafkaSink.java      |  6 ++
 .../connector/kafka/sink/KafkaSinkBuilder.java     | 72 ++++++++++------------
 .../connector/kafka/sink/KafkaSinkBuilderTest.java | 65 +++++++++++++++++++
 .../kafka/table/KafkaDynamicTableFactoryTest.java  |  6 --
 4 files changed, 105 insertions(+), 44 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
index 008554e77b9..6f74aaed5fa 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
@@ -19,6 +19,7 @@ package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.StatefulSink;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
@@ -129,4 +130,9 @@ public class KafkaSink<IN>
     public SimpleVersionedSerializer<KafkaWriterState> getWriterStateSerializer() {
         return new KafkaWriterStateSerializer();
     }
+
+    @VisibleForTesting
+    protected Properties getKafkaProducerConfig() {
+        return kafkaProducerConfig;
+    }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
index 47830505d1d..14e2b70385f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Properties;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -61,16 +62,30 @@ public class KafkaSinkBuilder<IN> {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkBuilder.class);
     private static final Duration DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Duration.ofHours(1);
+    private static final String[] warnKeys =
+            new String[] {
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
+            };
     private static final int MAXIMUM_PREFIX_BYTES = 64000;
 
     private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.NONE;
     private String transactionalIdPrefix = "kafka-sink";
 
-    private Properties kafkaProducerConfig;
+    private final Properties kafkaProducerConfig;
     private KafkaRecordSerializationSchema<IN> recordSerializer;
     private String bootstrapServers;
 
-    KafkaSinkBuilder() {}
+    KafkaSinkBuilder() {
+        kafkaProducerConfig = new Properties();
+        kafkaProducerConfig.put(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProducerConfig.put(
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProducerConfig.put(
+                ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
+                (int) DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMillis());
+    }
 
     /**
      * Sets the wanted the {@link DeliveryGuarantee}. The default delivery guarantee is {@link
@@ -88,43 +103,26 @@ public class KafkaSinkBuilder<IN> {
      * Sets the configuration which used to instantiate all used {@link
      * org.apache.kafka.clients.producer.KafkaProducer}.
      *
-     * @param kafkaProducerConfig
+     * @param props
      * @return {@link KafkaSinkBuilder}
      */
-    public KafkaSinkBuilder<IN> setKafkaProducerConfig(Properties kafkaProducerConfig) {
-        this.kafkaProducerConfig = checkNotNull(kafkaProducerConfig, "kafkaProducerConfig");
-        // set the producer configuration properties for kafka record key value serializers.
-        if (!kafkaProducerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
-            kafkaProducerConfig.put(
-                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                    ByteArraySerializer.class.getName());
-        } else {
-            LOG.warn(
-                    "Overwriting the '{}' is not recommended",
-                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
-        }
+    public KafkaSinkBuilder<IN> setKafkaProducerConfig(Properties props) {
+        checkNotNull(props);
+        Arrays.stream(warnKeys)
+                .filter(props::containsKey)
+                .forEach(k -> LOG.warn("Overwriting the '{}' is not recommended", k));
 
-        if (!kafkaProducerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
-            kafkaProducerConfig.put(
-                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                    ByteArraySerializer.class.getName());
-        } else {
-            LOG.warn(
-                    "Overwriting the '{}' is not recommended",
-                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
-        }
+        kafkaProducerConfig.putAll(props);
+        return this;
+    }
 
-        if (!kafkaProducerConfig.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
-            final long timeout = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMillis();
-            checkState(
-                    timeout < Integer.MAX_VALUE && timeout > 0,
-                    "timeout does not fit into 32 bit integer");
-            kafkaProducerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) timeout);
-            LOG.warn(
-                    "Property [{}] not specified. Setting it to {}",
-                    ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
-                    DEFAULT_KAFKA_TRANSACTION_TIMEOUT);
-        }
+    public KafkaSinkBuilder<IN> setProperty(String key, String value) {
+        checkNotNull(key);
+        Arrays.stream(warnKeys)
+                .filter(key::equals)
+                .forEach(k -> LOG.warn("Overwriting the '{}' is not recommended", k));
+
+        kafkaProducerConfig.setProperty(key, value);
         return this;
     }
 
@@ -181,9 +179,6 @@ public class KafkaSinkBuilder<IN> {
     }
 
     private void sanityCheck() {
-        if (kafkaProducerConfig == null) {
-            setKafkaProducerConfig(new Properties());
-        }
         if (bootstrapServers != null) {
             kafkaProducerConfig.setProperty(
                     ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
@@ -191,6 +186,7 @@ public class KafkaSinkBuilder<IN> {
         checkNotNull(
                 kafkaProducerConfig.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
                 "bootstrapServers");
+
         if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
             checkState(
                     transactionalIdPrefix != null,
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java
index f83909afc2b..062934711f5 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java
@@ -24,14 +24,26 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.util.Arrays;
 import java.util.Properties;
+import java.util.function.Consumer;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 
 /** Tests for {@link KafkaSinkBuilder}. */
 @ExtendWith(TestLoggerExtension.class)
 public class KafkaSinkBuilderTest {
 
+    private static final String[] DEFAULT_KEYS =
+            new String[] {
+                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                ProducerConfig.TRANSACTION_TIMEOUT_CONFIG
+            };
+
     @Test
     public void testBootstrapServerSettingWithProperties() {
         Properties testConf = new Properties();
@@ -47,4 +59,57 @@ public class KafkaSinkBuilderTest {
 
         assertDoesNotThrow(builder::build);
     }
+
+    @Test
+    public void testPropertyHandling() {
+        validateProducerConfig(
+                getBasicBuilder(),
+                p -> {
+                    Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, p.containsKey(k)));
+                });
+
+        validateProducerConfig(
+                getBasicBuilder().setProperty("k1", "v1"),
+                p -> {
+                    Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, p.containsKey(k)));
+                    p.containsKey("k1");
+                });
+
+        Properties testConf = new Properties();
+        testConf.put("k1", "v1");
+        testConf.put("k2", "v2");
+
+        validateProducerConfig(
+                getBasicBuilder().setKafkaProducerConfig(testConf),
+                p -> {
+                    Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, p.containsKey(k)));
+                    testConf.forEach((k, v) -> assertEquals(v, p.get(k)));
+                });
+
+        validateProducerConfig(
+                getBasicBuilder()
+                        .setProperty("k1", "incorrect")
+                        .setKafkaProducerConfig(testConf)
+                        .setProperty("k2", "correct"),
+                p -> {
+                    Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, p.containsKey(k)));
+                    assertEquals("v1", p.get("k1"));
+                    assertEquals("correct", p.get("k2"));
+                });
+    }
+
+    private void validateProducerConfig(
+            KafkaSinkBuilder<?> builder, Consumer<Properties> validator) {
+        validator.accept(builder.build().getKafkaProducerConfig());
+    }
+
+    private KafkaSinkBuilder<String> getBasicBuilder() {
+        return new KafkaSinkBuilder<String>()
+                .setBootstrapServers("testServer")
+                .setRecordSerializer(
+                        KafkaRecordSerializationSchema.builder()
+                                .setTopic("topic")
+                                .setValueSerializationSchema(new SimpleStringSchema())
+                                .build());
+    }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index 1c81457fc7a..6673182ef7b 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -144,12 +144,6 @@ public class KafkaDynamicTableFactoryTest {
         KAFKA_SINK_PROPERTIES.setProperty("bootstrap.servers", "dummy");
 
         KAFKA_FINAL_SINK_PROPERTIES.putAll(KAFKA_SINK_PROPERTIES);
-        KAFKA_FINAL_SINK_PROPERTIES.setProperty(
-                "value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-        KAFKA_FINAL_SINK_PROPERTIES.setProperty(
-                "key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-        KAFKA_FINAL_SINK_PROPERTIES.put("transaction.timeout.ms", 3600000);
-
         KAFKA_FINAL_SOURCE_PROPERTIES.putAll(KAFKA_SOURCE_PROPERTIES);
     }
 

Reply via email to