Repository: samza Updated Branches: refs/heads/master c93dd8f60 -> 343712e30
SAMZA-1461; Expose RocksDB properties as metrics Automatically build gauges for RocksDB properties via configuration: `stores.<storename>.rocksdb.metrics.list=<rocksDbProperty1>, <rocksDbProperty2>` Author: Janek Lasocki-Biczysko <[email protected]> Reviewers: Prateek Maheshwari <[email protected]> Closes #327 from janeklb/jlb_rocksDBPropertiesAsMetrics Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/343712e3 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/343712e3 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/343712e3 Branch: refs/heads/master Commit: 343712e30c0a7201d6bb47f340187af51f395671 Parents: c93dd8f Author: Janek Lasocki-Biczysko <[email protected]> Authored: Mon Oct 16 16:06:28 2017 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Mon Oct 16 16:06:28 2017 -0700 ---------------------------------------------------------------------- .../versioned/jobs/configuration-table.html | 8 +++++ .../samza/storage/kv/RocksDbKeyValueStore.scala | 32 ++++++++++++----- .../storage/kv/TestRocksDbKeyValueStore.scala | 38 ++++++++++++++++++-- 3 files changed, 66 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/343712e3/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index fc1e86d..fb8a97e 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -1781,6 +1781,14 @@ </tr> <tr> + <td class="property" id="stores-rocksdb-metrics">stores.<span class="store">store-name</span>.<br>rocksdb.metrics.list</td> + <td class="default"></td> + <td 
class="description"> + A list of RocksDB <a href="https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L409">properties</a> to expose as metrics (gauges). + </td> + </tr> + + <tr> <th colspan="3" class="section" id="cluster-manager"> Running Samza with a cluster manager<br> </th> http://git-wip-us.apache.org/repos/asf/samza/blob/343712e3/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index c771788..023e4a8 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -28,7 +28,7 @@ import org.rocksdb.TtlDB object RocksDbKeyValueStore extends Logging { - def openDB(dir: File, options: Options, storeConfig: Config, isLoggedStore: Boolean, storeName: String): RocksDB = { + def openDB(dir: File, options: Options, storeConfig: Config, isLoggedStore: Boolean, storeName: String, metrics: KeyValueStoreMetrics): RocksDB = { var ttl = 0L var useTTL = false @@ -68,15 +68,29 @@ object RocksDbKeyValueStore extends Logging { try { - if (useTTL) - { - info("Opening RocksDB store with TTL value: %s" format ttl) - TtlDB.open(options, dir.toString, ttl.toInt, false) - } - else + val rocksDb = + if (useTTL) + { + info("Opening RocksDB store with TTL value: %s" format ttl) + TtlDB.open(options, dir.toString, ttl.toInt, false) + } + else + { + RocksDB.open(options, dir.toString) + } + + if (storeConfig.containsKey("rocksdb.metrics.list")) { - RocksDB.open(options, dir.toString) + storeConfig + .get("rocksdb.metrics.list") + .split(",") + .map(property => property.trim) + .foreach(property => + metrics.newGauge(property, () => 
rocksDb.getProperty(property)) + ) } + + rocksDb } catch { @@ -104,7 +118,7 @@ class RocksDbKeyValueStore( // lazy val here is important because the store directories do not exist yet, it can only be opened // after the directories are created, which happens much later from now. - private lazy val db = RocksDbKeyValueStore.openDB(dir, options, storeConfig, isLoggedStore, storeName) + private lazy val db = RocksDbKeyValueStore.openDB(dir, options, storeConfig, isLoggedStore, storeName, metrics) private val lexicographic = new LexicographicComparator() def get(key: Array[Byte]): Array[Byte] = { http://git-wip-us.apache.org/repos/asf/samza/blob/343712e3/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala index 05d39ea..6f129be 100644 --- a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala @@ -24,6 +24,7 @@ import java.io.File import java.util import org.apache.samza.config.MapConfig +import org.apache.samza.metrics.{Gauge, MetricsRegistryMap} import org.apache.samza.util.ExponentialSleepStrategy import org.junit.{Assert, Test} import org.rocksdb.{RocksIterator, RocksDB, FlushOptions, Options} @@ -41,7 +42,8 @@ class TestRocksDbKeyValueStore options, config, false, - "someStore") + "someStore", + null) val key = "test".getBytes("UTF-8") rocksDB.put(key, "val".getBytes("UTF-8")) Assert.assertNotNull(rocksDB.get(key)) @@ -72,7 +74,8 @@ class TestRocksDbKeyValueStore options, config, false, - "dbStore") + "dbStore", + null) val key = "key".getBytes("UTF-8") rocksDB.put(key, "val".getBytes("UTF-8")) // SAMZA-836: Mysteriously,calling 
new FlushOptions() does not invoke the NativeLibraryLoader in rocksdbjni-3.13.1! @@ -98,7 +101,8 @@ class TestRocksDbKeyValueStore options, config, false, - "dbStore") + "dbStore", + null) val key = "key".getBytes("UTF-8") val key1 = "key1".getBytes("UTF-8") @@ -142,4 +146,32 @@ class TestRocksDbKeyValueStore rocksDB.close() rocksDBReadOnly.close() } + + @Test + def testMetricsConfig(): Unit = { + val registry = new MetricsRegistryMap("registrymap") + val metrics = new KeyValueStoreMetrics("dbstore", registry) + + val map = new util.HashMap[String, String]() + map.put("rocksdb.metrics.list", "rocksdb.estimate-num-keys, rocksdb.estimate-live-data-size") + val config = new MapConfig(map) + val options = new Options() + options.setCreateIfMissing(true) + val rocksDB = RocksDbKeyValueStore.openDB( + new File(System.getProperty("java.io.tmpdir")), + options, + config, + false, + "dbstore", + metrics) + + val metricsGroup = registry.getGroup("org.apache.samza.storage.kv.KeyValueStoreMetrics") + assert(metricsGroup != null) + + val estimateNumKeysMetric = metricsGroup.get("dbstore-rocksdb.estimate-num-keys") + assert(estimateNumKeysMetric.isInstanceOf[Gauge[String]]) + + val estimateLiveDataSizeMetric = metricsGroup.get("dbstore-rocksdb.estimate-live-data-size") + assert(estimateLiveDataSizeMetric.isInstanceOf[Gauge[String]]) + } }
