This is an automated email from the ASF dual-hosted git repository. mbalassi pushed a commit to branch release-1.15.2.4-acs in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2258c4a668d18e2783a1da047ffe486f8659bac6 Author: Gyula Fora <[email protected]> AuthorDate: Fri Feb 25 12:33:03 2022 +0100 [backport][FLINK-26368] Add setProperty method to KafkaSinkBuilder --- .../flink/connector/kafka/sink/KafkaSink.java | 6 ++ .../connector/kafka/sink/KafkaSinkBuilder.java | 72 ++++++++++------------ .../connector/kafka/sink/KafkaSinkBuilderTest.java | 65 +++++++++++++++++++ .../kafka/table/KafkaDynamicTableFactoryTest.java | 6 -- 4 files changed, 105 insertions(+), 44 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java index 008554e77b9..6f74aaed5fa 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.kafka.sink; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.StatefulSink; import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; @@ -129,4 +130,9 @@ public class KafkaSink<IN> public SimpleVersionedSerializer<KafkaWriterState> getWriterStateSerializer() { return new KafkaWriterStateSerializer(); } + + @VisibleForTesting + protected Properties getKafkaProducerConfig() { + return kafkaProducerConfig; + } } diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java index 47830505d1d..14e2b70385f 100644 --- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Arrays; import java.util.Properties; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -61,16 +62,30 @@ public class KafkaSinkBuilder<IN> { private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkBuilder.class); private static final Duration DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Duration.ofHours(1); + private static final String[] warnKeys = + new String[] { + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG + }; private static final int MAXIMUM_PREFIX_BYTES = 64000; private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.NONE; private String transactionalIdPrefix = "kafka-sink"; - private Properties kafkaProducerConfig; + private final Properties kafkaProducerConfig; private KafkaRecordSerializationSchema<IN> recordSerializer; private String bootstrapServers; - KafkaSinkBuilder() {} + KafkaSinkBuilder() { + kafkaProducerConfig = new Properties(); + kafkaProducerConfig.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + kafkaProducerConfig.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + kafkaProducerConfig.put( + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, + (int) DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMillis()); + } /** * Sets the wanted the {@link DeliveryGuarantee}. The default delivery guarantee is {@link @@ -88,43 +103,26 @@ public class KafkaSinkBuilder<IN> { * Sets the configuration which used to instantiate all used {@link * org.apache.kafka.clients.producer.KafkaProducer}. 
* - * @param kafkaProducerConfig + * @param props * @return {@link KafkaSinkBuilder} */ - public KafkaSinkBuilder<IN> setKafkaProducerConfig(Properties kafkaProducerConfig) { - this.kafkaProducerConfig = checkNotNull(kafkaProducerConfig, "kafkaProducerConfig"); - // set the producer configuration properties for kafka record key value serializers. - if (!kafkaProducerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) { - kafkaProducerConfig.put( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - ByteArraySerializer.class.getName()); - } else { - LOG.warn( - "Overwriting the '{}' is not recommended", - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); - } + public KafkaSinkBuilder<IN> setKafkaProducerConfig(Properties props) { + checkNotNull(props); + Arrays.stream(warnKeys) + .filter(props::containsKey) + .forEach(k -> LOG.warn("Overwriting the '{}' is not recommended", k)); - if (!kafkaProducerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) { - kafkaProducerConfig.put( - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - ByteArraySerializer.class.getName()); - } else { - LOG.warn( - "Overwriting the '{}' is not recommended", - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); - } + kafkaProducerConfig.putAll(props); + return this; + } - if (!kafkaProducerConfig.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) { - final long timeout = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMillis(); - checkState( - timeout < Integer.MAX_VALUE && timeout > 0, - "timeout does not fit into 32 bit integer"); - kafkaProducerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) timeout); - LOG.warn( - "Property [{}] not specified. 
Setting it to {}", - ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, - DEFAULT_KAFKA_TRANSACTION_TIMEOUT); - } + public KafkaSinkBuilder<IN> setProperty(String key, String value) { + checkNotNull(key); + Arrays.stream(warnKeys) + .filter(key::equals) + .forEach(k -> LOG.warn("Overwriting the '{}' is not recommended", k)); + + kafkaProducerConfig.setProperty(key, value); return this; } @@ -181,9 +179,6 @@ public class KafkaSinkBuilder<IN> { } private void sanityCheck() { - if (kafkaProducerConfig == null) { - setKafkaProducerConfig(new Properties()); - } if (bootstrapServers != null) { kafkaProducerConfig.setProperty( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); @@ -191,6 +186,7 @@ public class KafkaSinkBuilder<IN> { checkNotNull( kafkaProducerConfig.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), "bootstrapServers"); + if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) { checkState( transactionalIdPrefix != null, diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java index f83909afc2b..062934711f5 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java @@ -24,14 +24,26 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import java.util.Arrays; import java.util.Properties; +import java.util.function.Consumer; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; /** Tests for {@link KafkaSinkBuilder}. 
*/ @ExtendWith(TestLoggerExtension.class) public class KafkaSinkBuilderTest { + private static final String[] DEFAULT_KEYS = + new String[] { + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG + }; + @Test public void testBootstrapServerSettingWithProperties() { Properties testConf = new Properties(); @@ -47,4 +59,57 @@ public class KafkaSinkBuilderTest { assertDoesNotThrow(builder::build); } + + @Test + public void testPropertyHandling() { + validateProducerConfig( + getBasicBuilder(), + p -> { + Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, p.containsKey(k))); + }); + + validateProducerConfig( + getBasicBuilder().setProperty("k1", "v1"), + p -> { + Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, p.containsKey(k))); + assertEquals("v1", p.get("k1")); + }); + + Properties testConf = new Properties(); + testConf.put("k1", "v1"); + testConf.put("k2", "v2"); + + validateProducerConfig( + getBasicBuilder().setKafkaProducerConfig(testConf), + p -> { + Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, p.containsKey(k))); + testConf.forEach((k, v) -> assertEquals(v, p.get(k))); + }); + + validateProducerConfig( + getBasicBuilder() + .setProperty("k1", "incorrect") + .setKafkaProducerConfig(testConf) + .setProperty("k2", "correct"), + p -> { + Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, p.containsKey(k))); + assertEquals("v1", p.get("k1")); + assertEquals("correct", p.get("k2")); + }); + } + + private void validateProducerConfig( + KafkaSinkBuilder<?> builder, Consumer<Properties> validator) { + validator.accept(builder.build().getKafkaProducerConfig()); + } + + private KafkaSinkBuilder<String> getBasicBuilder() { + return new KafkaSinkBuilder<String>() + .setBootstrapServers("testServer") + .setRecordSerializer( + KafkaRecordSerializationSchema.builder() + .setTopic("topic") + .setValueSerializationSchema(new
SimpleStringSchema()) + .build()); + } } diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java index 1c81457fc7a..6673182ef7b 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java @@ -144,12 +144,6 @@ public class KafkaDynamicTableFactoryTest { KAFKA_SINK_PROPERTIES.setProperty("bootstrap.servers", "dummy"); KAFKA_FINAL_SINK_PROPERTIES.putAll(KAFKA_SINK_PROPERTIES); - KAFKA_FINAL_SINK_PROPERTIES.setProperty( - "value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); - KAFKA_FINAL_SINK_PROPERTIES.setProperty( - "key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); - KAFKA_FINAL_SINK_PROPERTIES.put("transaction.timeout.ms", 3600000); - KAFKA_FINAL_SOURCE_PROPERTIES.putAll(KAFKA_SOURCE_PROPERTIES); }
