This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 92bc47e [GOBBLIN-966] Check if no partitions have been processed by
KafkaExtractor in close() method to avoid ArrayIndexOutOfBoundsException[]
92bc47e is described below
commit 92bc47ede10ffd68fee98a76e682608e0832ba39
Author: sv2000 <[email protected]>
AuthorDate: Fri Nov 15 14:05:48 2019 -0800
[GOBBLIN-966] Check if no partitions have been processed by KafkaExtractor
in close() method to avoid ArrayIndexOutOfBoundsException[]
Closes #2812 from sv2000/extractorStatsBug
---
.../apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 5ea38b7..11e17bf 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -303,7 +303,7 @@ public abstract class KafkaExtractor<S, D> extends
EventBasedExtractor<S, D> {
@Override
public void close() throws IOException {
- if (!allPartitionsFinished()) {
+ if (!allPartitionsFinished() && currentPartitionIdx !=
INITIAL_PARTITION_IDX) {
this.statsTracker.updateStatisticsForCurrentPartition(currentPartitionIdx,
readStartTime, getLastSuccessfulRecordHeaderTimestamp());
}
// Add error partition count and error message count to workUnitState