Repository: incubator-gobblin Updated Branches: refs/heads/master 4a8f7ba8d -> b7b2bd9d1
[GOBBLIN-331] Add sharedConfig support for the KafkaDataWriters Closes #2183 from htran1/producer_shared_config Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b7b2bd9d Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b7b2bd9d Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b7b2bd9d Branch: refs/heads/master Commit: b7b2bd9d1be3e7452d375c30b09125dc020c9457 Parents: 4a8f7ba Author: Hung Tran <[email protected]> Authored: Mon Dec 4 14:28:46 2017 -0800 Committer: Hung Tran <[email protected]> Committed: Mon Dec 4 14:28:46 2017 -0800 ---------------------------------------------------------------------- .../writer/KafkaWriterConfigurationKeys.java | 3 +- .../gobblin/kafka/writer/KafkaWriterHelper.java | 10 +++- .../kafka/writer/KafkaWriterHelperTest.java | 49 ++++++++++++++++++++ 3 files changed, 60 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7b2bd9d/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java index 279812e..84255d4 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java @@ -38,7 +38,8 @@ public class KafkaWriterConfigurationKeys { * Kafka producer configurations will be passed through as is as long as they are prefixed * by the PREFIX 
specified below. */ - public static final String KAFKA_PRODUCER_CONFIG_PREFIX = "writer.kafka.producerConfig."; + public static final String KAFKA_PRODUCER_CONFIG_PREFIX_NO_DOT = "writer.kafka.producerConfig"; + public static final String KAFKA_PRODUCER_CONFIG_PREFIX = KAFKA_PRODUCER_CONFIG_PREFIX_NO_DOT + "."; /** Kafka producer scoped configuration keys go here **/ static final String KEY_SERIALIZER_CONFIG = "key.serializer"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7b2bd9d/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java index 28da311..1999480 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java @@ -28,6 +28,7 @@ import com.typesafe.config.ConfigFactory; import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.util.ConfigUtils; import static org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys.*; @@ -41,7 +42,14 @@ import static org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys.CLIEN public class KafkaWriterHelper { static Properties getProducerProperties(Properties props) { - Properties producerProperties = stripPrefix(props, KAFKA_PRODUCER_CONFIG_PREFIX); + Config config = ConfigUtils.propertiesToConfig(props); + + // get the "writer.kafka.producerConfig" config for producer config to pass along to Kafka with a fallback to the + // shared config that starts with "gobblin.kafka.sharedConfig" + Config producerConfig = 
ConfigUtils.getConfigOrEmpty(config, KAFKA_PRODUCER_CONFIG_PREFIX_NO_DOT).withFallback( + ConfigUtils.getConfigOrEmpty(config, ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX)); + + Properties producerProperties = ConfigUtils.configToProperties(producerConfig); // Provide default properties if not set from above setDefaultIfUnset(producerProperties, KEY_SERIALIZER_CONFIG, DEFAULT_KEY_SERIALIZER); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7b2bd9d/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/writer/KafkaWriterHelperTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/writer/KafkaWriterHelperTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/writer/KafkaWriterHelperTest.java new file mode 100644 index 0000000..1a134c1 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/kafka/writer/KafkaWriterHelperTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.kafka.writer; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.Properties; + +import org.apache.gobblin.configuration.ConfigurationKeys; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class KafkaWriterHelperTest { + + @Test + public void testSharedConfig() { + Properties props = new Properties(); + + props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "key1", "value1"); + props.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "key2", "value2"); + + props.setProperty(ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX + ".key1", "sharedValue1"); + props.setProperty(ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX + ".key3", "sharedValue3"); + + Properties producerProps = KafkaWriterHelper.getProducerProperties(props); + + // specific config overrides shared config + Assert.assertEquals(producerProps.getProperty("key1"), "value1"); + Assert.assertEquals(producerProps.getProperty("key2"), "value2"); + Assert.assertEquals(producerProps.getProperty("key3"), "sharedValue3"); + } +}
