This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c88922  Fix negative values and blocking usage in Kafka lag 
monitoring. (#3434)
9c88922 is described below

commit 9c889222f9706f7a7b40e3ccbca78aa03f2d6d2c
Author: Markus Thömmes <markusthoem...@me.com>
AuthorDate: Wed Mar 14 13:05:54 2018 +0100

    Fix negative values and blocking usage in Kafka lag monitoring. (#3434)
    
    1. `endOffsets` is only eventually consistent with the locally stored offset 
value, so the computed lag can be negative. Negative values need to be normalized to 0.
    2. `endOffsets` can "block indefinitely" per documentation, so we need to 
make sure to protect the execution context against thread starvation.
---
 .../connector/kafka/KafkaConsumerConnector.scala   | 25 +++++++++++++---------
 1 file changed, 15 insertions(+), 10 deletions(-)

diff --git 
a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
 
b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index fc0954e..20d5635 100644
--- 
a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ 
b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -25,14 +25,14 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{RetriableException, WakeupException}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import pureconfig.loadConfigOrThrow
-import whisk.common.{Logging, LoggingMarkers, MetricEmitter}
+import whisk.common.{Logging, LoggingMarkers, MetricEmitter, Scheduler}
 import whisk.core.ConfigKeys
 import whisk.core.connector.MessageConsumer
 
 import scala.collection.JavaConversions.{iterableAsScalaIterable, 
seqAsJavaList}
-import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContext, Future}
 import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.concurrent.{blocking, ExecutionContext, Future}
 
 case class KafkaConsumerConfig(sessionTimeoutMs: Long, metricFlushIntervalS: 
Int)
 
@@ -147,13 +147,18 @@ class KafkaConsumerConnector(
 
   @volatile private var consumer = getConsumer(getProps, Some(List(topic)))
 
-  //  Read current lag of the consumed topic, e.g. invoker queue
-  //  Since we use only one partition in kafka, it is defined 0
-  actorSystem.scheduler.schedule(10.second, cfg.metricFlushIntervalS.second) {
-    val topicAndPartition = Set(new TopicPartition(topic, 0))
-    consumer.endOffsets(topicAndPartition.asJava).asScala.find(_._1.topic() == 
topic).map(_._2).foreach { endOffset =>
-      val queueSize = endOffset - offset
-      MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), 
queueSize)
+  // Read current lag of the consumed topic, e.g. invoker queue
+  // Since we use only one partition in kafka, it is defined 0
+  Scheduler.scheduleWaitAtMost(cfg.metricFlushIntervalS.seconds, 10.seconds, 
"kafka-lag-monitor") { () =>
+    Future {
+      blocking {
+        val topicAndPartition = new TopicPartition(topic, 0)
+        
consumer.endOffsets(Set(topicAndPartition).asJava).asScala.get(topicAndPartition).foreach
 { endOffset =>
+          // endOffset could lag behind the offset reported by the consumer 
internally resulting in negative numbers
+          val queueSize = (endOffset - offset).max(0)
+          MetricEmitter.emitHistogramMetric(LoggingMarkers.KAFKA_QUEUE(topic), 
queueSize)
+        }
+      }
     }
   }
 }

-- 
To stop receiving notification emails like this one, please contact
cbic...@apache.org.

Reply via email to