This is an automated email from the ASF dual-hosted git repository.
cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 59ba42b Only send kafka offset metrics, if offsets are meaningful.
(#3780)
59ba42b is described below
commit 59ba42b577f9c8b7c7911da26ee9bae9d5e4d1a2
Author: Markus Thömmes <[email protected]>
AuthorDate: Tue Jun 19 11:56:39 2018 +0200
Only send kafka offset metrics, if offsets are meaningful. (#3780)
The internal `offset` held by the KafkaConsumerConnector starts at 0 and is
only meaningful after the first message has been read from Kafka. After an
Invoker restart, for example, the reported offset difference might be absurdly
high because the offset in ZooKeeper is high while the internal offset is
still 0. The queue-size metric is therefore only emitted once the internal
offset is greater than 0.
---
.../scala/whisk/connector/kafka/KafkaConsumerConnector.scala | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
diff --git
a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index e551f5b..eeec4a4 100644
---
a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++
b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -146,11 +146,13 @@ class KafkaConsumerConnector(
Scheduler.scheduleWaitAtMost(cfg.metricFlushIntervalS.seconds, 10.seconds,
"kafka-lag-monitor") { () =>
Future {
blocking {
- val topicAndPartition = new TopicPartition(topic, 0)
-
consumer.endOffsets(Set(topicAndPartition).asJava).asScala.get(topicAndPartition).foreach
{ endOffset =>
- // endOffset could lag behind the offset reported by the consumer
internally resulting in negative numbers
- val queueSize = (endOffset - offset).max(0)
- MetricEmitter.emitHistogramMetric(queueMetric, queueSize)
+ if (offset > 0) {
+ val topicAndPartition = new TopicPartition(topic, 0)
+
consumer.endOffsets(Set(topicAndPartition).asJava).asScala.get(topicAndPartition).foreach
{ endOffset =>
+ // endOffset could lag behind the offset reported by the consumer
internally resulting in negative numbers
+ val queueSize = (endOffset - offset).max(0)
+ MetricEmitter.emitHistogramMetric(queueMetric, queueSize)
+ }
}
}
}