Repository: incubator-gobblin
Updated Branches:
  refs/heads/master c5e83a331 -> 07c86f2a7
[GOBBLIN-285] Kafka extractor computes avgMillisPerRecord even when a partition pull is interrupted

Closes #2138 from ibuenros/avgpulltime-fix

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/07c86f2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/07c86f2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/07c86f2a

Branch: refs/heads/master
Commit: 07c86f2a7401c4a81d7104eb668dd8826b609520
Parents: c5e83a3
Author: ibuenros <[email protected]>
Authored: Thu Oct 12 15:40:40 2017 -0700
Committer: Issac Buenrostro <[email protected]>
Committed: Thu Oct 12 15:40:40 2017 -0700

----------------------------------------------------------------------
 .../extractor/extract/kafka/KafkaExtractor.java | 26 ++++++++++++--------
 1 file changed, 16 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/07c86f2a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index ac1a7f2..1ff0159 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -235,19 +235,10 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
       LOG.info("Pulling topic " + this.topicName);
       this.currentPartitionIdx = 0;
     } else {
-      this.stopwatch.stop();
-      if (this.currentPartitionRecordCount != 0) {
-        double avgMillisForCurrentPartition =
-            (double) this.stopwatch.elapsed(TimeUnit.MILLISECONDS) / (double) this.currentPartitionRecordCount;
-        this.avgMillisPerRecord.put(this.getCurrentPartition(), avgMillisForCurrentPartition);
-
-        long avgRecordSize = this.currentPartitionTotalSize / this.currentPartitionRecordCount;
-        this.avgRecordSizes.put(this.getCurrentPartition(), avgRecordSize);
-      }
+      computeAvgMillisPerRecordForCurrentPartition();
       this.currentPartitionIdx++;
       this.currentPartitionRecordCount = 0;
       this.currentPartitionTotalSize = 0;
-      this.stopwatch.reset();
     }
 
     this.messageIterator = null;
@@ -260,6 +251,19 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
     this.stopwatch.start();
   }
 
+  private void computeAvgMillisPerRecordForCurrentPartition() {
+    this.stopwatch.stop();
+    if (this.currentPartitionRecordCount != 0) {
+      double avgMillisForCurrentPartition =
+          (double) this.stopwatch.elapsed(TimeUnit.MILLISECONDS) / (double) this.currentPartitionRecordCount;
+      this.avgMillisPerRecord.put(this.getCurrentPartition(), avgMillisForCurrentPartition);
+
+      long avgRecordSize = this.currentPartitionTotalSize / this.currentPartitionRecordCount;
+      this.avgRecordSizes.put(this.getCurrentPartition(), avgRecordSize);
+    }
+    this.stopwatch.reset();
+  }
+
   private void switchMetricContextToCurrentPartition() {
     if (this.currentPartitionIdx >= this.partitions.size()) {
       return;
@@ -313,6 +317,8 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
 
   @Override
   public void close() throws IOException {
+    computeAvgMillisPerRecordForCurrentPartition();
+
     Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap = Maps.newHashMap();
 
     // Add error partition count and error message count to workUnitState
