Repository: incubator-gobblin
Updated Branches:
  refs/heads/master acb90d71a -> bac980328


[GOBBLIN-571] Add optional state property to KafkaTopic

Closes #2462 from jack-moseley/kafkatopic-state


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/bac98032
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/bac98032
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/bac98032

Branch: refs/heads/master
Commit: bac980328bd4bd5725e35288cc8c79c3e2450e6b
Parents: acb90d7
Author: Jack Moseley <[email protected]>
Authored: Sun Sep 30 21:20:58 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Sun Sep 30 21:20:58 2018 -0700

----------------------------------------------------------------------
 .../source/extractor/extract/kafka/KafkaSource.java     |  7 +++++++
 .../source/extractor/extract/kafka/KafkaTopic.java      | 12 ++++++++++++
 2 files changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bac98032/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index e7d7da0..1fb1acc 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -221,6 +221,13 @@ public abstract class KafkaSource<S, D> extends 
EventBasedSource<S, D> {
             }
           }), state);
 
+      for (KafkaTopic topic : topics) {
+        if (topic.getTopicSpecificState().isPresent()) {
+          topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new 
State())
+              .addAllIfNotExist(topic.getTopicSpecificState().get());
+        }
+      }
+
       int numOfThreads = 
state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
           
ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT);
       ExecutorService threadPool =

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bac98032/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaTopic.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaTopic.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaTopic.java
index d8de232..ffafb54 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaTopic.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaTopic.java
@@ -20,8 +20,11 @@ package org.apache.gobblin.source.extractor.extract.kafka;
 import java.util.Collections;
 import java.util.List;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 
+import org.apache.gobblin.configuration.State;
+
 
 /**
  * A kafka topic is composed of a topic name, and a list of partitions.
@@ -32,13 +35,19 @@ import com.google.common.collect.Lists;
 public final class KafkaTopic {
   private final String name;
   private final List<KafkaPartition> partitions;
+  private Optional<State> topicSpecificState;
 
   public KafkaTopic(String name, List<KafkaPartition> partitions) {
+    this(name, partitions, Optional.absent());
+  }
+
+  public KafkaTopic(String name, List<KafkaPartition> partitions, 
Optional<State> topicSpecificState) {
     this.name = name;
     this.partitions = Lists.newArrayList();
     for (KafkaPartition partition : partitions) {
       this.partitions.add(new KafkaPartition(partition));
     }
+    this.topicSpecificState = topicSpecificState;
   }
 
   public String getName() {
@@ -49,4 +58,7 @@ public final class KafkaTopic {
     return Collections.unmodifiableList(this.partitions);
   }
 
+  public Optional<State> getTopicSpecificState() {
+    return this.topicSpecificState;
+  }
 }

Reply via email to