Updated Branches:
  refs/heads/master 819b9960a -> bb0abb670

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
index a1b547d..d3c1ae8 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
@@ -29,56 +29,74 @@ import grizzled.slf4j.Logging
 /**
  * A key/value store decorator that adds a changelog for any changes made to 
the underlying store
  */
-class LoggedStore[K,V](val store: KeyValueStore[K, V],
-                       val systemStreamPartition: SystemStreamPartition,
-                       val collector: MessageCollector) extends 
KeyValueStore[K, V] with Logging {
+class LoggedStore[K, V](
+  val store: KeyValueStore[K, V],
+  val systemStreamPartition: SystemStreamPartition,
+  val collector: MessageCollector,
+  val metrics: LoggedStoreMetrics = new LoggedStoreMetrics) extends 
KeyValueStore[K, V] with Logging {
 
   val systemStream = systemStreamPartition.getSystemStream
   val partitionId = systemStreamPartition.getPartition.getPartitionId
 
   /* pass through methods */
-  def get(key: K) = store.get(key)
-  def range(from: K, to: K) = store.range(from, to)
-  def all() = store.all()
-  
+  def get(key: K) = {
+    metrics.gets.inc
+    store.get(key)
+  }
+
+  def range(from: K, to: K) = {
+    metrics.ranges.inc
+    store.range(from, to)
+  }
+
+  def all() = {
+    metrics.alls.inc
+    store.all()
+  }
+
   /**
    * Perform the local update and log it out to the changelog
    */
   def put(key: K, value: V) {
+    metrics.puts.inc
     store.put(key, value)
     collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, key, 
value))
   }
-  
+
   /**
    * Perform multiple local updates and log out all changes to the changelog
    */
-  def putAll(entries: java.util.List[Entry[K,V]]) {
+  def putAll(entries: java.util.List[Entry[K, V]]) {
+    metrics.puts.inc(entries.size)
     store.putAll(entries)
     val iter = entries.iterator
-    while(iter.hasNext) {
+    while (iter.hasNext) {
       val curr = iter.next
       collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, 
curr.getKey, curr.getValue))
     }
   }
-  
+
   /**
-   * Perform the local delete and log it out to the changelog 
+   * Perform the local delete and log it out to the changelog
    */
   def delete(key: K) {
+    metrics.deletes.inc
     store.delete(key)
     collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, key, 
null))
   }
-  
+
   def flush {
     trace("Flushing.")
 
+    metrics.flushes.inc
+
     store.flush
   }
-  
+
   def close {
     trace("Closing.")
 
     store.close
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala
new file mode 100644
index 0000000..ea56e8c
--- /dev/null
+++ 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.MetricsHelper
+
+class LoggedStoreMetrics(
+  val storeName: String = "unknown",
+  val registry: MetricsRegistry = new MetricsRegistryMap) extends 
MetricsHelper {
+
+  val gets = newCounter("gets")
+  val ranges = newCounter("ranges")
+  val alls = newCounter("alls")
+  val puts = newCounter("puts")
+  val deletes = newCounter("deletes")
+  val flushes = newCounter("flushes")
+
+  override def getPrefix = storeName + "-"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index 75fd414..7b4db82 100644
--- 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -29,47 +29,65 @@ import grizzled.slf4j.Logging
 class SerializedKeyValueStore[K, V](
   store: KeyValueStore[Array[Byte], Array[Byte]],
   keySerde: Serde[K],
-  msgSerde: Serde[V]) extends KeyValueStore[K, V] with Logging {
+  msgSerde: Serde[V],
+  metrics: SerializedKeyValueStoreMetrics = new 
SerializedKeyValueStoreMetrics) extends KeyValueStore[K, V] with Logging {
 
   def get(key: K): V = {
     val keyBytes = keySerde.toBytes(key)
     val found = store.get(keyBytes)
-    if (found == null)
+    metrics.gets.inc
+    metrics.bytesSerialized.inc(keyBytes.size)
+    if (found == null) {
       null.asInstanceOf[V]
-    else
+    } else {
+      metrics.bytesDeserialized.inc(found.size)
       msgSerde.fromBytes(found).asInstanceOf[V]
+    }
   }
 
   def put(key: K, value: V) {
+    metrics.puts.inc
     val keyBytes = keySerde.toBytes(key)
     val valBytes = msgSerde.toBytes(value)
+    metrics.bytesSerialized.inc(keyBytes.size + valBytes.size)
     store.put(keyBytes, valBytes)
   }
 
   def putAll(entries: java.util.List[Entry[K, V]]) {
     val list = new java.util.ArrayList[Entry[Array[Byte], 
Array[Byte]]](entries.size())
     val iter = entries.iterator
+    var bytesSerialized = 0L
     while (iter.hasNext) {
       val curr = iter.next
       val keyBytes = keySerde.toBytes(curr.getKey)
       val valBytes = msgSerde.toBytes(curr.getValue)
+      bytesSerialized += keyBytes.size + valBytes.size
       list.add(new Entry(keyBytes, valBytes))
     }
     store.putAll(list)
+    metrics.puts.inc(list.size)
+    metrics.bytesSerialized.inc(bytesSerialized)
   }
 
   def delete(key: K) {
-    store.delete(keySerde.toBytes(key))
+    metrics.deletes.inc
+    val keyBytes = keySerde.toBytes(key)
+    metrics.bytesSerialized.inc(keyBytes.size)
+    store.delete(keyBytes)
   }
 
   def range(from: K, to: K): KeyValueIterator[K, V] = {
+    metrics.ranges.inc
     val fromBytes = keySerde.toBytes(from)
     val toBytes = keySerde.toBytes(to)
+    metrics.bytesSerialized.inc(fromBytes.size + toBytes.size)
     new DeserializingIterator(store.range(fromBytes, toBytes))
   }
 
-  def all(): KeyValueIterator[K, V] =
+  def all(): KeyValueIterator[K, V] = {
+    metrics.alls.inc
     new DeserializingIterator(store.all)
+  }
 
   private class DeserializingIterator(iter: KeyValueIterator[Array[Byte], 
Array[Byte]]) extends KeyValueIterator[K, V] {
     def hasNext() = iter.hasNext()
@@ -77,13 +95,21 @@ class SerializedKeyValueStore[K, V](
     def close() = iter.close()
     def next(): Entry[K, V] = {
       val nxt = iter.next()
-      new Entry(keySerde.fromBytes(nxt.getKey()).asInstanceOf[K], 
msgSerde.fromBytes(nxt.getValue()).asInstanceOf[V])
+      val keyBytes = nxt.getKey
+      val valBytes = nxt.getValue
+      metrics.bytesDeserialized.inc(keyBytes.size)
+      if (valBytes != null) {
+        metrics.bytesDeserialized.inc(valBytes.size)
+      }
+      new Entry(keySerde.fromBytes(keyBytes).asInstanceOf[K], 
msgSerde.fromBytes(valBytes).asInstanceOf[V])
     }
   }
 
   def flush {
     trace("Flushing.")
 
+    metrics.flushes.inc
+
     store.flush
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
new file mode 100644
index 0000000..2ad21c8
--- /dev/null
+++ 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.MetricsHelper
+
+class SerializedKeyValueStoreMetrics(
+  val storeName: String = "unknown",
+  val registry: MetricsRegistry = new MetricsRegistryMap) extends 
MetricsHelper {
+
+  val gets = newCounter("gets")
+  val ranges = newCounter("ranges")
+  val alls = newCounter("alls")
+  val puts = newCounter("puts")
+  val deletes = newCounter("deletes")
+  val flushes = newCounter("flushes")
+  val bytesSerialized = newCounter("bytes-serialized")
+  val bytesDeserialized = newCounter("bytes-deserialized")
+
+  override def getPrefix = storeName + "-"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
index 4f6edfb..983771d 100644
--- 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
+++ 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
@@ -34,10 +34,9 @@ import org.apache.samza.SamzaException
 import java.util.Timer
 import java.util.TimerTask
 import org.apache.samza.task.ReadableCollector
+import org.apache.samza.metrics.MetricsHelper
 
 object SamzaAppMasterMetrics {
-  val metricsGroup = "samza.yarn.am"
-
   val sourceName = "ApplicationMaster"
 }
 
@@ -46,11 +45,13 @@ object SamzaAppMasterMetrics {
  * registry, we might as well use it. This class takes Samza's application
  * master state, and converts it to metrics.
  */
-class SamzaAppMasterMetrics(config: Config, state: SamzaAppMasterState, 
registry: ReadableMetricsRegistry) extends YarnAppMasterListener with Logging {
-  import SamzaAppMasterMetrics._
+class SamzaAppMasterMetrics(
+  val config: Config,
+  val state: SamzaAppMasterState,
+  val registry: ReadableMetricsRegistry) extends MetricsHelper with 
YarnAppMasterListener with Logging {
 
   val jvm = new JvmMetrics(registry)
-  val mEventLoops = registry.newCounter(metricsGroup, "EventLoops")
+  val mEventLoops = newCounter("event-loops")
   val reporters = config.getMetricReporterNames.map(reporterName => {
     val metricsFactoryClassName = config
       .getMetricsFactoryClass(reporterName)
@@ -66,17 +67,18 @@ class SamzaAppMasterMetrics(config: Config, state: 
SamzaAppMasterState, registry
   }).toMap
 
   override def onInit() {
-    val mRunningContainers = registry.newGauge(metricsGroup, 
"RunningContainers", { state.runningTasks.size })
-    val mNeededContainers = registry.newGauge(metricsGroup, 
"NeededContainers", { state.neededContainers })
-    val mCompletedContainers = registry.newGauge(metricsGroup, 
"CompletedContainers", { state.completedTasks })
-    val mFailedContainers = registry.newGauge(metricsGroup, 
"FailedContainers", { state.failedContainers })
-    val mReleasedContainers = registry.newGauge(metricsGroup, 
"ReleasedContainers", { state.releasedContainers })
-    val mTasks = registry.newGauge(metricsGroup, "TaskCount", { 
state.taskCount })
-    val mHost = registry.newGauge(metricsGroup, "HttpHost", { state.nodeHost })
-    val mTrackingPort = registry.newGauge(metricsGroup, "HttpPort", { 
state.trackingPort })
-    val mRpcPort = registry.newGauge(metricsGroup, "RpcPort", { state.rpcPort 
})
-    val mAppAttemptId = registry.newGauge(metricsGroup, "AppAttemptId", { 
state.appAttemptId.toString })
+    val mRunningContainers = newGauge("running-containers", () => 
state.runningTasks.size)
+    val mNeededContainers = newGauge("needed-containers", () => 
state.neededContainers)
+    val mCompletedContainers = newGauge("completed-containers", () => 
state.completedTasks)
+    val mFailedContainers = newGauge("failed-containers", () => 
state.failedContainers)
+    val mReleasedContainers = newGauge("released-containers", () => 
state.releasedContainers)
+    val mTasks = newGauge("task-count", () => state.taskCount)
+    val mHost = newGauge("http-host", () => state.nodeHost)
+    val mTrackingPort = newGauge("http-port", () => state.trackingPort)
+    val mRpcPort = newGauge("rpc-port", () => state.rpcPort)
+    val mAppAttemptId = newGauge("app-attempt-id", () => 
state.appAttemptId.toString)
 
+    jvm.start
     reporters.values.foreach(_.start)
   }
 

Reply via email to