Repository: samza Updated Branches: refs/heads/master 7c783f15c -> 870eb1dd6
SAMZA-963: adding latency metrics for KV storage engine Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/870eb1dd Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/870eb1dd Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/870eb1dd Branch: refs/heads/master Commit: 870eb1dd614c77d2626884a6b4d0e4479a254c4a Parents: 7c783f1 Author: Fred Ji <[email protected]> Authored: Mon Aug 8 13:27:54 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Tue Aug 9 11:31:29 2016 -0700 ---------------------------------------------------------------------- .../storage/kv/KeyValueStorageEngine.scala | 46 ++++--- .../kv/KeyValueStorageEngineMetrics.scala | 7 + .../storage/kv/TestKeyValueStorageEngine.scala | 130 +++++++++++++++++++ 3 files changed, 167 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/870eb1dd/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala index a3ffc42..4141cbf 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala @@ -22,6 +22,7 @@ package org.apache.samza.storage.kv import org.apache.samza.util.Logging import org.apache.samza.storage.{StoreProperties, StorageEngine} import org.apache.samza.system.IncomingMessageEnvelope +import org.apache.samza.util.TimerUtils import scala.collection.JavaConversions._ @@ -35,14 +36,19 @@ class KeyValueStorageEngine[K, V]( wrapperStore: KeyValueStore[K, V], rawStore: KeyValueStore[Array[Byte], Array[Byte]], metrics: KeyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics, - batchSize: Int = 500) extends StorageEngine with KeyValueStore[K, V] with Logging { + batchSize: Int = 500, + val clock: () => Long = { System.nanoTime }) extends StorageEngine with KeyValueStore[K, V] with TimerUtils with Logging { var count = 0 /* delegate to underlying store */ def get(key: K): V = { - metrics.gets.inc - wrapperStore.get(key) + updateTimer(metrics.getNs) { + metrics.gets.inc + + //update the duration and return the fetched value + wrapperStore.get(key) + } } def getAll(keys: java.util.List[K]): java.util.Map[K, V] = { @@ -51,8 +57,10 @@ class KeyValueStorageEngine[K, V]( } def put(key: K, value: V) = { - metrics.puts.inc - wrapperStore.put(key, value) + updateTimer(metrics.putNs) { + metrics.puts.inc + wrapperStore.put(key, value) + } } def putAll(entries: java.util.List[Entry[K, V]]) = { @@ -61,8 +69,10 @@ class KeyValueStorageEngine[K, V]( } def delete(key: K) = { - metrics.deletes.inc - wrapperStore.delete(key) + updateTimer(metrics.deleteNs) { + metrics.deletes.inc + wrapperStore.delete(key) + } } def deleteAll(keys: java.util.List[K]) = { @@ -71,13 +81,17 @@ class KeyValueStorageEngine[K, V]( } def range(from: K, to: K) = { - metrics.ranges.inc - wrapperStore.range(from, to) + updateTimer(metrics.rangeNs) { + metrics.ranges.inc + wrapperStore.range(from, to) + } } def all() = { - metrics.alls.inc - wrapperStore.all() + updateTimer(metrics.allNs) { + metrics.alls.inc + wrapperStore.all() + } } /** @@ -117,11 +131,11 @@ class KeyValueStorageEngine[K, V]( } def flush() = { - trace("Flushing.") - - metrics.flushes.inc - - wrapperStore.flush() + updateTimer(metrics.flushNs) { + trace("Flushing.") + metrics.flushes.inc + wrapperStore.flush() + } } def stop() = { http://git-wip-us.apache.org/repos/asf/samza/blob/870eb1dd/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala index 233fba9..28cc891 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala @@ -37,5 +37,12 @@ class KeyValueStorageEngineMetrics( val restoredMessages = newCounter("messages-restored") val restoredBytes = newCounter("messages-bytes") + val getNs = newTimer("get-ns") + val putNs = newTimer("put-ns") + val deleteNs = newTimer("delete-ns") + val flushNs = newTimer("flush-ns") + val allNs = newTimer("all-ns") + val rangeNs = newTimer("range-ns") + override def getPrefix = storeName + "-" } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/870eb1dd/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala new file mode 100644 index 0000000..8276c18 --- /dev/null +++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.kv + +import java.util.Arrays + +import org.apache.samza.storage.StoreProperties +import org.junit.Assert._ +import org.junit.{After, Before, Test} +import org.mockito.Mockito._ + +class TestKeyValueStorageEngine { + var engine: KeyValueStorageEngine[String, String] = null + var metrics: KeyValueStorageEngineMetrics = null; + + @Before + def setup() { + val wrapperKv = new MockKeyValueStore() + val rawKv = mock(classOf[KeyValueStore[Array[Byte], Array[Byte]]]) + val properties = mock(classOf[StoreProperties]) + metrics = new KeyValueStorageEngineMetrics + engine = new KeyValueStorageEngine[String, String](properties, wrapperKv, rawKv, metrics) + } + + @After + def teardown() { + engine.close() + } + + @Test + def testGetAndPut(): Unit = { + var prevGets = metrics.gets.getCount + var prevGetNsSnapshotSize = metrics.getNs.getSnapshot.getSize + var valueForK1 = engine.get("k1"); + assertNull("k1 is not existing before put", valueForK1); + assertEquals("get counter increments by 1", 1, metrics.gets.getCount - prevGets) + assertEquals("get timer has 1 additional data point" , 1, metrics.getNs.getSnapshot.getSize - prevGetNsSnapshotSize) + + var prevPuts = metrics.puts.getCount + var prevPutNsSnapshotSize = metrics.putNs.getSnapshot.getSize + engine.put("k1", "v1") + assertEquals("put counter increments by 1", 1, metrics.puts.getCount - prevPuts) + assertEquals("put timer has 1 additional data point", 1, metrics.putNs.getSnapshot.getSize - prevPutNsSnapshotSize) + + assertEquals("k1 is existing after put and the value for k1 is v1", "v1", engine.get("k1")) + } + + @Test + def testDelete(): Unit = { + engine.put("k1", "v1") + engine.put("k2", "v2") + assertNotNull("k1 is existing before being deleted", engine.get("k1")) + var prevDeletes = metrics.deletes.getCount + var prevDeleteNsSnapshotSize = metrics.deleteNs.getSnapshot.getSize + engine.delete("k1") + assertNull("k1 is not existing since it has been deleted", engine.get("k1")) + assertEquals("k2 is still existing after deleting k1", "v2", engine.get("k2")) + assertEquals("delete counter increments by 1", 1, metrics.deletes.getCount - prevDeletes) + assertEquals("delete timer has 1 additional data point", 1, metrics.deleteNs.getSnapshot.getSize - prevDeleteNsSnapshotSize) + } + + @Test + def testFlush(): Unit = { + var prevFlushes = metrics.flushes.getCount + var prevFlushNsSnapshotSize = metrics.flushNs.getSnapshot.getSize + engine.flush() + assertEquals("flush counter increments by 1", 1, metrics.flushes.getCount - prevFlushes) + assertEquals("flush timer has 1 additional data point", 1, metrics.flushNs.getSnapshot.getSize - prevFlushNsSnapshotSize) + } + + + @Test + def testIterator() { + val keys = Arrays.asList("k1", + "k2", + "k3") + val values = Arrays.asList("v1", + "v2", + "v3") + + for (i <- 0 until 3) { + engine.put(keys.get(i), values.get(i)) + } + + // test all iterator + var prevAlls = metrics.alls.getCount + var prevAllNsSnapshotSize = metrics.allNs.getSnapshot.getSize + var iter = engine.all() + assertEquals("all counter increments by 1", 1, metrics.alls.getCount - prevAlls) + assertEquals("all timer has 1 additional data point", 1, metrics.allNs.getSnapshot.getSize - prevAllNsSnapshotSize) + for (i <- 0 until 3) { + assertTrue("iterator has next for 3 times", iter.hasNext) + val entry = iter.next() + assertEquals(entry.getKey, keys.get(i)) + assertEquals(entry.getValue, values.get(i)) + } + assertFalse("no next after iterating all 3 keys", iter.hasNext) + + // test range iterator + var prevRanges = metrics.ranges.getCount + var prevRangeNsSnapshotSize = metrics.rangeNs.getSnapshot.getSize + iter = engine.range(keys.get(0), keys.get(2)) + assertEquals("range counter increments by 1", 1, metrics.ranges.getCount - prevRanges) + assertEquals("range timer has 1 additional data point", 1, metrics.rangeNs.getSnapshot.getSize - prevRangeNsSnapshotSize) + for (i <- 0 until 2) { //only iterate the first 2 keys in the range + assertTrue("iterator has next for twice", iter.hasNext) + val entry = iter.next() + assertEquals(entry.getKey, keys.get(i)) + assertEquals(entry.getValue, values.get(i)) + } + assertFalse("no next after iterating 2 keys in the range", iter.hasNext) + } +}
