Repository: incubator-gobblin
Updated Branches:
  refs/heads/master a7ca3d7bf -> 5c316d95c
[GOBBLIN-629] Skip updating statistics for Kafka partitions in KafkaExtractor when topic is skipped.[]

Closes #2499 from sv2000/kafkaPartitionIndex

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5c316d95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5c316d95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5c316d95

Branch: refs/heads/master
Commit: 5c316d95c46bc1eaea91310a249a5c56f4859480
Parents: a7ca3d7
Author: suvasude <[email protected]>
Authored: Tue Nov 13 16:56:16 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Tue Nov 13 16:56:16 2018 -0800

----------------------------------------------------------------------
 .../gobblin/source/extractor/extract/kafka/KafkaExtractor.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c316d95/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index b0d38b2..fb22f9d 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -370,8 +370,9 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   @Override
   public void close() throws IOException {
-
-    updateStatisticsForCurrentPartition();
+    if (currentPartitionIdx != INITIAL_PARTITION_IDX) {
+      updateStatisticsForCurrentPartition();
+    }
     Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap = Maps.newHashMap();
