Repository: incubator-gobblin
Updated Branches:
  refs/heads/master f957934a1 -> ba44dd304


[GOBBLIN-312] Pass extra kafka configuration to the KafkaConsumer in 
KafkaSimpleStreamingSource

Closes #2166 from htran1/kafka_consumer_config1


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ba44dd30
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ba44dd30
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ba44dd30

Branch: refs/heads/master
Commit: ba44dd30423d08f8ed9fbf27e2113055ac239ca9
Parents: f957934
Author: Hung Tran <[email protected]>
Authored: Thu Nov 9 21:07:08 2017 -0800
Committer: Hung Tran <[email protected]>
Committed: Thu Nov 9 21:07:08 2017 -0800

----------------------------------------------------------------------
 .../apache/gobblin/kafka/client/Kafka09ConsumerClient.java  | 7 ++++++-
 .../extractor/extract/kafka/KafkaSimpleStreamingSource.java | 9 ++++++++-
 .../kafka/client/AbstractBaseKafkaConsumerClient.java       | 5 +++--
 3 files changed, 17 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ba44dd30/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
index 4943ac5..7f83192 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
@@ -94,7 +94,12 @@ public class Kafka09ConsumerClient<K, V> extends 
AbstractBaseKafkaConsumerClient
     props.put(KAFKA_09_CLIENT_BOOTSTRAP_SERVERS_KEY, 
Joiner.on(",").join(super.brokers));
     props.put(KAFKA_09_CLIENT_SESSION_TIMEOUT_KEY, super.socketTimeoutMillis);
 
-    Config scopedConfig = 
config.getConfig(CONFIG_PREFIX_NO_DOT).withFallback(FALLBACK);
+    // grab all the config under "source.kafka" and add the defaults as 
fallback.
+    Config baseConfig = ConfigUtils.getConfigOrEmpty(config, 
CONFIG_NAMESPACE).withFallback(FALLBACK);
+    // get the "source.kafka.consumerConfig" config for extra config to pass 
along to Kafka
+    Config specificConfig = ConfigUtils.getConfigOrEmpty(baseConfig, 
CONSUMER_CONFIG);
+    // The specific config overrides settings in the base config
+    Config scopedConfig = 
specificConfig.withFallback(baseConfig.withoutPath(CONSUMER_CONFIG));
     props.putAll(ConfigUtils.configToProperties(scopedConfig));
 
     this.consumer = new KafkaConsumer<>(props);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ba44dd30/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleStreamingSource.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleStreamingSource.java
 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleStreamingSource.java
index 72e0778..60cdf91 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleStreamingSource.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleStreamingSource.java
@@ -46,7 +46,6 @@ import org.apache.gobblin.source.workunit.Extract;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.ConfigUtils;
 
-
 /**
  * A {@link EventBasedSource} implementation for a simple streaming kafka 
extractor.
  *
@@ -67,6 +66,8 @@ public class KafkaSimpleStreamingSource<S, D> extends 
EventBasedSource<S, Record
   public static final String TOPIC_KEY_DESERIALIZER = 
"gobblin.streaming.kafka.topic.key.deserializer";
   public static final String TOPIC_VALUE_DESERIALIZER = 
"gobblin.streaming.kafka.topic.value.deserializer";
 
+  public static final String KAFKA_CONSUMER_CONFIG_PREFIX = 
"gobblin.streaming.kafka.consumerConfig";
+
   /**
    * Private config keys used to pass data into work unit state
    */
@@ -103,6 +104,12 @@ public class KafkaSimpleStreamingSource<S, D> extends 
EventBasedSource<S, Record
     props.put("key.deserializer", config.getString(TOPIC_KEY_DESERIALIZER));
     Preconditions.checkArgument(config.hasPath(TOPIC_VALUE_DESERIALIZER));
     props.put("value.deserializer", 
config.getString(TOPIC_VALUE_DESERIALIZER));
+
+    // pass along any config scoped under source.kafka.config
+    // one use case of this is to pass SSL configuration
+    Config scopedConfig = ConfigUtils.getConfigOrEmpty(config, 
KAFKA_CONSUMER_CONFIG_PREFIX);
+    props.putAll(ConfigUtils.configToProperties(scopedConfig));
+
     Consumer consumer = null;
     try {
       consumer = new KafkaConsumer<>(props);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ba44dd30/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
index 638a879..24c737f 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
@@ -41,8 +41,9 @@ import javax.annotation.Nullable;
  */
 public abstract class AbstractBaseKafkaConsumerClient implements 
GobblinKafkaConsumerClient {
 
-  public static final String CONFIG_PREFIX_NO_DOT = "source.kafka";
-  public static final String CONFIG_PREFIX = CONFIG_PREFIX_NO_DOT + ".";
+  public static final String CONFIG_NAMESPACE = "source.kafka";
+  public static final String CONFIG_PREFIX = CONFIG_NAMESPACE + ".";
+  public static final String CONSUMER_CONFIG = "consumerConfig";
   public static final String CONFIG_KAFKA_FETCH_TIMEOUT_VALUE = CONFIG_PREFIX 
+ "fetchTimeoutMillis";
   public static final int CONFIG_KAFKA_FETCH_TIMEOUT_VALUE_DEFAULT = 1000; // 
1 second
   public static final String CONFIG_KAFKA_FETCH_REQUEST_MIN_BYTES = 
CONFIG_PREFIX + "fetchMinBytes";

Reply via email to