Repository: samza
Updated Branches:
  refs/heads/master 7dd356c50 -> 76196b63c


SAMZA-962 - Add Java friendly api to KafkaSystemConsumerMetrics.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/76196b63
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/76196b63
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/76196b63

Branch: refs/heads/master
Commit: 76196b63c0ccd8d30646a2a3db6444ec17726715
Parents: 7dd356c
Author: Boris Shkolnik <[email protected]>
Authored: Tue Jun 7 17:57:13 2016 -0700
Committer: Navina Ramesh <[email protected]>
Committed: Tue Jun 7 17:57:13 2016 -0700

----------------------------------------------------------------------
 .../kafka/KafkaSystemConsumerMetrics.scala      | 36 ++++++++++++++++++++
 1 file changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/76196b63/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
index 741a176..befd729 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
@@ -60,7 +60,43 @@ class KafkaSystemConsumerMetrics(val systemName: String = 
"unknown", val registr
     brokerReads.put((host, port), newCounter("%s-%s-messages-read" format 
(host, port)))
     brokerSkippedFetchRequests.put((host, port), 
newCounter("%s-%s-skipped-fetch-requests" format (host, port)))
     topicPartitions.put((host, port), newGauge("%s-%s-topic-partitions" format 
(host, port), 0))
+
+
   }
 
+  // java friendlier interfaces
+  // Gauges
+  def setTopicPartitionValue(host: String, port: Int, value: Int) {
+    topicPartitions.get((host,port)).set(value)
+  }
+  def setLagValue(topicAndPartition: TopicAndPartition, value: Long) {
+    lag.get((topicAndPartition)).set(value);
+  }
+  def setHighWatermarkValue(topicAndPartition: TopicAndPartition, value: Long) 
{
+    highWatermark.get((topicAndPartition)).set(value);
+  }
+
+  // Counters
+  def incBrokerReads(host: String, port: Int) {
+    brokerReads.get((host,port)).inc
+  }
+  def incReads(topicAndPartition: TopicAndPartition) {
+    reads.get(topicAndPartition).inc;
+  }
+  def incBytesReads(topicAndPartition: TopicAndPartition, inc: Long) {
+    reads.get(topicAndPartition).inc(inc);
+  }
+  def incBrokerBytesReads(host: String, port: Int, inc: Long) {
+    brokerReads.get((host,port)).inc(inc)
+  }
+  def incBrokerSkippedFetchRequests(host: String, port: Int) {
+    brokerSkippedFetchRequests.get((host,port)).inc()
+  }
+  def setOffsets(topicAndPartition: TopicAndPartition, offset: Long) {
+    offsets.get(topicAndPartition).set(offset)
+  }
+  def incReconnects(host: String, port: Int) {
+    reconnects.get((host,port)).inc()
+  }
   override def getPrefix = systemName + "-"
 }

Reply via email to