Repository: samza Updated Branches: refs/heads/master 62904334b -> 958edc42f
Minor fixes to KeyValueStore and RocksDBKeyValueStore 1. Replaced extension class in KeyValueStore with default methods. 2. Fixed formatting in RocksDBKeyValueStore#openDB. 3. Now logs original RocksDBException on errors opening the db. Other minor log message cleanup. Author: Prateek Maheshwari <[email protected]> Reviewers: Jacob Maes <[email protected]> Closes #332 from prateekm/store-fixes Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/958edc42 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/958edc42 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/958edc42 Branch: refs/heads/master Commit: 958edc42f59a1891e14276251462236abd2af5b9 Parents: 6290433 Author: Prateek Maheshwari <[email protected]> Authored: Thu Oct 19 11:36:02 2017 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Thu Oct 19 11:36:02 2017 -0700 ---------------------------------------------------------------------- .../apache/samza/storage/kv/KeyValueStore.java | 69 ++++------------ .../kv/inmemory/InMemoryKeyValueStore.scala | 8 -- .../samza/storage/kv/RocksDbKeyValueStore.scala | 86 ++++++++------------ .../apache/samza/storage/kv/CachedStore.scala | 2 +- .../samza/storage/kv/MockKeyValueStore.scala | 8 -- 5 files changed, 51 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/958edc42/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java index b1fea7b..18a89ec 100644 --- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java +++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java @@ -46,7 +46,19 @@ public interface KeyValueStore<K, V> { * @return a map of the keys that were found and their respective values. * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}. */ - Map<K, V> getAll(List<K> keys); + default Map<K, V> getAll(List<K> keys) { + Map<K, V> map = new HashMap<>(keys.size()); + + for (K key : keys) { + V value = get(key); + + if (value != null) { + map.put(key, value); + } + } + + return map; + } /** * Updates the mapping of the specified key-value pair; Associates the specified {@code key} with the specified {@code value}. @@ -79,7 +91,11 @@ public interface KeyValueStore<K, V> { * @param keys the keys for which the mappings are to be deleted. * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}. */ - void deleteAll(List<K> keys); + default void deleteAll(List<K> keys) { + for (K key : keys) { + delete(key); + } + } /** * Returns an iterator for a sorted range of entries specified by [{@code from}, {@code to}). @@ -111,53 +127,4 @@ public interface KeyValueStore<K, V> { * Flushes this key-value store, if applicable. */ void flush(); - - /** - * Represents an extension for classes that implement {@link KeyValueStore}. - */ - // TODO replace with default interface methods when we can use Java 8 features. - class Extension { - private Extension() { - // This class cannot be instantiated - } - - /** - * Gets the values with which the specified {@code keys} are associated. - * - * @param store the key-value store for which this operation is to be performed. - * @param keys the keys with which the associated values are to be fetched. - * @param <K> the type of keys maintained by the specified {@code store}. - * @param <V> the type of values maintained by the specified {@code store}. - * @return a map of the keys that were found and their respective values. - * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}. - */ - public static <K, V> Map<K, V> getAll(final KeyValueStore<K, V> store, final List<K> keys) { - final Map<K, V> map = new HashMap<>(keys.size()); - - for (final K key : keys) { - final V value = store.get(key); - - if (value != null) { - map.put(key, value); - } - } - - return map; - } - - /** - * Deletes the mappings for the specified {@code keys} from this key-value store (if such mappings exist). - * - * @param store the key-value store for which this operation is to be performed. - * @param keys the keys for which the mappings are to be deleted. - * @param <K> the type of keys maintained by the specified {@code store}. - * @param <V> the type of values maintained by the specified {@code store}. - * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}. - */ - public static <K, V> void deleteAll(final KeyValueStore<K, V> store, final List<K> keys) { - for (final K key : keys) { - store.delete(key); - } - } - } } http://git-wip-us.apache.org/repos/asf/samza/blob/958edc42/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala index 4c245b6..7b83163 100644 --- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala +++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala @@ -81,10 +81,6 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor put(key, null) } - override def deleteAll(keys: java.util.List[Array[Byte]]) = { - KeyValueStore.Extension.deleteAll(this, keys) - } - override def putAll(entries: util.List[Entry[Array[Byte], Array[Byte]]]): Unit = { // TreeMap's putAll requires a map, so we'd need to iterate over all the entries anyway // to use it, in order to putAll here. Therefore, just iterate here. @@ -116,8 +112,4 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor } found } - - override def getAll(keys: java.util.List[Array[Byte]]): java.util.Map[Array[Byte], Array[Byte]] = { - KeyValueStore.Extension.getAll(this, keys); - } } http://git-wip-us.apache.org/repos/asf/samza/blob/958edc42/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index 023e4a8..135cff9 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -20,89 +20,71 @@ package org.apache.samza.storage.kv import java.io.File +import java.util.concurrent.TimeUnit + import org.apache.samza.SamzaException -import org.apache.samza.util.{ LexicographicComparator, Logging } import org.apache.samza.config.Config -import org.rocksdb._ -import org.rocksdb.TtlDB +import org.apache.samza.util.{LexicographicComparator, Logging} +import org.rocksdb.{TtlDB, _} object RocksDbKeyValueStore extends Logging { - def openDB(dir: File, options: Options, storeConfig: Config, isLoggedStore: Boolean, storeName: String, metrics: KeyValueStoreMetrics): RocksDB = { + def openDB(dir: File, options: Options, storeConfig: Config, isLoggedStore: Boolean, + storeName: String, metrics: KeyValueStoreMetrics): RocksDB = { var ttl = 0L var useTTL = false - if (storeConfig.containsKey("rocksdb.ttl.ms")) - { - try - { + if (storeConfig.containsKey("rocksdb.ttl.ms")) { + try { ttl = storeConfig.getLong("rocksdb.ttl.ms") - // RocksDB accepts TTL in seconds, convert ms to seconds - if(ttl > 0) { - if (ttl < 1000) - { - warn("The ttl values requested for %s is %d, which is less than 1000 (minimum), using 1000 instead", - storeName, - ttl) + if (ttl > 0) { + if (ttl < 1000) { + warn("The ttl value requested for %s is %d which is less than 1000 (minimum). " + + "Using 1000 ms instead.", storeName, ttl) ttl = 1000 } - ttl = ttl / 1000 - } - else { - warn("Non-positive TTL for RocksDB implies infinite TTL for the data. More Info -https://github.com/facebook/rocksdb/wiki/Time-to-Live") + ttl = TimeUnit.MILLISECONDS.toSeconds(ttl) + } else { + warn("Non-positive TTL for RocksDB implies infinite TTL for the data. " + + "More Info - https://github.com/facebook/rocksdb/wiki/Time-to-Live") } useTTL = true - if (isLoggedStore) - { - warn("%s is a TTL based store, changelog is not supported for TTL based stores, use at your own discretion" format storeName) + if (isLoggedStore) { + warn("%s is a TTL based store. Changelog is not supported for TTL based stores. " + + "Use at your own discretion." format storeName) } + } catch { + case nfe: NumberFormatException => + throw new SamzaException("rocksdb.ttl.ms configuration value %s for store %s is not a number." + format (storeConfig.get("rocksdb.ttl.ms"), storeName), nfe) } - catch - { - case nfe: NumberFormatException => throw new SamzaException("rocksdb.ttl.ms configuration is not a number, " + "value found %s" format storeConfig.get( - "rocksdb.ttl.ms")) - } } - try - { + try { val rocksDb = - if (useTTL) - { + if (useTTL) { info("Opening RocksDB store with TTL value: %s" format ttl) TtlDB.open(options, dir.toString, ttl.toInt, false) - } - else - { + } else { RocksDB.open(options, dir.toString) } - if (storeConfig.containsKey("rocksdb.metrics.list")) - { + if (storeConfig.containsKey("rocksdb.metrics.list")) { storeConfig .get("rocksdb.metrics.list") .split(",") .map(property => property.trim) - .foreach(property => - metrics.newGauge(property, () => rocksDb.getProperty(property)) - ) + .foreach(property => metrics.newGauge(property, () => rocksDb.getProperty(property))) } rocksDb + } catch { + case rocksDBException: RocksDBException => + throw new SamzaException("Error opening RocksDB store %s at location %s" format (storeName, dir.toString), + rocksDBException) } - catch - { - case rocksDBException: RocksDBException => - { - throw new SamzaException( - "Error opening RocksDB store %s at location %s, received the following exception from RocksDB %s".format( - storeName, - dir.toString, - rocksDBException)) - } - } } } @@ -187,10 +169,6 @@ class RocksDbKeyValueStore( put(key, null) } - def deleteAll(keys: java.util.List[Array[Byte]]) = { - KeyValueStore.Extension.deleteAll(this, keys) - } - def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = { metrics.ranges.inc require(from != null && to != null, "Null bound not allowed.") http://git-wip-us.apache.org/repos/asf/samza/blob/958edc42/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala index 44f96b4..d40999a 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala @@ -244,7 +244,7 @@ class CachedStore[K, V]( override def deleteAll(keys: java.util.List[K]) = { lock.synchronized({ - KeyValueStore.Extension.deleteAll(this, keys) + super.deleteAll(keys) }) } http://git-wip-us.apache.org/repos/asf/samza/blob/958edc42/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala index f57b275..f66dc04 100644 --- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala +++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala @@ -69,12 +69,4 @@ class MockKeyValueStore extends KeyValueStore[String, String] { override def flush() {} // no-op override def close() { kvMap.clear() } - - override def deleteAll(keys: java.util.List[String]) { - KeyValueStore.Extension.deleteAll(this, keys) - } - - override def getAll(keys: java.util.List[String]): java.util.Map[String, String] = { - KeyValueStore.Extension.getAll(this, keys) - } } \ No newline at end of file
