This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new a07e386  SAMZA-2324: Adding KV store metrics for rocksdb (#1158)
a07e386 is described below

commit a07e386d1fd0a0ff560a4a22918d27cfe28a6249
Author: bkonold <[email protected]>
AuthorDate: Wed Oct 9 14:12:27 2019 -0700

    SAMZA-2324: Adding KV store metrics for rocksdb (#1158)
    
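    1) size of the store on disk (total size of all SST files, read from the RocksDB property rocksdb.total-sst-files-size)
    2) maximum record size (largest serialized value, in bytes, written via put/putAll)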
---
 .../apache/samza/storage/kv/RocksDbKeyValueStore.scala   |  3 ++-
 .../samza/storage/kv/SerializedKeyValueStore.scala       | 16 ++++++++++++++--
 .../storage/kv/SerializedKeyValueStoreMetrics.scala      |  1 +
 3 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index a2ae8b0..f1ae004 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -83,7 +83,8 @@ object RocksDbKeyValueStore extends Logging {
         "rocksdb.cur-size-active-mem-table", // approximate active memtable 
size in bytes
         "rocksdb.cur-size-all-mem-tables", // approximate active and unflushed 
memtable size in bytes
         "rocksdb.size-all-mem-tables", // approximate active, unflushed and 
pinned memtable size in bytes
-        "rocksdb.estimate-num-keys" // approximate number keys in the active 
and unflushed memtable and storage
+        "rocksdb.estimate-num-keys", // approximate number keys in the active 
and unflushed memtable and storage
+        "rocksdb.total-sst-files-size" // size of all sst files on disk
       )
 
       val configuredMetrics = storeConfig
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index 169452c..7136b60 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -57,23 +57,27 @@ class SerializedKeyValueStore[K, V](
   }
 
   def put(key: K, value: V) {
-    metrics.puts.inc
     val keyBytes = toBytesOrNull(key, keySerde)
     val valBytes = toBytesOrNull(value, msgSerde)
     store.put(keyBytes, valBytes)
+    val valSizeBytes = if (valBytes == null) 0 else valBytes.length
+    updatePutMetrics(1, valSizeBytes)
   }
 
   def putAll(entries: java.util.List[Entry[K, V]]) {
     val list = new java.util.ArrayList[Entry[Array[Byte], 
Array[Byte]]](entries.size())
     val iter = entries.iterator
+    var newMaxRecordSizeBytes = 0
     while (iter.hasNext) {
       val curr = iter.next
       val keyBytes = toBytesOrNull(curr.getKey, keySerde)
       val valBytes = toBytesOrNull(curr.getValue, msgSerde)
+      val valSizeBytes = if (valBytes == null) 0 else valBytes.length
+      newMaxRecordSizeBytes = Math.max(newMaxRecordSizeBytes, valSizeBytes)
       list.add(new Entry(keyBytes, valBytes))
     }
     store.putAll(list)
-    metrics.puts.inc(list.size)
+    updatePutMetrics(list.size, newMaxRecordSizeBytes)
   }
 
   def delete(key: K) {
@@ -151,6 +155,14 @@ class SerializedKeyValueStore[K, V](
     bytes
   }
 
+  private def updatePutMetrics(batchSize: Long, newMaxRecordSizeBytes: Long) = 
{
+    metrics.puts.inc(batchSize)
+    var max = metrics.maxRecordSizeBytes.getValue
+    while (newMaxRecordSizeBytes > max && 
!metrics.maxRecordSizeBytes.compareAndSet(max, newMaxRecordSizeBytes)) {
+      max = metrics.maxRecordSizeBytes.getValue
+    }
+  }
+
   override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
     val fromBytes = toBytesOrNull(from, keySerde)
     val toBytes = toBytesOrNull(to, keySerde)
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
index 841e4a2..f7dc953 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
@@ -35,6 +35,7 @@ class SerializedKeyValueStoreMetrics(
   val flushes = newCounter("flushes")
   val bytesSerialized = newCounter("bytes-serialized")
   val bytesDeserialized = newCounter("bytes-deserialized")
+  val maxRecordSizeBytes = newGauge("max-record-size-bytes", 0L)
 
   override def getPrefix = storeName + "-"
 }
\ No newline at end of file

Reply via email to