Repository: samza
Updated Branches:
  refs/heads/master 11df9ff4f -> c3b469e0a


SAMZA-503: refresh lag gauge even when not fetching messages


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c3b469e0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c3b469e0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c3b469e0

Branch: refs/heads/master
Commit: c3b469e0ad28995810e4e0067e0e4aa2f57c9619
Parents: 11df9ff
Author: Yan Fang <[email protected]>
Authored: Fri Jan 30 10:58:12 2015 -0800
Committer: Yan Fang <[email protected]>
Committed: Fri Jan 30 10:58:12 2015 -0800

----------------------------------------------------------------------
 .../apache/samza/system/kafka/BrokerProxy.scala | 24 +++++++++++++++++++-
 .../kafka/KafkaSystemConsumerMetrics.scala      |  2 ++
 .../samza/system/kafka/TestBrokerProxy.scala    | 24 +++++++++++++++++++-
 3 files changed, 48 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c3b469e0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 9daf824..cc0a4c6 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -181,6 +181,9 @@ class BrokerProxy(
 
       nonErrorResponses.foreach { nonError => 
moveMessagesToTheirQueue(nonError.getKey, nonError.getValue) }
     } else {
+
+      refreshLatencyMetrics
+
       debug("No topic/partitions need to be fetched for %s:%s right now. 
Sleeping %sms." format (host, port, sleepMSWhileNoTopicPartitions))
 
       metrics.brokerSkippedFetchRequests(host, port).inc
@@ -259,6 +262,7 @@ class BrokerProxy(
     // Update high water mark
     val hw = data.hw
     if (hw >= 0) {
+      metrics.highWatermark(tp).set(hw)
       metrics.lag(tp).set(hw - nextOffset)
     } else {
       debug("Got a high water mark less than 0 (%d) for %s, so skipping." 
format (hw, tp))
@@ -284,4 +288,22 @@ class BrokerProxy(
     thread.interrupt
     thread.join
   }
-}
+
+  private def refreshLatencyMetrics {
+    nextOffsets.foreach{
+      case (topicAndPartition, offset) => {
+        val latestOffset = 
simpleConsumer.earliestOrLatestOffset(topicAndPartition, -1, 
Request.OrdinaryConsumerId)
+        trace("latest offset of %s is %s" format (topicAndPartition, 
latestOffset))
+        if (latestOffset >= 0) {
+          // only update the registered topicAndpartitions
+          if(metrics.highWatermark.containsKey(topicAndPartition)) {
+            metrics.highWatermark(topicAndPartition).set(latestOffset)
+          }
+          if(metrics.lag.containsKey(topicAndPartition)) {
+            metrics.lag(topicAndPartition).set(latestOffset - offset)
+          }
+        }
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b469e0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
index cf0dd22..741a176 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
@@ -32,6 +32,7 @@ class KafkaSystemConsumerMetrics(val systemName: String = 
"unknown", val registr
   val bytesRead = new ConcurrentHashMap[TopicAndPartition, Counter]
   val reads = new ConcurrentHashMap[TopicAndPartition, Counter]
   val lag = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]]
+  val highWatermark = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]]
 
   /*
    * (String, Int) = (host, port) of BrokerProxy.
@@ -48,6 +49,7 @@ class KafkaSystemConsumerMetrics(val systemName: String = 
"unknown", val registr
       offsets.put(tp, newCounter("%s-%s-offset-change" format (tp.topic, 
tp.partition)))
       bytesRead.put(tp, newCounter("%s-%s-bytes-read" format (tp.topic, 
tp.partition)))
       reads.put(tp, newCounter("%s-%s-messages-read" format (tp.topic, 
tp.partition)))
+      highWatermark.put(tp, newGauge("%s-%s-high-watermark" format (tp.topic, 
tp.partition), -1L))
       lag.put(tp, newGauge("%s-%s-messages-behind-high-watermark" format 
(tp.topic, tp.partition), 0L))
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b469e0/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 6d01071..6f05f3c 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -44,6 +44,7 @@ import scala.collection.JavaConversions._
 
 class TestBrokerProxy extends Logging {
   val tp2 = new TopicAndPartition("Redbird", 2013)
+  var fetchTp1 = true // control whether fetching tp1 messages or not
 
   def getMockBrokerProxy() = {
     val sink = new MessageSink {
@@ -56,7 +57,7 @@ class TestBrokerProxy extends Logging {
       }
 
       // Never need messages for tp2.
-      def needsMoreMessages(tp: TopicAndPartition): Boolean = !tp.equals(tp2)
+      def needsMoreMessages(tp: TopicAndPartition): Boolean = !tp.equals(tp2) 
&& fetchTp1
     }
 
     val system = "daSystem"
@@ -142,6 +143,8 @@ class TestBrokerProxy extends Logging {
             sc.fetch(request)
           }
 
+          when(sc.earliestOrLatestOffset(any(classOf[TopicAndPartition]), 
any(classOf[Long]), any(classOf[Int]))).thenReturn(100)
+
           override def getOffsetsBefore(request: OffsetRequest): 
OffsetResponse = sc.getOffsetsBefore(request)
 
           override def commitOffsets(request: OffsetCommitRequest): 
OffsetCommitResponse = sc.commitOffsets(request)
@@ -196,6 +199,25 @@ class TestBrokerProxy extends Logging {
     }
   }
 
+  @Test def brokerProxyUpdateLatencyMetrics() = {
+    val (bp, tp, _) = getMockBrokerProxy()
+
+    bp.start
+    bp.addTopicPartition(tp, Option("0"))
+    Thread.sleep(1000)
+    // update when fetching messages
+    assertEquals(500, bp.metrics.highWatermark(tp).getValue)
+    assertEquals(415, bp.metrics.lag(tp).getValue)
+
+    fetchTp1 = false
+    Thread.sleep(1000)
+    // update when not fetching messages
+    assertEquals(100, bp.metrics.highWatermark(tp).getValue)
+    assertEquals(15, bp.metrics.lag(tp).getValue)
+
+    fetchTp1 = true
+  }
+
   @Test def brokerProxyCorrectlyHandlesOffsetOutOfRange():Unit = {
     // Need to wait for the thread to do some work before ending the test
     val countdownLatch = new CountDownLatch(1)

Reply via email to