This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new c81a0512c7a [FLINK-27174][connector/kafka] Fix checking of bootstrapServers when already provided in producer Properties
c81a0512c7a is described below

commit c81a0512c7ade2ec4f2a91e98ec5ee4d09a66d3b
Author: zhangzhengqi3 <[email protected]>
AuthorDate: Fri May 6 20:47:02 2022 +0800

    [FLINK-27174][connector/kafka] Fix checking of bootstrapServers when already provided in producer Properties
---
 .../connector/kafka/sink/KafkaSinkBuilder.java     | 31 ++++++++------
 .../connector/kafka/sink/KafkaSinkBuilderTest.java | 50 ++++++++++++++++++++++
 2 files changed, 69 insertions(+), 12 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
index e87d4dd1867..47830505d1d 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
@@ -180,26 +180,33 @@ public class KafkaSinkBuilder<IN> {
         return this;
     }
 
-    /**
-     * Constructs the {@link KafkaSink} with the configured properties.
-     *
-     * @return {@link KafkaSink}
-     */
-    public KafkaSink<IN> build() {
+    private void sanityCheck() {
         if (kafkaProducerConfig == null) {
             setKafkaProducerConfig(new Properties());
         }
-        checkNotNull(bootstrapServers);
+        if (bootstrapServers != null) {
+            kafkaProducerConfig.setProperty(
+                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        }
+        checkNotNull(
+                
kafkaProducerConfig.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
+                "bootstrapServers");
         if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
             checkState(
                     transactionalIdPrefix != null,
                     "EXACTLY_ONCE delivery guarantee requires a 
transactionIdPrefix to be set to provide unique transaction names across 
multiple KafkaSinks writing to the same Kafka cluster.");
         }
-        kafkaProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
+        checkNotNull(recordSerializer, "recordSerializer");
+    }
+
+    /**
+     * Constructs the {@link KafkaSink} with the configured properties.
+     *
+     * @return {@link KafkaSink}
+     */
+    public KafkaSink<IN> build() {
+        sanityCheck();
         return new KafkaSink<>(
-                deliveryGuarantee,
-                kafkaProducerConfig,
-                transactionalIdPrefix,
-                checkNotNull(recordSerializer, "recordSerializer"));
+                deliveryGuarantee, kafkaProducerConfig, transactionalIdPrefix, 
recordSerializer);
     }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java
new file mode 100644
index 00000000000..1ad2cde2b22
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import 
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+/** Tests for {@link KafkaSinkBuilder}. */
+@ExtendWith(TestLoggerExtension.class)
+public class KafkaSinkBuilderTest {
+
+    @Test
+    public void testBootstrapServerSettingWithProperties() {
+        Properties testConf = new Properties();
+        testConf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "testServer");
+        KafkaSinkBuilder<String> builder =
+                new KafkaSinkBuilder<String>()
+                        .setKafkaProducerConfig(testConf)
+                        .setRecordSerializer(
+                                KafkaRecordSerializationSchema.builder()
+                                        .setTopic("topic")
+                                        .setValueSerializationSchema(new 
SimpleStringSchema())
+                                        .build());
+
+        assertDoesNotThrow(builder::build);
+    }
+}

Reply via email to