Repository: samza
Updated Branches:
  refs/heads/master ea2b6fa91 -> 7dc0290ad


SAMZA-1356: Improve monitoring for state restore

Author: Jacob Maes <[email protected]>
Author: Jacob Maes <[email protected]>

Reviewers: Jagadish <[email protected]>

Closes #241 from jmakes/samza-1356


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7dc0290a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7dc0290a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7dc0290a

Branch: refs/heads/master
Commit: 7dc0290ad8babe3abb315bac2aa4155350a7053d
Parents: ea2b6fa
Author: Jacob Maes <[email protected]>
Authored: Tue Dec 19 08:00:56 2017 -0800
Committer: Jacob Maes <--global>
Committed: Tue Dec 19 08:00:56 2017 -0800

----------------------------------------------------------------------
 .../samza/storage/kv/KeyValueStorageEngine.scala    | 10 +++++++---
 .../storage/kv/KeyValueStorageEngineMetrics.scala   |  4 ++--
 .../storage/kv/TestKeyValueStorageEngine.scala      | 16 ++++++++++++++++
 3 files changed, 25 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/7dc0290a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index 646b606..c42e043 100644
--- 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -99,6 +99,8 @@ class KeyValueStorageEngine[K, V](
    * batching updates to underlying raw store to notAValidEvent wrapping 
functions for efficiency.
    */
   def restore(envelopes: java.util.Iterator[IncomingMessageEnvelope]) {
+    info("Restoring entries for store " + metrics.storeName)
+
     val batch = new java.util.ArrayList[Entry[Array[Byte], 
Array[Byte]]](batchSize)
 
     for (envelope <- envelopes.asScala) {
@@ -113,11 +115,11 @@ class KeyValueStorageEngine[K, V](
       }
 
       if (valBytes != null) {
-        metrics.restoredBytes.inc(valBytes.size)
+        metrics.restoredBytes.set(metrics.restoredBytes.getValue + 
valBytes.size)
       }
 
-      metrics.restoredBytes.inc(keyBytes.size)
-      metrics.restoredMessages.inc
+      metrics.restoredBytes.set(metrics.restoredBytes.getValue + keyBytes.size)
+      metrics.restoredMessages.set(metrics.restoredMessages.getValue + 1)
       count += 1
 
       if (count % 1000000 == 0) {
@@ -125,6 +127,8 @@ class KeyValueStorageEngine[K, V](
       }
     }
 
+    info(count + " total entries restored.")
+
     if (batch.size > 0) {
       rawStore.putAll(batch)
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/7dc0290a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
index 28cc891..f43b52e 100644
--- 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
+++ 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
@@ -34,8 +34,8 @@ class KeyValueStorageEngineMetrics(
   val deletes = newCounter("deletes")
   val flushes = newCounter("flushes")
 
-  val restoredMessages = newCounter("messages-restored")
-  val restoredBytes = newCounter("messages-bytes")
+  val restoredMessages = newGauge("messages-restored", 0)
+  val restoredBytes = newGauge("messages-bytes", 0)
 
   val getNs = newTimer("get-ns")
   val putNs = newTimer("put-ns")

http://git-wip-us.apache.org/repos/asf/samza/blob/7dc0290a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
 
b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
index f674685..ec714b2 100644
--- 
a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
+++ 
b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
@@ -21,7 +21,9 @@ package org.apache.samza.storage.kv
 
 import java.util.Arrays
 
+import org.apache.samza.Partition
 import org.apache.samza.storage.StoreProperties
+import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.mockito.Mockito._
@@ -129,6 +131,20 @@ class TestKeyValueStorageEngine {
     assertFalse("no next after iterating 2 keys in the range", iter.hasNext)
   }
 
+  @Test
+  def testRestoreMetrics(): Unit = {
+    val changelogSSP = new SystemStreamPartition("TestSystem", "TestStream", 
new Partition(0))
+    val changelogEntries = java.util.Arrays asList(
+      new IncomingMessageEnvelope(changelogSSP, "0", Array[Byte](1, 2), 
Array[Byte](3, 4, 5)),
+      new IncomingMessageEnvelope(changelogSSP, "1", Array[Byte](2, 3), 
Array[Byte](4, 5, 6)),
+      new IncomingMessageEnvelope(changelogSSP, "2", Array[Byte](3, 4), 
Array[Byte](5, 6, 7)))
+
+    engine.restore(changelogEntries.iterator())
+
+    assertEquals(3, metrics.restoredMessages.getValue)
+    assertEquals(15, metrics.restoredBytes.getValue) // 3 keys * 2 bytes/key + 
 3 msgs * 3 bytes/msg
+  }
+  
   def getNextTimestamp(): Long = {
     now += 1
     now

Reply via email to