Repository: incubator-gobblin Updated Branches: refs/heads/master acb90d71a -> bac980328
[GOBBLIN-571] Add optional state property to KafkaTopic Closes #2462 from jack-moseley/kafkatopic-state Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/bac98032 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/bac98032 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/bac98032 Branch: refs/heads/master Commit: bac980328bd4bd5725e35288cc8c79c3e2450e6b Parents: acb90d7 Author: Jack Moseley <[email protected]> Authored: Sun Sep 30 21:20:58 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Sun Sep 30 21:20:58 2018 -0700 ---------------------------------------------------------------------- .../source/extractor/extract/kafka/KafkaSource.java | 7 +++++++ .../source/extractor/extract/kafka/KafkaTopic.java | 12 ++++++++++++ 2 files changed, 19 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bac98032/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java index e7d7da0..1fb1acc 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java @@ -221,6 +221,13 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { } }), state); + for (KafkaTopic topic : topics) { + if (topic.getTopicSpecificState().isPresent()) { + topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State()) + .addAllIfNotExist(topic.getTopicSpecificState().get()); + } + } + int numOfThreads = state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS, ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT); ExecutorService threadPool = http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bac98032/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaTopic.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaTopic.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaTopic.java index d8de232..ffafb54 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaTopic.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaTopic.java @@ -20,8 +20,11 @@ package org.apache.gobblin.source.extractor.extract.kafka; import java.util.Collections; import java.util.List; +import com.google.common.base.Optional; import com.google.common.collect.Lists; +import org.apache.gobblin.configuration.State; + /** * A kafka topic is composed of a topic name, and a list of partitions. @@ -32,13 +35,19 @@ import com.google.common.collect.Lists; public final class KafkaTopic { private final String name; private final List<KafkaPartition> partitions; + private Optional<State> topicSpecificState; public KafkaTopic(String name, List<KafkaPartition> partitions) { + this(name, partitions, Optional.absent()); + } + + public KafkaTopic(String name, List<KafkaPartition> partitions, Optional<State> topicSpecificState) { this.name = name; this.partitions = Lists.newArrayList(); for (KafkaPartition partition : partitions) { this.partitions.add(new KafkaPartition(partition)); } + this.topicSpecificState = topicSpecificState; } public String getName() { @@ -49,4 +58,7 @@ public final class KafkaTopic { return Collections.unmodifiableList(this.partitions); } + public Optional<State> getTopicSpecificState() { + return this.topicSpecificState; + } }
