Repository: samza
Updated Branches:
  refs/heads/master ae056ff7b -> 2227f9f1a
SAMZA-1736: Add counters and timers for batch get/put/delete operations in KeyValueStorageEngine Author: Prateek Maheshwari <[email protected]> Reviewers: Cameron Lee <[email protected]>, Shanthoosh Venkatraman <[email protected]> Closes #539 from prateekm/store-metrics Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2227f9f1 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2227f9f1 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2227f9f1 Branch: refs/heads/master Commit: 2227f9f1a56e0336e9ce64188e00620608fbd985 Parents: ae056ff Author: Prateek Maheshwari <[email protected]> Authored: Fri Jun 1 13:17:37 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Fri Jun 1 13:17:37 2018 -0700 ---------------------------------------------------------------------- .../samza/storage/kv/RocksDbKeyValueStore.scala | 7 ++-- .../storage/kv/KeyValueStorageEngine.scala | 39 +++++++++++++------- .../kv/KeyValueStorageEngineMetrics.scala | 23 +++++++----- .../samza/storage/kv/KeyValueStoreMetrics.scala | 5 ++- 4 files changed, 47 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/2227f9f1/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index 6f3794d..f25097c 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -154,11 +154,12 @@ class RocksDbKeyValueStore( } def put(key: Array[Byte], value: Array[Byte]): Unit = ifOpen { - metrics.puts.inc 
require(key != null, "Null key not allowed.") if (value == null) { + metrics.deletes.inc db.delete(writeOptions, key) } else { + metrics.puts.inc metrics.bytesWritten.inc(key.length + value.length) db.put(writeOptions, key, value) } @@ -166,16 +167,17 @@ class RocksDbKeyValueStore( // Write batch from RocksDB API is not used currently because of: https://github.com/facebook/rocksdb/issues/262 def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]): Unit = ifOpen { + metrics.putAlls.inc() val iter = entries.iterator var wrote = 0 var deletes = 0 while (iter.hasNext) { - wrote += 1 val curr = iter.next() if (curr.getValue == null) { deletes += 1 db.delete(writeOptions, curr.getKey) } else { + wrote += 1 val key = curr.getKey val value = curr.getValue metrics.bytesWritten.inc(key.length + value.length) @@ -187,7 +189,6 @@ class RocksDbKeyValueStore( } def delete(key: Array[Byte]): Unit = ifOpen { - metrics.deletes.inc put(key, null) } http://git-wip-us.apache.org/repos/asf/samza/blob/2227f9f1/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala index 157c1bc..963dce4 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala @@ -20,7 +20,7 @@ package org.apache.samza.storage.kv import org.apache.samza.util.Logging -import org.apache.samza.storage.{StoreProperties, StorageEngine} +import org.apache.samza.storage.{StorageEngine, StoreProperties} import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.util.TimerUtil @@ -52,8 +52,11 @@ class KeyValueStorageEngine[K, V]( } override def getAll(keys: java.util.List[K]): java.util.Map[K, V] = { - 
metrics.gets.inc(keys.size) - wrapperStore.getAll(keys) + updateTimer(metrics.getAllNs) { + metrics.getAlls.inc() + metrics.gets.inc(keys.size) + wrapperStore.getAll(keys) + } } def put(key: K, value: V) = { @@ -64,8 +67,7 @@ class KeyValueStorageEngine[K, V]( } def putAll(entries: java.util.List[Entry[K, V]]) = { - metrics.puts.inc(entries.size) - wrapperStore.putAll(entries) + doPutAll(wrapperStore, entries) } def delete(key: K) = { @@ -76,8 +78,11 @@ class KeyValueStorageEngine[K, V]( } override def deleteAll(keys: java.util.List[K]) = { - metrics.deletes.inc(keys.size) - wrapperStore.deleteAll(keys) + updateTimer(metrics.deleteAllNs) { + metrics.deleteAlls.inc() + metrics.deletes.inc(keys.size) + wrapperStore.deleteAll(keys) + } } def range(from: K, to: K) = { @@ -110,17 +115,17 @@ class KeyValueStorageEngine[K, V]( batch.add(new Entry(keyBytes, valBytes)) if (batch.size >= batchSize) { - rawStore.putAll(batch) + doPutAll(rawStore, batch) batch.clear() } if (valBytes != null) { - metrics.restoredBytes.inc(valBytes.size) - metrics.restoredBytesGauge.set(metrics.restoredBytesGauge.getValue + valBytes.size) + metrics.restoredBytes.inc(valBytes.length) + metrics.restoredBytesGauge.set(metrics.restoredBytesGauge.getValue + valBytes.length) } - metrics.restoredBytes.inc(keyBytes.size) - metrics.restoredBytesGauge.set(metrics.restoredBytesGauge.getValue + keyBytes.size) + metrics.restoredBytes.inc(keyBytes.length) + metrics.restoredBytesGauge.set(metrics.restoredBytesGauge.getValue + keyBytes.length) metrics.restoredMessages.inc() metrics.restoredMessagesGauge.set(metrics.restoredMessagesGauge.getValue + 1) @@ -134,7 +139,7 @@ class KeyValueStorageEngine[K, V]( info(count + " total entries restored.") if (batch.size > 0) { - rawStore.putAll(batch) + doPutAll(rawStore, batch) } } @@ -159,6 +164,14 @@ class KeyValueStorageEngine[K, V]( wrapperStore.close() } + private def doPutAll[Key, Value](store: KeyValueStore[Key, Value], entries: java.util.List[Entry[Key, Value]]) 
= { + updateTimer(metrics.putAllNs) { + metrics.putAlls.inc() + metrics.puts.inc(entries.size) + store.putAll(entries) + } + } + override def getStoreProperties: StoreProperties = storeProperties override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = { http://git-wip-us.apache.org/repos/asf/samza/blob/2227f9f1/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala index a2c812e..92889ed 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala @@ -28,27 +28,32 @@ class KeyValueStorageEngineMetrics( val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper { val gets = newCounter("gets") - val ranges = newCounter("ranges") - val alls = newCounter("alls") + val getAlls = newCounter("get-alls") val puts = newCounter("puts") + val putAlls = newCounter("put-alls") val deletes = newCounter("deletes") + val deleteAlls = newCounter("delete-alls") val flushes = newCounter("flushes") + val alls = newCounter("alls") + val ranges = newCounter("ranges") val snapshots = newCounter("snapshots") - val restoredMessages = newCounter("messages-restored") //Deprecated - val restoredMessagesGauge = newGauge("restored-messages", 0) - - val restoredBytes = newCounter("messages-bytes") //Deprecated - val restoredBytesGauge = newGauge("restored-bytes", 0) - - val getNs = newTimer("get-ns") + val getAllNs = newTimer("get-all-ns") val putNs = newTimer("put-ns") + val putAllNs = newTimer("put-all-ns") val deleteNs = newTimer("delete-ns") + val deleteAllNs = newTimer("delete-all-ns") val flushNs = newTimer("flush-ns") val allNs = 
newTimer("all-ns") val rangeNs = newTimer("range-ns") val snapshotNs = newTimer("snapshot-ns") + val restoredMessages = newCounter("messages-restored") //Deprecated + val restoredMessagesGauge = newGauge("restored-messages", 0) + + val restoredBytes = newCounter("messages-bytes") //Deprecated + val restoredBytesGauge = newGauge("restored-bytes", 0) + override def getPrefix = storeName + "-" } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/2227f9f1/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala index 967d509..a73ad04 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala @@ -27,11 +27,12 @@ class KeyValueStoreMetrics( val gets = newCounter("gets") val getAlls = newCounter("getAlls") - val ranges = newCounter("ranges") - val alls = newCounter("alls") val puts = newCounter("puts") + val putAlls = newCounter("putAlls") val deletes = newCounter("deletes") val deleteAlls = newCounter("deleteAlls") + val alls = newCounter("alls") + val ranges = newCounter("ranges") val flushes = newCounter("flushes") val bytesWritten = newCounter("bytes-written") val bytesRead = newCounter("bytes-read")
