[GOBBLIN-403] Fix null pointer issue due to kafkajobmonitor metrics is not 
initialized in the constructor

Closes #2277 from yukuai518/fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/19b2d81b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/19b2d81b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/19b2d81b

Branch: refs/heads/0.12.0
Commit: 19b2d81b9539207beaca70c2efb36f258160fa27
Parents: 34de6bf
Author: Kuai Yu <[email protected]>
Authored: Tue Feb 6 14:16:03 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Tue Feb 6 14:16:03 2018 -0800

----------------------------------------------------------------------
 .../service/StreamingKafkaSpecConsumer.java     | 25 +++++++++++++++++---
 1 file changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/19b2d81b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
 
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
index 4764603..5fd5413 100644
--- 
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
+++ 
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
@@ -203,6 +203,8 @@ public class StreamingKafkaSpecConsumer extends 
AbstractIdleService implements S
     private ContextAwareGauge<Long> jobSpecEnq;
     private ContextAwareGauge<Long> jobSpecDeq;
     private ContextAwareGauge<Long> jobSpecConsumed;
+    private ContextAwareGauge<Long> jobSpecParseFailures;
+
     private AtomicLong jobSpecEnqCount = new AtomicLong(0);
     private AtomicLong jobSpecDeqCount = new AtomicLong(0);
 
@@ -210,14 +212,30 @@ public class StreamingKafkaSpecConsumer extends 
AbstractIdleService implements S
     public static final String SPEC_CONSUMER_JOB_SPEC_ENQ = 
"specConsumerJobSpecEnq";
     public static final String SPEC_CONSUMER_JOB_SPEC_DEQ = 
"specConsumerJobSpecDeq";
     public static final String SPEC_CONSUMER_JOB_SPEC_CONSUMED = 
"specConsumerJobSpecConsumed";
-
+    public static final String SPEC_CONSUMER_JOB_SPEC_PARSE_FAILURES = 
"specConsumerJobSpecParseFailures";
 
     public Metrics(MetricContext context) {
       this.jobSpecQueueSize = 
context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE, 
()->StreamingKafkaSpecConsumer.this._jobSpecQueue.size());
       this.jobSpecEnq = 
context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_ENQ, 
()->jobSpecEnqCount.get());
       this.jobSpecDeq = 
context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_DEQ, 
()->jobSpecDeqCount.get());
       this.jobSpecConsumed = 
context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_CONSUMED,
-          
()->StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs().getCount() + 
StreamingKafkaSpecConsumer.this._jobMonitor.getRemmovedSpecs().getCount());
+          ()->getNewSpecs() + getRemovedSpecs() + getMessageParseFailures());
+      this.jobSpecParseFailures = 
context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_PARSE_FAILURES, 
()->getMessageParseFailures());
+    }
+
+    private long getNewSpecs() {
+      return StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs() != null?
+          StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs().getCount() 
: 0;
+    }
+
+    private long getRemovedSpecs() {
+      return StreamingKafkaSpecConsumer.this._jobMonitor.getRemmovedSpecs() != 
null?
+          
StreamingKafkaSpecConsumer.this._jobMonitor.getRemmovedSpecs().getCount() : 0;
+    }
+
+    private long getMessageParseFailures() {
+      return 
StreamingKafkaSpecConsumer.this._jobMonitor.getMessageParseFailures() != null?
+          
StreamingKafkaSpecConsumer.this._jobMonitor.getMessageParseFailures().getCount():0;
     }
 
     public Collection<ContextAwareGauge<?>> getGauges() {
@@ -226,13 +244,14 @@ public class StreamingKafkaSpecConsumer extends 
AbstractIdleService implements S
       list.add(jobSpecEnq);
       list.add(jobSpecDeq);
       list.add(jobSpecConsumed);
+      list.add(jobSpecParseFailures);
       return list;
     }
   }
 
   @Override
   public StandardMetrics getStandardMetrics() {
-    throw new UnsupportedOperationException("Implemented in sub class");
+    return this._metrics;
   }
 
   @Nonnull

Reply via email to