Repository: samza Updated Branches: refs/heads/master 7dd356c50 -> 76196b63c
SAMZA-962 - Add Java friendly api to KafkaSystemConsumerMetrics. Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/76196b63 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/76196b63 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/76196b63 Branch: refs/heads/master Commit: 76196b63c0ccd8d30646a2a3db6444ec17726715 Parents: 7dd356c Author: Boris Shkolnik <[email protected]> Authored: Tue Jun 7 17:57:13 2016 -0700 Committer: Navina Ramesh <[email protected]> Committed: Tue Jun 7 17:57:13 2016 -0700 ---------------------------------------------------------------------- .../kafka/KafkaSystemConsumerMetrics.scala | 36 ++++++++++++++++++++ 1 file changed, 36 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/76196b63/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala index 741a176..befd729 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala @@ -60,7 +60,43 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr brokerReads.put((host, port), newCounter("%s-%s-messages-read" format (host, port))) brokerSkippedFetchRequests.put((host, port), newCounter("%s-%s-skipped-fetch-requests" format (host, port))) topicPartitions.put((host, port), newGauge("%s-%s-topic-partitions" format (host, port), 0)) + + } + // java friendlier interfaces + // Gauges + def setTopicPartitionValue(host: String, port: Int, value: Int) { + topicPartitions.get((host,port)).set(value) + } + def setLagValue(topicAndPartition: TopicAndPartition, value: Long) { + lag.get((topicAndPartition)).set(value); + } + def setHighWatermarkValue(topicAndPartition: TopicAndPartition, value: Long) { + highWatermark.get((topicAndPartition)).set(value); + } + + // Counters + def incBrokerReads(host: String, port: Int) { + brokerReads.get((host,port)).inc + } + def incReads(topicAndPartition: TopicAndPartition) { + reads.get(topicAndPartition).inc; + } + def incBytesReads(topicAndPartition: TopicAndPartition, inc: Long) { + reads.get(topicAndPartition).inc(inc); + } + def incBrokerBytesReads(host: String, port: Int, inc: Long) { + brokerReads.get((host,port)).inc(inc) + } + def incBrokerSkippedFetchRequests(host: String, port: Int) { + brokerSkippedFetchRequests.get((host,port)).inc() + } + def setOffsets(topicAndPartition: TopicAndPartition, offset: Long) { + offsets.get(topicAndPartition).set(offset) + } + def incReconnects(host: String, port: Int) { + reconnects.get((host,port)).inc() + } override def getPrefix = systemName + "-" }
