This is an automated email from the ASF dual-hosted git repository. cbickel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 9c88922 Fix negative values and blocking usage in Kafka lag monitoring. (#3434) 9c88922 is described below commit 9c889222f9706f7a7b40e3ccbca78aa03f2d6d2c Author: Markus Thömmes <markusthoem...@me.com> AuthorDate: Wed Mar 14 13:05:54 2018 +0100 Fix negative values and blocking usage in Kafka lag monitoring. (#3434) 1. `endOffsets` might be eventually consistent with the locally stored offset value. Negative values need to be normalized to 0. 2. `endOffsets` can "block indefinitely" per documentation, so we need to make sure to protect the execution context against thread starvation. --- .../connector/kafka/KafkaConsumerConnector.scala | 25 +++++++++++++--------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala index fc0954e..20d5635 100644 --- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala +++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala @@ -25,14 +25,14 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{RetriableException, WakeupException} import org.apache.kafka.common.serialization.ByteArrayDeserializer import pureconfig.loadConfigOrThrow -import whisk.common.{Logging, LoggingMarkers, MetricEmitter} +import whisk.common.{Logging, LoggingMarkers, MetricEmitter, Scheduler} import whisk.core.ConfigKeys import whisk.core.connector.MessageConsumer import scala.collection.JavaConversions.{iterableAsScalaIterable, seqAsJavaList} -import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future} import scala.collection.JavaConverters._ +import scala.concurrent.duration._ +import scala.concurrent.{blocking, ExecutionContext, Future} case class KafkaConsumerConfig(sessionTimeoutMs: Long, 
metricFlushIntervalS: Int) @@ -147,13 +147,18 @@ class KafkaConsumerConnector( @volatile private var consumer = getConsumer(getProps, Some(List(topic))) - // Read current lag of the consumed topic, e.g. invoker queue - // Since we use only one partition in kafka, it is defined 0 - actorSystem.scheduler.schedule(10.second, cfg.metricFlushIntervalS.second) { - val topicAndPartition = Set(new TopicPartition(topic, 0)) - consumer.endOffsets(topicAndPartition.asJava).asScala.find(_._1.topic() == topic).map(_._2).foreach { endOffset => - val queueSize = endOffset - offset - MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), queueSize) + // Read current lag of the consumed topic, e.g. invoker queue + // Since we use only one partition in kafka, it is defined 0 + Scheduler.scheduleWaitAtMost(cfg.metricFlushIntervalS.seconds, 10.seconds, "kafka-lag-monitor") { () => + Future { + blocking { + val topicAndPartition = new TopicPartition(topic, 0) + consumer.endOffsets(Set(topicAndPartition).asJava).asScala.get(topicAndPartition).foreach { endOffset => + // endOffset could lag behind the offset reported by the consumer internally resulting in negative numbers + val queueSize = (endOffset - offset).max(0) + MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), queueSize) + } + } } } } -- To stop receiving notification emails like this one, please contact cbic...@apache.org.