Repository: incubator-gobblin
Updated Branches:
  refs/heads/master a7ca3d7bf -> 5c316d95c


[GOBBLIN-629] Skip updating statistics for Kafka partitions in KafkaExtractor
when topic is skipped.

Closes #2499 from sv2000/kafkaPartitionIndex


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5c316d95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5c316d95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5c316d95

Branch: refs/heads/master
Commit: 5c316d95c46bc1eaea91310a249a5c56f4859480
Parents: a7ca3d7
Author: suvasude <[email protected]>
Authored: Tue Nov 13 16:56:16 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Tue Nov 13 16:56:16 2018 -0800

----------------------------------------------------------------------
 .../gobblin/source/extractor/extract/kafka/KafkaExtractor.java  | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c316d95/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index b0d38b2..fb22f9d 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -370,8 +370,9 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
 
   @Override
   public void close() throws IOException {
-
-    updateStatisticsForCurrentPartition();
+    if (currentPartitionIdx != INITIAL_PARTITION_IDX) {
+      updateStatisticsForCurrentPartition();
+    }
 
     Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap = Maps.newHashMap();
 

Reply via email to