Repository: samza Updated Branches: refs/heads/master ea2b6fa91 -> 7dc0290ad
SAMZA-1356: Improve monitoring for state restore Author: Jacob Maes <[email protected]> Author: Jacob Maes <[email protected]> Reviewers: Jagadish <[email protected]> Closes #241 from jmakes/samza-1356 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7dc0290a Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7dc0290a Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7dc0290a Branch: refs/heads/master Commit: 7dc0290ad8babe3abb315bac2aa4155350a7053d Parents: ea2b6fa Author: Jacob Maes <[email protected]> Authored: Tue Dec 19 08:00:56 2017 -0800 Committer: Jacob Maes <--global> Committed: Tue Dec 19 08:00:56 2017 -0800 ---------------------------------------------------------------------- .../samza/storage/kv/KeyValueStorageEngine.scala | 10 +++++++--- .../storage/kv/KeyValueStorageEngineMetrics.scala | 4 ++-- .../storage/kv/TestKeyValueStorageEngine.scala | 16 ++++++++++++++++ 3 files changed, 25 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/7dc0290a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala index 646b606..c42e043 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala @@ -99,6 +99,8 @@ class KeyValueStorageEngine[K, V]( * batching updates to underlying raw store to skip wrapping functions for efficiency. 
*/ def restore(envelopes: java.util.Iterator[IncomingMessageEnvelope]) { + info("Restoring entries for store " + metrics.storeName) + val batch = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](batchSize) for (envelope <- envelopes.asScala) { @@ -113,11 +115,11 @@ class KeyValueStorageEngine[K, V]( } if (valBytes != null) { - metrics.restoredBytes.inc(valBytes.size) + metrics.restoredBytes.set(metrics.restoredBytes.getValue + valBytes.size) } - metrics.restoredBytes.inc(keyBytes.size) - metrics.restoredMessages.inc + metrics.restoredBytes.set(metrics.restoredBytes.getValue + keyBytes.size) + metrics.restoredMessages.set(metrics.restoredMessages.getValue + 1) count += 1 if (count % 1000000 == 0) { @@ -125,6 +127,8 @@ class KeyValueStorageEngine[K, V]( } } + info(count + " total entries restored.") + if (batch.size > 0) { rawStore.putAll(batch) } http://git-wip-us.apache.org/repos/asf/samza/blob/7dc0290a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala index 28cc891..f43b52e 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala @@ -34,8 +34,8 @@ class KeyValueStorageEngineMetrics( val deletes = newCounter("deletes") val flushes = newCounter("flushes") - val restoredMessages = newCounter("messages-restored") - val restoredBytes = newCounter("messages-bytes") + val restoredMessages = newGauge("messages-restored", 0) + val restoredBytes = newGauge("messages-bytes", 0) val getNs = newTimer("get-ns") val putNs = newTimer("put-ns") 
http://git-wip-us.apache.org/repos/asf/samza/blob/7dc0290a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala index f674685..ec714b2 100644 --- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala +++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala @@ -21,7 +21,9 @@ package org.apache.samza.storage.kv import java.util.Arrays +import org.apache.samza.Partition import org.apache.samza.storage.StoreProperties +import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition} import org.junit.Assert._ import org.junit.{After, Before, Test} import org.mockito.Mockito._ @@ -129,6 +131,20 @@ class TestKeyValueStorageEngine { assertFalse("no next after iterating 2 keys in the range", iter.hasNext) } + @Test + def testRestoreMetrics(): Unit = { + val changelogSSP = new SystemStreamPartition("TestSystem", "TestStream", new Partition(0)) + val changelogEntries = java.util.Arrays asList( + new IncomingMessageEnvelope(changelogSSP, "0", Array[Byte](1, 2), Array[Byte](3, 4, 5)), + new IncomingMessageEnvelope(changelogSSP, "1", Array[Byte](2, 3), Array[Byte](4, 5, 6)), + new IncomingMessageEnvelope(changelogSSP, "2", Array[Byte](3, 4), Array[Byte](5, 6, 7))) + + engine.restore(changelogEntries.iterator()) + + assertEquals(3, metrics.restoredMessages.getValue) + assertEquals(15, metrics.restoredBytes.getValue) // 3 keys * 2 bytes/key + 3 msgs * 3 bytes/msg + } + def getNextTimestamp(): Long = { now += 1 now
