Repository: incubator-gobblin Updated Branches: refs/heads/master f957934a1 -> ba44dd304
[GOBBLIN-312] Pass extra kafka configuration to the KafkaConsumer in KafkaSimpleStreamingSource Closes #2166 from htran1/kafka_consumer_config1 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ba44dd30 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ba44dd30 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ba44dd30 Branch: refs/heads/master Commit: ba44dd30423d08f8ed9fbf27e2113055ac239ca9 Parents: f957934 Author: Hung Tran <[email protected]> Authored: Thu Nov 9 21:07:08 2017 -0800 Committer: Hung Tran <[email protected]> Committed: Thu Nov 9 21:07:08 2017 -0800 ---------------------------------------------------------------------- .../apache/gobblin/kafka/client/Kafka09ConsumerClient.java | 7 ++++++- .../extractor/extract/kafka/KafkaSimpleStreamingSource.java | 9 ++++++++- .../kafka/client/AbstractBaseKafkaConsumerClient.java | 5 +++-- 3 files changed, 17 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ba44dd30/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java index 4943ac5..7f83192 100644 --- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java +++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java @@ -94,7 +94,12 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient props.put(KAFKA_09_CLIENT_BOOTSTRAP_SERVERS_KEY, Joiner.on(",").join(super.brokers)); props.put(KAFKA_09_CLIENT_SESSION_TIMEOUT_KEY, super.socketTimeoutMillis); - Config scopedConfig = config.getConfig(CONFIG_PREFIX_NO_DOT).withFallback(FALLBACK); + // grab all the config under "source.kafka" and add the defaults as fallback. + Config baseConfig = ConfigUtils.getConfigOrEmpty(config, CONFIG_NAMESPACE).withFallback(FALLBACK); + // get the "source.kafka.consumerConfig" config for extra config to pass along to Kafka + Config specificConfig = ConfigUtils.getConfigOrEmpty(baseConfig, CONSUMER_CONFIG); + // The specific config overrides settings in the base config + Config scopedConfig = specificConfig.withFallback(baseConfig.withoutPath(CONSUMER_CONFIG)); props.putAll(ConfigUtils.configToProperties(scopedConfig)); this.consumer = new KafkaConsumer<>(props); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ba44dd30/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleStreamingSource.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleStreamingSource.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleStreamingSource.java index 72e0778..60cdf91 100644 --- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleStreamingSource.java +++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleStreamingSource.java @@ -46,7 +46,6 @@ import org.apache.gobblin.source.workunit.Extract; import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.util.ConfigUtils; - /** * A {@link EventBasedSource} implementation for a simple streaming kafka extractor. * @@ -67,6 +66,8 @@ public class KafkaSimpleStreamingSource<S, D> extends EventBasedSource<S, Record public static final String TOPIC_KEY_DESERIALIZER = "gobblin.streaming.kafka.topic.key.deserializer"; public static final String TOPIC_VALUE_DESERIALIZER = "gobblin.streaming.kafka.topic.value.deserializer"; + public static final String KAFKA_CONSUMER_CONFIG_PREFIX = "gobblin.streaming.kafka.consumerConfig"; + /** * Private config keys used to pass data into work unit state */ @@ -103,6 +104,12 @@ public class KafkaSimpleStreamingSource<S, D> extends EventBasedSource<S, Record props.put("key.deserializer", config.getString(TOPIC_KEY_DESERIALIZER)); Preconditions.checkArgument(config.hasPath(TOPIC_VALUE_DESERIALIZER)); props.put("value.deserializer", config.getString(TOPIC_VALUE_DESERIALIZER)); + + // pass along any config scoped under source.kafka.config + // one use case of this is to pass SSL configuration + Config scopedConfig = ConfigUtils.getConfigOrEmpty(config, KAFKA_CONSUMER_CONFIG_PREFIX); + props.putAll(ConfigUtils.configToProperties(scopedConfig)); + Consumer consumer = null; try { consumer = new KafkaConsumer<>(props); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ba44dd30/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java index 638a879..24c737f 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java @@ -41,8 +41,9 @@ import javax.annotation.Nullable; */ public abstract class AbstractBaseKafkaConsumerClient implements GobblinKafkaConsumerClient { - public static final String CONFIG_PREFIX_NO_DOT = "source.kafka"; - public static final String CONFIG_PREFIX = CONFIG_PREFIX_NO_DOT + "."; + public static final String CONFIG_NAMESPACE = "source.kafka"; + public static final String CONFIG_PREFIX = CONFIG_NAMESPACE + "."; + public static final String CONSUMER_CONFIG = "consumerConfig"; public static final String CONFIG_KAFKA_FETCH_TIMEOUT_VALUE = CONFIG_PREFIX + "fetchTimeoutMillis"; public static final int CONFIG_KAFKA_FETCH_TIMEOUT_VALUE_DEFAULT = 1000; // 1 second public static final String CONFIG_KAFKA_FETCH_REQUEST_MIN_BYTES = CONFIG_PREFIX + "fetchMinBytes";
