Repository: samza
Updated Branches:
  refs/heads/master 11df9ff4f -> c3b469e0a
SAMZA-503: refresh lag gauge even when not fetching messages Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c3b469e0 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c3b469e0 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c3b469e0 Branch: refs/heads/master Commit: c3b469e0ad28995810e4e0067e0e4aa2f57c9619 Parents: 11df9ff Author: Yan Fang <[email protected]> Authored: Fri Jan 30 10:58:12 2015 -0800 Committer: Yan Fang <[email protected]> Committed: Fri Jan 30 10:58:12 2015 -0800 ---------------------------------------------------------------------- .../apache/samza/system/kafka/BrokerProxy.scala | 24 +++++++++++++++++++- .../kafka/KafkaSystemConsumerMetrics.scala | 2 ++ .../samza/system/kafka/TestBrokerProxy.scala | 24 +++++++++++++++++++- 3 files changed, 48 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/c3b469e0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index 9daf824..cc0a4c6 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -181,6 +181,9 @@ class BrokerProxy( nonErrorResponses.foreach { nonError => moveMessagesToTheirQueue(nonError.getKey, nonError.getValue) } } else { + + refreshLatencyMetrics + debug("No topic/partitions need to be fetched for %s:%s right now. Sleeping %sms." 
format (host, port, sleepMSWhileNoTopicPartitions)) metrics.brokerSkippedFetchRequests(host, port).inc @@ -259,6 +262,7 @@ class BrokerProxy( // Update high water mark val hw = data.hw if (hw >= 0) { + metrics.highWatermark(tp).set(hw) metrics.lag(tp).set(hw - nextOffset) } else { debug("Got a high water mark less than 0 (%d) for %s, so skipping." format (hw, tp)) @@ -284,4 +288,22 @@ class BrokerProxy( thread.interrupt thread.join } -} + + private def refreshLatencyMetrics { + nextOffsets.foreach{ + case (topicAndPartition, offset) => { + val latestOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, -1, Request.OrdinaryConsumerId) + trace("latest offset of %s is %s" format (topicAndPartition, latestOffset)) + if (latestOffset >= 0) { + // only update the registered topicAndpartitions + if(metrics.highWatermark.containsKey(topicAndPartition)) { + metrics.highWatermark(topicAndPartition).set(latestOffset) + } + if(metrics.lag.containsKey(topicAndPartition)) { + metrics.lag(topicAndPartition).set(latestOffset - offset) + } + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/c3b469e0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala index cf0dd22..741a176 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala @@ -32,6 +32,7 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr val bytesRead = new ConcurrentHashMap[TopicAndPartition, Counter] val reads = new ConcurrentHashMap[TopicAndPartition, Counter] val lag = new 
ConcurrentHashMap[TopicAndPartition, Gauge[Long]] + val highWatermark = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]] /* * (String, Int) = (host, port) of BrokerProxy. @@ -48,6 +49,7 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr offsets.put(tp, newCounter("%s-%s-offset-change" format (tp.topic, tp.partition))) bytesRead.put(tp, newCounter("%s-%s-bytes-read" format (tp.topic, tp.partition))) reads.put(tp, newCounter("%s-%s-messages-read" format (tp.topic, tp.partition))) + highWatermark.put(tp, newGauge("%s-%s-high-watermark" format (tp.topic, tp.partition), -1L)) lag.put(tp, newGauge("%s-%s-messages-behind-high-watermark" format (tp.topic, tp.partition), 0L)) } } http://git-wip-us.apache.org/repos/asf/samza/blob/c3b469e0/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala index 6d01071..6f05f3c 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala @@ -44,6 +44,7 @@ import scala.collection.JavaConversions._ class TestBrokerProxy extends Logging { val tp2 = new TopicAndPartition("Redbird", 2013) + var fetchTp1 = true // control whether fetching tp1 messages or not def getMockBrokerProxy() = { val sink = new MessageSink { @@ -56,7 +57,7 @@ class TestBrokerProxy extends Logging { } // Never need messages for tp2. 
- def needsMoreMessages(tp: TopicAndPartition): Boolean = !tp.equals(tp2) + def needsMoreMessages(tp: TopicAndPartition): Boolean = !tp.equals(tp2) && fetchTp1 } val system = "daSystem" @@ -142,6 +143,8 @@ class TestBrokerProxy extends Logging { sc.fetch(request) } + when(sc.earliestOrLatestOffset(any(classOf[TopicAndPartition]), any(classOf[Long]), any(classOf[Int]))).thenReturn(100) + override def getOffsetsBefore(request: OffsetRequest): OffsetResponse = sc.getOffsetsBefore(request) override def commitOffsets(request: OffsetCommitRequest): OffsetCommitResponse = sc.commitOffsets(request) @@ -196,6 +199,25 @@ class TestBrokerProxy extends Logging { } } + @Test def brokerProxyUpdateLatencyMetrics() = { + val (bp, tp, _) = getMockBrokerProxy() + + bp.start + bp.addTopicPartition(tp, Option("0")) + Thread.sleep(1000) + // update when fetching messages + assertEquals(500, bp.metrics.highWatermark(tp).getValue) + assertEquals(415, bp.metrics.lag(tp).getValue) + + fetchTp1 = false + Thread.sleep(1000) + // update when not fetching messages + assertEquals(100, bp.metrics.highWatermark(tp).getValue) + assertEquals(15, bp.metrics.lag(tp).getValue) + + fetchTp1 = true + } + @Test def brokerProxyCorrectlyHandlesOffsetOutOfRange():Unit = { // Need to wait for the thread to do some work before ending the test val countdownLatch = new CountDownLatch(1)
