Repository: incubator-gobblin
Updated Branches:
  refs/heads/master a6dfdc6d4 -> 65123a606


[GOBBLIN-643] Fix NPE when closing KafkaExtractor

Closes #2513 from jack-moseley/npe-fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/65123a60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/65123a60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/65123a60

Branch: refs/heads/master
Commit: 65123a60697b4df824fc361170feddef704b7202
Parents: a6dfdc6
Author: Jack Moseley <[email protected]>
Authored: Tue Dec 4 09:46:40 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Tue Dec 4 09:46:40 2018 -0800

----------------------------------------------------------------------
 .../source/extractor/extract/kafka/KafkaExtractor.java       | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/65123a60/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index fb22f9d..e54a4b6 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -435,12 +435,12 @@ public abstract class KafkaExtractor<S, D> extends 
EventBasedExtractor<S, D> {
         
Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
             KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME, partitionId)));
 
-    tagsForPartition.put(KafkaSource.START_FETCH_EPOCH_TIME, 
Long.toString(this.startFetchEpochTime.get(partition)));
-    tagsForPartition.put(KafkaSource.STOP_FETCH_EPOCH_TIME, 
Long.toString(this.stopFetchEpochTime.get(partition)));
+    tagsForPartition.put(KafkaSource.START_FETCH_EPOCH_TIME, 
Long.toString(this.startFetchEpochTime.getOrDefault(partition, 0L)));
+    tagsForPartition.put(KafkaSource.STOP_FETCH_EPOCH_TIME, 
Long.toString(this.stopFetchEpochTime.getOrDefault(partition, 0L)));
     
this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.START_FETCH_EPOCH_TIME,
 partitionId),
-        Long.toString(this.startFetchEpochTime.get(partition)));
+        Long.toString(this.startFetchEpochTime.getOrDefault(partition, 0L)));
     
this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.STOP_FETCH_EPOCH_TIME,
 partitionId),
-        Long.toString(this.stopFetchEpochTime.get(partition)));
+        Long.toString(this.stopFetchEpochTime.getOrDefault(partition, 0L)));
 
     if (this.processedRecordCount.containsKey(partition)) {
       tagsForPartition.put(PROCESSED_RECORD_COUNT, 
Long.toString(this.processedRecordCount.get(partition)));

Reply via email to