Repository: incubator-gobblin Updated Branches: refs/heads/master a6dfdc6d4 -> 65123a606
[GOBBLIN-643] Fix NPE when closing KafkaExtractor Closes #2513 from jack-moseley/npe-fix Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/65123a60 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/65123a60 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/65123a60 Branch: refs/heads/master Commit: 65123a60697b4df824fc361170feddef704b7202 Parents: a6dfdc6 Author: Jack Moseley <[email protected]> Authored: Tue Dec 4 09:46:40 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Tue Dec 4 09:46:40 2018 -0800 ---------------------------------------------------------------------- .../source/extractor/extract/kafka/KafkaExtractor.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/65123a60/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java index fb22f9d..e54a4b6 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java @@ -435,12 +435,12 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> { Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState, KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME, partitionId))); - tagsForPartition.put(KafkaSource.START_FETCH_EPOCH_TIME, Long.toString(this.startFetchEpochTime.get(partition))); - tagsForPartition.put(KafkaSource.STOP_FETCH_EPOCH_TIME, Long.toString(this.stopFetchEpochTime.get(partition))); + tagsForPartition.put(KafkaSource.START_FETCH_EPOCH_TIME, Long.toString(this.startFetchEpochTime.getOrDefault(partition, 0L))); + tagsForPartition.put(KafkaSource.STOP_FETCH_EPOCH_TIME, Long.toString(this.stopFetchEpochTime.getOrDefault(partition, 0L))); this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.START_FETCH_EPOCH_TIME, partitionId), - Long.toString(this.startFetchEpochTime.get(partition))); + Long.toString(this.startFetchEpochTime.getOrDefault(partition, 0L))); this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.STOP_FETCH_EPOCH_TIME, partitionId), - Long.toString(this.stopFetchEpochTime.get(partition))); + Long.toString(this.stopFetchEpochTime.getOrDefault(partition, 0L))); if (this.processedRecordCount.containsKey(partition)) { tagsForPartition.put(PROCESSED_RECORD_COUNT, Long.toString(this.processedRecordCount.get(partition)));
