This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a5a31de5b30 [FLINK-26368] [kafka] Add setProperty method to KafkaSinkBuilder
a5a31de5b30 is described below
commit a5a31de5b3068f7fbc756b44b1674f98f4c04dea
Author: Gyula Fora <[email protected]>
AuthorDate: Mon Apr 4 13:49:35 2022 +0200
[FLINK-26368] [kafka] Add setProperty method to KafkaSinkBuilder
---
.../flink/connector/kafka/sink/KafkaSink.java | 6 ++
.../connector/kafka/sink/KafkaSinkBuilder.java | 71 ++++++++--------
.../connector/kafka/sink/KafkaSinkBuilderTest.java | 96 ++++++++++++++++++++++
.../kafka/table/KafkaDynamicTableFactoryTest.java | 6 --
4 files changed, 135 insertions(+), 44 deletions(-)
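For readers skimming the diff: the builder now pre-populates the producer defaults (byte-array serializers and the one-hour transaction timeout) in its constructor and exposes a per-key setProperty method. A minimal usage sketch, assuming a placeholder broker address and topic that are not part of this commit:

    KafkaSink<String> sink =
        KafkaSink.<String>builder()
            .setBootstrapServers("broker:9092")
            .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                    .setTopic("topic")
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            // New in this commit: set a single producer property without
            // building a full Properties object first.
            .setProperty("transaction.timeout.ms", "600000")
            .build();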
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
index 008554e77b9..6f74aaed5fa 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
@@ -19,6 +19,7 @@ package org.apache.flink.connector.kafka.sink;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
@@ -129,4 +130,9 @@ public class KafkaSink<IN>
public SimpleVersionedSerializer<KafkaWriterState> getWriterStateSerializer() {
return new KafkaWriterStateSerializer();
}
+
+ @VisibleForTesting
+ protected Properties getKafkaProducerConfig() {
+ return kafkaProducerConfig;
+ }
}
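The @VisibleForTesting accessor above lets tests in the same package inspect the producer configuration the builder assembled. A short sketch of the intended use (assertion style mirrors the new test file further down):

    Properties effective = sink.getKafkaProducerConfig();
    // Defaults injected by the builder constructor should be present.
    assertTrue(effective.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG));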
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
index e87d4dd1867..62e79547c23 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import java.util.Arrays;
import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -61,16 +62,30 @@ public class KafkaSinkBuilder<IN> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkBuilder.class);
private static final Duration DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Duration.ofHours(1);
+ private static final String[] warnKeys =
+ new String[] {
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
+ };
private static final int MAXIMUM_PREFIX_BYTES = 64000;
private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.NONE;
private String transactionalIdPrefix = "kafka-sink";
- private Properties kafkaProducerConfig;
+ private final Properties kafkaProducerConfig;
private KafkaRecordSerializationSchema<IN> recordSerializer;
private String bootstrapServers;
- KafkaSinkBuilder() {}
+ KafkaSinkBuilder() {
+ kafkaProducerConfig = new Properties();
+ kafkaProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ kafkaProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ kafkaProducerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMillis());
+ }
/**
* Sets the wanted {@link DeliveryGuarantee}. The default delivery guarantee is {@link
@@ -88,43 +103,26 @@ public class KafkaSinkBuilder<IN> {
* Sets the configuration which is used to instantiate all used {@link
* org.apache.kafka.clients.producer.KafkaProducer}.
*
- * @param kafkaProducerConfig
+ * @param props the Kafka producer configuration
* @return {@link KafkaSinkBuilder}
*/
- public KafkaSinkBuilder<IN> setKafkaProducerConfig(Properties kafkaProducerConfig) {
- this.kafkaProducerConfig = checkNotNull(kafkaProducerConfig, "kafkaProducerConfig");
- // set the producer configuration properties for kafka record key value serializers.
- if (!kafkaProducerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
- kafkaProducerConfig.put(
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
- ByteArraySerializer.class.getName());
- } else {
- LOG.warn(
- "Overwriting the '{}' is not recommended",
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
- }
+ public KafkaSinkBuilder<IN> setKafkaProducerConfig(Properties props) {
+ checkNotNull(props);
+ Arrays.stream(warnKeys)
+ .filter(props::containsKey)
+ .forEach(k -> LOG.warn("Overwriting the '{}' is not
recommended", k));
- if (!kafkaProducerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
- kafkaProducerConfig.put(
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
- ByteArraySerializer.class.getName());
- } else {
- LOG.warn(
- "Overwriting the '{}' is not recommended",
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
- }
+ kafkaProducerConfig.putAll(props);
+ return this;
+ }
- if (!kafkaProducerConfig.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
- final long timeout = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMillis();
- checkState(
- timeout < Integer.MAX_VALUE && timeout > 0,
- "timeout does not fit into 32 bit integer");
- kafkaProducerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) timeout);
- LOG.warn(
- "Property [{}] not specified. Setting it to {}",
- ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
- DEFAULT_KAFKA_TRANSACTION_TIMEOUT);
- }
+ public KafkaSinkBuilder<IN> setProperty(String key, String value) {
+ checkNotNull(key);
+ Arrays.stream(warnKeys)
+ .filter(key::equals)
+ .forEach(k -> LOG.warn("Overwriting the '{}' is not
recommended", k));
+
+ kafkaProducerConfig.setProperty(key, value);
return this;
}
@@ -186,9 +184,6 @@ public class KafkaSinkBuilder<IN> {
* @return {@link KafkaSink}
*/
public KafkaSink<IN> build() {
- if (kafkaProducerConfig == null) {
- setKafkaProducerConfig(new Properties());
- }
checkNotNull(bootstrapServers);
if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
checkState(
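Net effect of the builder changes: all properties are merged into a single pre-initialized Properties object, so when the same key is set more than once the later call wins. A sketch of the resulting semantics, using illustrative keys that mirror the new test below:

    Properties conf = new Properties();
    conf.put("k1", "v1");

    builder
        .setProperty("k1", "overridden")  // replaced by the putAll below
        .setKafkaProducerConfig(conf)     // merges conf, so k1 becomes "v1"
        .setProperty("k1", "final");      // last write wins: k1 is now "final"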
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java
new file mode 100644
index 00000000000..2f1f1acac83
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link KafkaSinkBuilder}. */
+public class KafkaSinkBuilderTest extends TestLogger {
+
+ private static final String[] DEFAULT_KEYS =
+ new String[] {
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ ProducerConfig.TRANSACTION_TIMEOUT_CONFIG
+ };
+
+ @Test
+ public void testPropertyHandling() {
+ validateProducerConfig(
+ getBasicBuilder(),
+ p -> {
+ Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, p.containsKey(k)));
+ });
+
+ validateProducerConfig(
+ getBasicBuilder().setProperty("k1", "v1"),
+ p -> {
+ Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, p.containsKey(k)));
+ assertTrue("k1", p.containsKey("k1"));
+ });
+
+ Properties testConf = new Properties();
+ testConf.put("k1", "v1");
+ testConf.put("k2", "v2");
+
+ validateProducerConfig(
+ getBasicBuilder().setKafkaProducerConfig(testConf),
+ p -> {
+ Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, p.containsKey(k)));
+ testConf.forEach((k, v) -> assertEquals(v, p.get(k)));
+ });
+
+ validateProducerConfig(
+ getBasicBuilder()
+ .setProperty("k1", "incorrect")
+ .setKafkaProducerConfig(testConf)
+ .setProperty("k2", "correct"),
+ p -> {
+ Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, p.containsKey(k)));
+ assertEquals("v1", p.get("k1"));
+ assertEquals("correct", p.get("k2"));
+ });
+ }
+
+ private void validateProducerConfig(
+ KafkaSinkBuilder<?> builder, Consumer<Properties> validator) {
+ validator.accept(builder.build().getKafkaProducerConfig());
+ }
+
+ private KafkaSinkBuilder<String> getBasicBuilder() {
+ return new KafkaSinkBuilder<String>()
+ .setBootstrapServers("testServer")
+ .setRecordSerializer(
+ KafkaRecordSerializationSchema.builder()
+ .setTopic("topic")
+ .setValueSerializationSchema(new SimpleStringSchema())
+ .build());
+ }
+}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index 2c2e2fe1fde..4b0a863b5fe 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -144,12 +144,6 @@ public class KafkaDynamicTableFactoryTest {
KAFKA_SINK_PROPERTIES.setProperty("bootstrap.servers", "dummy");
KAFKA_FINAL_SINK_PROPERTIES.putAll(KAFKA_SINK_PROPERTIES);
- KAFKA_FINAL_SINK_PROPERTIES.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
- KAFKA_FINAL_SINK_PROPERTIES.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
- KAFKA_FINAL_SINK_PROPERTIES.put("transaction.timeout.ms", 3600000);
-
KAFKA_FINAL_SOURCE_PROPERTIES.putAll(KAFKA_SOURCE_PROPERTIES);
}