This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a5a31de5b30 [FLINK-26368] [kafka] Add setProperty method to 
KafkaSinkBuilder
a5a31de5b30 is described below

commit a5a31de5b3068f7fbc756b44b1674f98f4c04dea
Author: Gyula Fora <[email protected]>
AuthorDate: Mon Apr 4 13:49:35 2022 +0200

    [FLINK-26368] [kafka] Add setProperty method to KafkaSinkBuilder
---
 .../flink/connector/kafka/sink/KafkaSink.java      |  6 ++
 .../connector/kafka/sink/KafkaSinkBuilder.java     | 71 ++++++++--------
 .../connector/kafka/sink/KafkaSinkBuilderTest.java | 96 ++++++++++++++++++++++
 .../kafka/table/KafkaDynamicTableFactoryTest.java  |  6 --
 4 files changed, 135 insertions(+), 44 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
index 008554e77b9..6f74aaed5fa 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
@@ -19,6 +19,7 @@ package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.StatefulSink;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
@@ -129,4 +130,9 @@ public class KafkaSink<IN>
     public SimpleVersionedSerializer<KafkaWriterState> 
getWriterStateSerializer() {
         return new KafkaWriterStateSerializer();
     }
+
+    @VisibleForTesting
+    protected Properties getKafkaProducerConfig() {
+        return kafkaProducerConfig;
+    }
 }
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
index e87d4dd1867..62e79547c23 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Properties;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -61,16 +62,30 @@ public class KafkaSinkBuilder<IN> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSinkBuilder.class);
     private static final Duration DEFAULT_KAFKA_TRANSACTION_TIMEOUT = 
Duration.ofHours(1);
+    private static final String[] warnKeys =
+            new String[] {
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
+            };
     private static final int MAXIMUM_PREFIX_BYTES = 64000;
 
     private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.NONE;
     private String transactionalIdPrefix = "kafka-sink";
 
-    private Properties kafkaProducerConfig;
+    private final Properties kafkaProducerConfig;
     private KafkaRecordSerializationSchema<IN> recordSerializer;
     private String bootstrapServers;
 
-    KafkaSinkBuilder() {}
+    KafkaSinkBuilder() {
+        kafkaProducerConfig = new Properties();
+        kafkaProducerConfig.put(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        kafkaProducerConfig.put(
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        kafkaProducerConfig.put(
+                ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
+                (int) DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMillis());
+    }
 
     /**
      * Sets the wanted the {@link DeliveryGuarantee}. The default delivery 
guarantee is {@link
@@ -88,43 +103,26 @@ public class KafkaSinkBuilder<IN> {
      * Sets the configuration which used to instantiate all used {@link
      * org.apache.kafka.clients.producer.KafkaProducer}.
      *
-     * @param kafkaProducerConfig
+     * @param props
      * @return {@link KafkaSinkBuilder}
      */
-    public KafkaSinkBuilder<IN> setKafkaProducerConfig(Properties 
kafkaProducerConfig) {
-        this.kafkaProducerConfig = checkNotNull(kafkaProducerConfig, 
"kafkaProducerConfig");
-        // set the producer configuration properties for kafka record key 
value serializers.
-        if 
(!kafkaProducerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
-            kafkaProducerConfig.put(
-                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                    ByteArraySerializer.class.getName());
-        } else {
-            LOG.warn(
-                    "Overwriting the '{}' is not recommended",
-                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
-        }
+    public KafkaSinkBuilder<IN> setKafkaProducerConfig(Properties props) {
+        checkNotNull(props);
+        Arrays.stream(warnKeys)
+                .filter(props::containsKey)
+                .forEach(k -> LOG.warn("Overwriting the '{}' is not 
recommended", k));
 
-        if 
(!kafkaProducerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
 {
-            kafkaProducerConfig.put(
-                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                    ByteArraySerializer.class.getName());
-        } else {
-            LOG.warn(
-                    "Overwriting the '{}' is not recommended",
-                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
-        }
+        kafkaProducerConfig.putAll(props);
+        return this;
+    }
 
-        if 
(!kafkaProducerConfig.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
-            final long timeout = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMillis();
-            checkState(
-                    timeout < Integer.MAX_VALUE && timeout > 0,
-                    "timeout does not fit into 32 bit integer");
-            kafkaProducerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
(int) timeout);
-            LOG.warn(
-                    "Property [{}] not specified. Setting it to {}",
-                    ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
-                    DEFAULT_KAFKA_TRANSACTION_TIMEOUT);
-        }
+    public KafkaSinkBuilder<IN> setProperty(String key, String value) {
+        checkNotNull(key);
+        Arrays.stream(warnKeys)
+                .filter(key::equals)
+                .forEach(k -> LOG.warn("Overwriting the '{}' is not 
recommended", k));
+
+        kafkaProducerConfig.setProperty(key, value);
         return this;
     }
 
@@ -186,9 +184,6 @@ public class KafkaSinkBuilder<IN> {
      * @return {@link KafkaSink}
      */
     public KafkaSink<IN> build() {
-        if (kafkaProducerConfig == null) {
-            setKafkaProducerConfig(new Properties());
-        }
         checkNotNull(bootstrapServers);
         if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
             checkState(
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java
new file mode 100644
index 00000000000..2f1f1acac83
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link KafkaSinkBuilder}. */
+public class KafkaSinkBuilderTest extends TestLogger {
+
+    private static final String[] DEFAULT_KEYS =
+            new String[] {
+                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                ProducerConfig.TRANSACTION_TIMEOUT_CONFIG
+            };
+
+    @Test
+    public void testPropertyHandling() {
+        validateProducerConfig(
+                getBasicBuilder(),
+                p -> {
+                    Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, 
p.containsKey(k)));
+                });
+
+        validateProducerConfig(
+                getBasicBuilder().setProperty("k1", "v1"),
+                p -> {
+                    Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, 
p.containsKey(k)));
+                    p.containsKey("k1");
+                });
+
+        Properties testConf = new Properties();
+        testConf.put("k1", "v1");
+        testConf.put("k2", "v2");
+
+        validateProducerConfig(
+                getBasicBuilder().setKafkaProducerConfig(testConf),
+                p -> {
+                    Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, 
p.containsKey(k)));
+                    testConf.forEach((k, v) -> assertEquals(v, p.get(k)));
+                });
+
+        validateProducerConfig(
+                getBasicBuilder()
+                        .setProperty("k1", "incorrect")
+                        .setKafkaProducerConfig(testConf)
+                        .setProperty("k2", "correct"),
+                p -> {
+                    Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, 
p.containsKey(k)));
+                    assertEquals("v1", p.get("k1"));
+                    assertEquals("correct", p.get("k2"));
+                });
+    }
+
+    private void validateProducerConfig(
+            KafkaSinkBuilder<?> builder, Consumer<Properties> validator) {
+        validator.accept(builder.build().getKafkaProducerConfig());
+    }
+
+    private KafkaSinkBuilder<String> getBasicBuilder() {
+        return new KafkaSinkBuilder<String>()
+                .setBootstrapServers("testServer")
+                .setRecordSerializer(
+                        KafkaRecordSerializationSchema.builder()
+                                .setTopic("topic")
+                                .setValueSerializationSchema(new 
SimpleStringSchema())
+                                .build());
+    }
+}
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index 2c2e2fe1fde..4b0a863b5fe 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -144,12 +144,6 @@ public class KafkaDynamicTableFactoryTest {
         KAFKA_SINK_PROPERTIES.setProperty("bootstrap.servers", "dummy");
 
         KAFKA_FINAL_SINK_PROPERTIES.putAll(KAFKA_SINK_PROPERTIES);
-        KAFKA_FINAL_SINK_PROPERTIES.setProperty(
-                "value.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
-        KAFKA_FINAL_SINK_PROPERTIES.setProperty(
-                "key.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
-        KAFKA_FINAL_SINK_PROPERTIES.put("transaction.timeout.ms", 3600000);
-
         KAFKA_FINAL_SOURCE_PROPERTIES.putAll(KAFKA_SOURCE_PROPERTIES);
     }
 

Reply via email to