[GOBBLIN-403] Fix null pointer exception caused by KafkaJobMonitor metrics not being initialized in the constructor
Closes #2277 from yukuai518/fix Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/19b2d81b Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/19b2d81b Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/19b2d81b Branch: refs/heads/0.12.0 Commit: 19b2d81b9539207beaca70c2efb36f258160fa27 Parents: 34de6bf Author: Kuai Yu <[email protected]> Authored: Tue Feb 6 14:16:03 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Tue Feb 6 14:16:03 2018 -0800 ---------------------------------------------------------------------- .../service/StreamingKafkaSpecConsumer.java | 25 +++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/19b2d81b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java index 4764603..5fd5413 100644 --- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java +++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java @@ -203,6 +203,8 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S private ContextAwareGauge<Long> jobSpecEnq; private ContextAwareGauge<Long> jobSpecDeq; private ContextAwareGauge<Long> jobSpecConsumed; + private ContextAwareGauge<Long> jobSpecParseFailures; + private AtomicLong jobSpecEnqCount = new AtomicLong(0); private AtomicLong jobSpecDeqCount = new 
AtomicLong(0); @@ -210,14 +212,30 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S public static final String SPEC_CONSUMER_JOB_SPEC_ENQ = "specConsumerJobSpecEnq"; public static final String SPEC_CONSUMER_JOB_SPEC_DEQ = "specConsumerJobSpecDeq"; public static final String SPEC_CONSUMER_JOB_SPEC_CONSUMED = "specConsumerJobSpecConsumed"; - + public static final String SPEC_CONSUMER_JOB_SPEC_PARSE_FAILURES = "specConsumerJobSpecParseFailures"; public Metrics(MetricContext context) { this.jobSpecQueueSize = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE, ()->StreamingKafkaSpecConsumer.this._jobSpecQueue.size()); this.jobSpecEnq = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_ENQ, ()->jobSpecEnqCount.get()); this.jobSpecDeq = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_DEQ, ()->jobSpecDeqCount.get()); this.jobSpecConsumed = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_CONSUMED, - ()->StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs().getCount() + StreamingKafkaSpecConsumer.this._jobMonitor.getRemmovedSpecs().getCount()); + ()->getNewSpecs() + getRemovedSpecs() + getMessageParseFailures()); + this.jobSpecParseFailures = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_PARSE_FAILURES, ()->getMessageParseFailures()); + } + + private long getNewSpecs() { + return StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs() != null? + StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs().getCount() : 0; + } + + private long getRemovedSpecs() { + return StreamingKafkaSpecConsumer.this._jobMonitor.getRemmovedSpecs() != null? + StreamingKafkaSpecConsumer.this._jobMonitor.getRemmovedSpecs().getCount() : 0; + } + + private long getMessageParseFailures() { + return StreamingKafkaSpecConsumer.this._jobMonitor.getMessageParseFailures() != null? 
+ StreamingKafkaSpecConsumer.this._jobMonitor.getMessageParseFailures().getCount():0; } public Collection<ContextAwareGauge<?>> getGauges() { @@ -226,13 +244,14 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S list.add(jobSpecEnq); list.add(jobSpecDeq); list.add(jobSpecConsumed); + list.add(jobSpecParseFailures); return list; } } @Override public StandardMetrics getStandardMetrics() { - throw new UnsupportedOperationException("Implemented in sub class"); + return this._metrics; } @Nonnull
