This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 59ba42b  Only send kafka offset metrics, if offsets are meaningful. 
(#3780)
59ba42b is described below

commit 59ba42b577f9c8b7c7911da26ee9bae9d5e4d1a2
Author: Markus Thömmes <[email protected]>
AuthorDate: Tue Jun 19 11:56:39 2018 +0200

    Only send kafka offset metrics, if offsets are meaningful. (#3780)
    
    The internal `offset` held by the KafkaConsumerConnector starts at 0 but is 
only meaningful after the first message has been read from Kafka. In case of an 
Invoker restart for example, the reported offset difference might be absurdly 
high, because the offset in Zookeeper is high, while the internal offset is 
still 0.
---
 .../scala/whisk/connector/kafka/KafkaConsumerConnector.scala | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git 
a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
 
b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index e551f5b..eeec4a4 100644
--- 
a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ 
b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -146,11 +146,13 @@ class KafkaConsumerConnector(
   Scheduler.scheduleWaitAtMost(cfg.metricFlushIntervalS.seconds, 10.seconds, 
"kafka-lag-monitor") { () =>
     Future {
       blocking {
-        val topicAndPartition = new TopicPartition(topic, 0)
-        
consumer.endOffsets(Set(topicAndPartition).asJava).asScala.get(topicAndPartition).foreach
 { endOffset =>
-          // endOffset could lag behind the offset reported by the consumer 
internally resulting in negative numbers
-          val queueSize = (endOffset - offset).max(0)
-          MetricEmitter.emitHistogramMetric(queueMetric, queueSize)
+        if (offset > 0) {
+          val topicAndPartition = new TopicPartition(topic, 0)
+          
consumer.endOffsets(Set(topicAndPartition).asJava).asScala.get(topicAndPartition).foreach
 { endOffset =>
+            // endOffset could lag behind the offset reported by the consumer 
internally resulting in negative numbers
+            val queueSize = (endOffset - offset).max(0)
+            MetricEmitter.emitHistogramMetric(queueMetric, queueSize)
+          }
         }
       }
     }

Reply via email to