Repository: samza Updated Branches: refs/heads/master fdb90e7e7 -> 9f30ef10b
SAMZA-647 add batch get API to KV-store Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9f30ef10 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9f30ef10 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9f30ef10 Branch: refs/heads/master Commit: 9f30ef10b5be133cc33f797227f5274397ebd05d Parents: fdb90e7 Author: Mohamed Mahmoud <[email protected]> Authored: Mon May 4 13:35:37 2015 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Mon May 4 13:35:37 2015 -0700 ---------------------------------------------------------------------- README.md | 2 +- .../kv/inmemory/InMemoryKeyValueStore.scala | 26 ++- .../samza/storage/kv/RocksDbKeyValueStore.scala | 26 ++- .../apache/samza/storage/kv/KeyValueStore.java | 166 ++++++++++++++----- .../apache/samza/storage/kv/CachedStore.scala | 37 ++++- .../storage/kv/KeyValueStorageEngine.scala | 10 ++ .../samza/storage/kv/KeyValueStoreMetrics.scala | 4 +- .../apache/samza/storage/kv/LoggedStore.scala | 17 ++ .../storage/kv/NullSafeKeyValueStore.scala | 35 ++-- .../storage/kv/SerializedKeyValueStore.scala | 34 +++- .../src/main/config/perf/kv-perf.properties | 16 +- .../performance/TestKeyValuePerformance.scala | 152 ++++++++++++++--- .../samza/storage/kv/TestKeyValueStores.scala | 83 ++++++++-- 13 files changed, 499 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 7f92020..0492790 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ To run a single test: To run key-value performance tests: - ./gradlew samza-shell:kvPerformanceTest -PconfigPath=file://$PWD/samza-test/src/main/resources/perf/kv-perf.properties + ./gradlew samza-shell:kvPerformanceTest 
-PconfigPath=file://$PWD/samza-test/src/main/config/perf/kv-perf.properties To run all integration tests: http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala index 217333c..23d028b 100644 --- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala +++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala @@ -35,12 +35,12 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor val underlying = new util.TreeMap[Array[Byte], Array[Byte]] (UnsignedBytes.lexicographicalComparator()) - override def flush(): Unit = { + def flush(): Unit = { // No-op for In memory store. 
metrics.flushes.inc } - override def close(): Unit = Unit + def close(): Unit = Unit private def getIter(tm:util.SortedMap[Array[Byte], Array[Byte]]) = { new KeyValueIterator[Array[Byte], Array[Byte]] { @@ -64,23 +64,28 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor override def hasNext: Boolean = iter.hasNext } } - override def all(): KeyValueIterator[Array[Byte], Array[Byte]] = { + + def all(): KeyValueIterator[Array[Byte], Array[Byte]] = { metrics.alls.inc getIter(underlying) } - override def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = { + def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = { metrics.ranges.inc require(from != null && to != null, "Null bound not allowed.") getIter(underlying.subMap(from, to)) } - override def delete(key: Array[Byte]): Unit = { + def delete(key: Array[Byte]): Unit = { metrics.deletes.inc put(key, null) } - override def putAll(entries: util.List[Entry[Array[Byte], Array[Byte]]]): Unit = { + def deleteAll(keys: java.util.List[Array[Byte]]) = { + KeyValueStore.Extension.deleteAll(this, keys); + } + + def putAll(entries: util.List[Entry[Array[Byte], Array[Byte]]]): Unit = { // TreeMap's putAll requires a map, so we'd need to iterate over all the entries anyway // to use it, in order to putAll here. Therefore, just iterate here. 
val iter = entries.iterator() @@ -90,7 +95,7 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor } } - override def put(key: Array[Byte], value: Array[Byte]): Unit = { + def put(key: Array[Byte], value: Array[Byte]): Unit = { metrics.puts.inc require(key != null, "Null key not allowed.") if (value == null) { @@ -102,7 +107,7 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor } } - override def get(key: Array[Byte]): Array[Byte] = { + def get(key: Array[Byte]): Array[Byte] = { metrics.gets.inc require(key != null, "Null key not allowed.") val found = underlying.get(key) @@ -111,5 +116,8 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor } found } -} + def getAll(keys: java.util.List[Array[Byte]]): java.util.Map[Array[Byte], Array[Byte]] = { + KeyValueStore.Extension.getAll(this, keys); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index 66c2a0d..1b44a51 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -94,6 +94,24 @@ class RocksDbKeyValueStore( found } + def getAll(keys: java.util.List[Array[Byte]]): java.util.Map[Array[Byte], Array[Byte]] = { + metrics.getAlls.inc + require(keys != null, "Null keys not allowed.") + val map = db.multiGet(keys) + if (map != null) { + var bytesRead = 0L + val iterator = map.values().iterator + while (iterator.hasNext) { + val value = iterator.next + if (value != null) { + bytesRead += value.size + } + } + 
metrics.bytesRead.inc(bytesRead) + } + map + } + def put(key: Array[Byte], value: Array[Byte]) { metrics.puts.inc require(key != null, "Null key not allowed.") @@ -134,6 +152,10 @@ class RocksDbKeyValueStore( put(key, null) } + def deleteAll(keys: java.util.List[Array[Byte]]) = { + KeyValueStore.Extension.deleteAll(this, keys) + } + def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = { metrics.ranges.inc require(from != null && to != null, "Null bound not allowed.") @@ -203,7 +225,7 @@ class RocksDbKeyValueStore( override def finalize() { if (open) { - trace("Leaked reference to level db iterator, forcing close.") + trace("Leaked reference to RocksDB iterator, forcing close.") close() } } @@ -219,4 +241,4 @@ class RocksDbKeyValueStore( super.hasNext() && comparator.compare(peekKey(), to) < 0 } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java index b708341..1278e23 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java @@ -19,63 +19,145 @@ package org.apache.samza.storage.kv; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** - * A key-value store that supports put/get/delete and range queries. - * - * @param <K> The key type - * @param <V> The value type + * A key-value store that supports put, get, delete, and range queries. + * + * @param <K> the type of keys maintained by this key-value store. + * @param <V> the type of values maintained by this key-value store. 
*/ public interface KeyValueStore<K, V> { - /** - * Get the value corresponding to this key - * @param key The key to fetch - * @return The value or null if no value is found. - * @throws NullPointerException If null is used for key. + * Gets the value associated with the specified {@code key}. + * + * @param key the key with which the associated value is to be fetched. + * @return if found, the value associated with the specified {@code key}; otherwise, {@code null}. + * @throws NullPointerException if the specified {@code key} is {@code null}. + */ + V get(K key); + + /** + * Gets the values with which the specified {@code keys} are associated. + * + * @param keys the keys with which the associated values are to be fetched. + * @return a map of the keys that were found and their respective values. + * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}. + */ + Map<K, V> getAll(List<K> keys); + + /** + * Updates the mapping of the specified key-value pair; Associates the specified {@code key} with the specified {@code value}. + * + * @param key the key with which the specified {@code value} is to be associated. + * @param value the value with which the specified {@code key} is to be associated. + * @throws NullPointerException if the specified {@code key} or {@code value} is {@code null}. + */ + void put(K key, V value); + + /** + * Updates the mappings of the specified key-value {@code entries}. + * + * @param entries the updated mappings to put into this key-value store. + * @throws NullPointerException if any of the specified {@code entries} has {@code null} as key or value. + */ + void putAll(List<Entry<K, V>> entries); + + /** + * Deletes the mapping for the specified {@code key} from this key-value store (if such mapping exists). + * + * @param key the key for which the mapping is to be deleted. + * @throws NullPointerException if the specified {@code key} is {@code null}. 
*/ - public V get(K key); - + void delete(K key); + /** - * Update the value associated with this key - * @param key They key to associate the value to - * @param value The value - * @throws NullPointerException If null is used for key or value. + * Deletes the mappings for the specified {@code keys} from this key-value store (if such mappings exist). + * + * @param keys the keys for which the mappings are to be deleted. + * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}. */ - public void put(K key, V value); - + void deleteAll(List<K> keys); + /** - * Update all the given key/value pairs - * @param entries A list of entries to put into the store. - * @throws NullPointerException If null is used for any key or value. + * Returns an iterator for a range of entries specified by [{@code from}, {@code to}), i.e. {@code from} inclusive and {@code to} exclusive. + * + * <p><b>API Note:</b> The returned iterator MUST be closed after use. The comparator used for finding entries that belong to the specified + * range compares the underlying serialized big-endian byte array representation of keys, lexicographically.</p> + * @see <a href="http://en.wikipedia.org/wiki/Lexicographical_order">Lexicographical order article at Wikipedia</a> + * @param from the key specifying the low endpoint (inclusive) of the keys in the returned range. + * @param to the key specifying the high endpoint (exclusive) of the keys in the returned range. + * @return an iterator for the specified key range. + * @throws NullPointerException if null is used for {@code from} or {@code to}. */ - public KeyValueIterator<K,V> range(K from, K to); - + KeyValueIterator<K, V> range(K from, K to); + /** - * Delete the value from the store (if there is one) - * @param key The key - * @throws NullPointerException If null is used for key. + * Returns an iterator for all entries in this key-value store. 
+ * + * <p><b>API Note:</b> The returned iterator MUST be closed after use.</p> + * @return an iterator for all entries in this key-value store. */ - public void delete(K key); - + KeyValueIterator<K, V> all(); + /** - * Get an iterator over a given range of keys. This iterator MUST be closed after use. - * @param from The first key that could be in the range - * @param to The last key that could be in the range - * @return The iterator for this range. - * @throws NullPointerException If null is used for from or to. + * Closes this key-value store, if applicable, relinquishing any underlying resources. */ - public KeyValueIterator<K,V> range(K from, K to); - + void close(); + /** - * Return an iterator over all keys in the database. This iterator MUST be closed after use. - * @return An iterator of all key/value pairs in the store. + * Flushes this key-value store, if applicable. */ - public KeyValueIterator<K,V> all(); - - public void close(); - - public void flush(); - + void flush(); + + /** + * Represents an extension for classes that implement {@link KeyValueStore}. + */ + // TODO replace with default interface methods when we can use Java 8 features. + class Extension { + private Extension() { + // This class cannot be instantiated + } + + /** + * Gets the values with which the specified {@code keys} are associated. + * + * @param store the key-value store for which this operation is to be performed. + * @param keys the keys with which the associated values are to be fetched. + * @param <K> the type of keys maintained by the specified {@code store}. + * @param <V> the type of values maintained by the specified {@code store}. + * @return a map of the keys that were found and their respective values. + * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}. 
+ */ + public static <K, V> Map<K, V> getAll(final KeyValueStore<K, V> store, final List<K> keys) { + final Map<K, V> map = new HashMap<>(keys.size()); + + for (final K key : keys) { + final V value = store.get(key); + + if (value != null) { + map.put(key, value); + } + } + + return map; + } + + /** + * Deletes the mappings for the specified {@code keys} from this key-value store (if such mappings exist). + * + * @param store the key-value store for which this operation is to be performed. + * @param keys the keys for which the mappings are to be deleted. + * @param <K> the type of keys maintained by the specified {@code store}. + * @param <V> the type of values maintained by the specified {@code store}. + * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}. + */ + public static <K, V> void deleteAll(final KeyValueStore<K, V> store, final List<K> keys) { + for (final K key : keys) { + store.delete(key); + } + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala index 61bb3f6..479016d 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala @@ -97,6 +97,33 @@ class CachedStore[K, V]( } } + def getAll(keys: java.util.List[K]): java.util.Map[K, V] = { + metrics.gets.inc(keys.size) + val returnValue = new java.util.HashMap[K, V](keys.size) + val misses = new java.util.ArrayList[K] + val keysIterator = keys.iterator + while (keysIterator.hasNext) { + val key = keysIterator.next + val cached = cache.get(key) + if (cached != null) { + metrics.cacheHits.inc + returnValue.put(key, cached.value) + } else { + 
misses.add(key) + } + } + if (!misses.isEmpty) { + val entryIterator = store.getAll(misses).entrySet.iterator + while (entryIterator.hasNext) { + val entry = entryIterator.next + returnValue.put(entry.getKey, entry.getValue) + cache.put(entry.getKey, new CacheEntry(entry.getValue, null)) + } + cacheCount = cache.size // update outside the loop since it's used for metrics and not for time-sensitive logic + } + returnValue + } + def range(from: K, to: K) = { metrics.ranges.inc flush() @@ -172,9 +199,6 @@ class CachedStore[K, V]( this.dirtyCount = 0 } - /** - * Perform multiple local updates and log out all changes to the changelog - */ def putAll(entries: java.util.List[Entry[K, V]]) { val iter = entries.iterator while (iter.hasNext) { @@ -183,15 +207,16 @@ class CachedStore[K, V]( } } - /** - * Perform the local delete and log it out to the changelog - */ def delete(key: K) { metrics.deletes.inc put(key, null.asInstanceOf[V]) } + def deleteAll(keys: java.util.List[K]) = { + KeyValueStore.Extension.deleteAll(this, keys); + } + def close() { trace("Closing.") http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala index 3a23daf..fc677b2 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala @@ -44,6 +44,11 @@ class KeyValueStorageEngine[K, V]( db.get(key) } + def getAll(keys: java.util.List[K]): java.util.Map[K, V] = { + metrics.gets.inc(keys.size) + db.getAll(keys) + } + def put(key: K, value: V) = { metrics.puts.inc db.put(key, value) @@ -59,6 +64,11 @@ class KeyValueStorageEngine[K, V]( db.delete(key) } + def 
deleteAll(keys: java.util.List[K]) = { + metrics.deletes.inc(keys.size) + db.deleteAll(keys) + } + def range(from: K, to: K) = { metrics.ranges.inc db.range(from, to) http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala index 79092b9..967d509 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala @@ -26,13 +26,15 @@ class KeyValueStoreMetrics( val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper { val gets = newCounter("gets") + val getAlls = newCounter("getAlls") val ranges = newCounter("ranges") val alls = newCounter("alls") val puts = newCounter("puts") val deletes = newCounter("deletes") + val deleteAlls = newCounter("deleteAlls") val flushes = newCounter("flushes") val bytesWritten = newCounter("bytes-written") val bytesRead = newCounter("bytes-read") override def getPrefix = storeName + "-" -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala index 26f4cd9..7bba6ff 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala @@ -41,6 +41,11 @@ class LoggedStore[K, V]( store.get(key) } + def getAll(keys: java.util.List[K]): java.util.Map[K, V] = { + 
metrics.gets.inc(keys.size) + store.getAll(keys) + } + def range(from: K, to: K) = { metrics.ranges.inc store.range(from, to) @@ -82,6 +87,18 @@ class LoggedStore[K, V]( collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, key, null)) } + /** + * Perform the local deletes and log them out to the changelog + */ + def deleteAll(keys: java.util.List[K]) = { + metrics.deletes.inc(keys.size) + store.deleteAll(keys) + val keysIterator = keys.iterator + while (keysIterator.hasNext) { + collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, keysIterator.next, null)) + } + } + def flush { trace("Flushing.") http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala index 4f48cf4..3de257c 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala @@ -24,40 +24,53 @@ import org.apache.samza.util.Util.notNull import scala.collection.JavaConversions._ object NullSafeKeyValueStore { - val KEY_ERROR_MSG = "Null is not a valid key." - val VAL_ERROR_MSG = "Null is not a valid value." + val NullKeyErrorMessage = "Null is not a valid key." + val NullKeysErrorMessage = "Null is not a valid keys list." + val NullValueErrorMessage = "Null is not a valid value." 
} class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueStore[K, V] { import NullSafeKeyValueStore._ def get(key: K): V = { - notNull(key, KEY_ERROR_MSG) + notNull(key, NullKeyErrorMessage) store.get(key) } + def getAll(keys: java.util.List[K]): java.util.Map[K, V] = { + notNull(keys, NullKeysErrorMessage) + keys.foreach(key => notNull(key, NullKeyErrorMessage)) + store.getAll(keys) + } + def put(key: K, value: V) { - notNull(key, KEY_ERROR_MSG) - notNull(value, VAL_ERROR_MSG) + notNull(key, NullKeyErrorMessage) + notNull(value, NullValueErrorMessage) store.put(key, value) } def putAll(entries: java.util.List[Entry[K, V]]) { entries.foreach(entry => { - notNull(entry.getKey, KEY_ERROR_MSG) - notNull(entry.getValue, VAL_ERROR_MSG) + notNull(entry.getKey, NullKeyErrorMessage) + notNull(entry.getValue, NullValueErrorMessage) }) store.putAll(entries) } def delete(key: K) { - notNull(key, KEY_ERROR_MSG) + notNull(key, NullKeyErrorMessage) store.delete(key) } + def deleteAll(keys: java.util.List[K]) = { + notNull(keys, NullKeysErrorMessage) + keys.foreach(key => notNull(key, NullKeyErrorMessage)) + store.deleteAll(keys) + } + def range(from: K, to: K): KeyValueIterator[K, V] = { - notNull(from, KEY_ERROR_MSG) - notNull(to, KEY_ERROR_MSG) + notNull(from, NullKeyErrorMessage) + notNull(to, NullKeyErrorMessage) store.range(from, to) } @@ -72,4 +85,4 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt def close { store.close } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala index 531e8be..8e183ef 100644 --- 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala @@ -38,6 +38,22 @@ class SerializedKeyValueStore[K, V]( fromBytesOrNull(found, msgSerde) } + def getAll(keys: java.util.List[K]): java.util.Map[K, V] = { + metrics.gets.inc(keys.size) + val mapBytes = store.getAll(serializeKeys(keys)) + if (mapBytes != null) { + val map = new java.util.HashMap[K, V](mapBytes.size) + val entryIterator = mapBytes.entrySet.iterator + while (entryIterator.hasNext) { + val entry = entryIterator.next + map.put(fromBytesOrNull(entry.getKey, keySerde), fromBytesOrNull(entry.getValue, msgSerde)) + } + map + } else { + null.asInstanceOf[java.util.Map[K, V]] + } + } + def put(key: K, value: V) { metrics.puts.inc val keyBytes = toBytesOrNull(key, keySerde) @@ -64,6 +80,11 @@ class SerializedKeyValueStore[K, V]( store.delete(keyBytes) } + def deleteAll(keys: java.util.List[K]) = { + metrics.deletes.inc(keys.size) + store.deleteAll(serializeKeys(keys)) + } + def range(from: K, to: K): KeyValueIterator[K, V] = { metrics.ranges.inc val fromBytes = toBytesOrNull(from, keySerde) @@ -102,7 +123,7 @@ class SerializedKeyValueStore[K, V]( store.close } - def toBytesOrNull[T](t: T, serde: Serde[T]): Array[Byte] = if (t == null) { + private def toBytesOrNull[T](t: T, serde: Serde[T]): Array[Byte] = if (t == null) { null } else { val bytes = serde.toBytes(t) @@ -110,11 +131,20 @@ class SerializedKeyValueStore[K, V]( bytes } - def fromBytesOrNull[T](bytes: Array[Byte], serde: Serde[T]): T = if (bytes == null) { + private def fromBytesOrNull[T](bytes: Array[Byte], serde: Serde[T]): T = if (bytes == null) { null.asInstanceOf[T] } else { val obj = serde.fromBytes(bytes) metrics.bytesDeserialized.inc(bytes.size) obj } + + private def serializeKeys(keys: java.util.List[K]): java.util.List[Array[Byte]] = { + val bytes = new java.util.ArrayList[Array[Byte]](keys.size) + val keysIterator = 
keys.iterator + while (keysIterator.hasNext) { + bytes.add(toBytesOrNull(keysIterator.next, keySerde)) + } + bytes + } } http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-test/src/main/config/perf/kv-perf.properties ---------------------------------------------------------------------- diff --git a/samza-test/src/main/config/perf/kv-perf.properties b/samza-test/src/main/config/perf/kv-perf.properties index 33fcd8d..7339052 100644 --- a/samza-test/src/main/config/perf/kv-perf.properties +++ b/samza-test/src/main/config/perf/kv-perf.properties @@ -37,5 +37,19 @@ test.rocksdb-write-performance.set-2.message.count=1000000 test.rocksdb-write-performance.set-3.message.size=1024 test.rocksdb-write-performance.set-3.message.count=1000000 +# Config for get-all-vs-get +test.get-all-vs-get-write-many-read-many.stores.test-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory +# Disable caching +test.get-all-vs-get-write-many-read-many.stores.test-store.object.cache.size=0 +test.get-all-vs-get-write-many-read-many.partition.count=4 +test.get-all-vs-get-write-many-read-many.set.count=1 + +# Config for get-all-vs-get-write-once-read-many +test.get-all-vs-get-write-once-read-many.stores.test-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory +# Disable caching +test.get-all-vs-get-write-once-read-many.stores.test-store.object.cache.size=0 +test.get-all-vs-get-write-once-read-many.partition.count=4 +test.get-all-vs-get-write-once-read-many.set.count=3 + # List of tests to execute -test.methods=rocksdb-write-performance +test.methods=rocksdb-write-performance,get-all-vs-get-write-many-read-many,get-all-vs-get-write-once-read-many http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala ---------------------------------------------------------------------- diff --git 
a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala index 0858b98..1ce7d25 100644 --- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala +++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala @@ -19,25 +19,26 @@ package org.apache.samza.test.performance -import org.apache.samza.util.Logging +import java.io.File +import java.util +import java.util.UUID +import java.util.concurrent.TimeUnit + +import com.google.common.base.Stopwatch import org.apache.samza.config.Config import org.apache.samza.config.StorageConfig._ -import org.apache.samza.container.{TaskName, SamzaContainerContext} +import org.apache.samza.container.{SamzaContainerContext, TaskName} import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.storage.kv.KeyValueStore -import org.apache.samza.storage.kv.KeyValueStorageEngine +import org.apache.samza.serializers.{ByteSerde, SerdeManager, UUIDSerde} import org.apache.samza.storage.StorageEngineFactory -import org.apache.samza.util.CommandLine -import org.apache.samza.util.Util -import org.apache.samza.serializers.{StringSerde, ByteSerde, SerdeManager} -import org.apache.samza.Partition -import org.apache.samza.SamzaException +import org.apache.samza.storage.kv.{KeyValueStorageEngine, KeyValueStore} +import org.apache.samza.system.{SystemProducer, SystemProducers} import org.apache.samza.task.TaskInstanceCollector -import org.apache.samza.system.SystemProducers -import org.apache.samza.system.SystemProducer -import java.io.File -import java.util.UUID -import java.util +import org.apache.samza.util.{CommandLine, Logging, Util} +import org.apache.samza.{Partition, SamzaException} + +import scala.collection.JavaConversions._ +import scala.util.Random /** * A simple CLI-based tool for running various key-value performance tests. 
@@ -60,14 +61,15 @@ object TestKeyValuePerformance extends Logging { val testMethods: Map[String, (KeyValueStorageEngine[Array[Byte], Array[Byte]], Config) => Unit] = Map( "all-with-deletes" -> runTestAllWithDeletes, - "rocksdb-write-performance" -> runTestMsgWritePerformance - ) + "rocksdb-write-performance" -> runTestMsgWritePerformance, + "get-all-vs-get-write-many-read-many" -> runTestGetAllVsGetWriteManyReadMany, + "get-all-vs-get-write-once-read-many" -> runTestGetAllVsGetWriteOnceReadMany) def main(args: Array[String]) { val cmdline = new CommandLine val options = cmdline.parser.parse(args: _*) val config = cmdline.loadConfig(options) - val tests = config.get("test.methods", "rocksdb-write-performance,all-with-deletes").split(",") + val tests = config.get("test.methods").split(",") tests.foreach{ test => info("Running test: %s" format test) @@ -81,7 +83,7 @@ object TestKeyValuePerformance extends Logging { } def invokeTest(testName: String, testMethod: (KeyValueStorageEngine[Array[Byte], Array[Byte]], Config) => Unit, config: Config) { - val taskNames = new util.ArrayList[TaskName]() + val taskNames = new java.util.ArrayList[TaskName]() val partitionCount = config.getInt("partition.count", 1) (0 until partitionCount).map(p => taskNames.add(new TaskName(new Partition(p).toString))) @@ -139,7 +141,6 @@ object TestKeyValuePerformance extends Logging { info("Using (num loops, messages per batch, message size in bytes) => (%s, %s, %s)" format (numLoops, messagesPerBatch, messageSizeBytes)) new TestKeyValuePerformance().testAllWithDeletes(db, numLoops, messagesPerBatch, messageSizeBytes) - } def runTestMsgWritePerformance(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config) { @@ -150,6 +151,13 @@ object TestKeyValuePerformance extends Logging { new TestKeyValuePerformance().testMsgWritePerformance(db, messageCount, messageSizeBytes) } + def runTestGetAllVsGetWriteManyReadMany(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config) { + new 
TestKeyValuePerformance().testGetAllVsGetWriteManyReadMany(db, config) + } + + def runTestGetAllVsGetWriteOnceReadMany(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config) { + new TestKeyValuePerformance().testGetAllVsGetWriteOnceReadMany(db, config) + } } class TestKeyValuePerformance extends Logging { @@ -200,7 +208,6 @@ class TestKeyValuePerformance extends Logging { info("Total time: %ss" format ((System.currentTimeMillis - start) * .001)) } - /** * Test that successively writes a set of fixed-size messages to the KV store * and computes the total time for the operations @@ -222,4 +229,107 @@ class TestKeyValuePerformance extends Logging { val timeTaken = System.currentTimeMillis - start info("Total time to write %d msgs of size %d bytes : %s s" format (numMsgs, msgSizeInBytes, timeTaken * .001)) } -} \ No newline at end of file + + /** + * Test that ::getAll performance is better than that of ::get (test when there are many writes and many reads). + * @param store key-value store instance that is being tested + * @param config the test case's config + */ + def testGetAllVsGetWriteManyReadMany(store: KeyValueStore[Array[Byte],Array[Byte]], config: Config): Unit = { + val iterationsCount = config.getInt("iterations.count", 100) + val maxMessagesCountPerBatch = config.getInt("message.max-count-per-batch", 100000) + val maxMessageSizeBytes = config.getInt("message.max-size.bytes", 1024) + val timer = Stopwatch.createUnstarted + val uuidSerde = new UUIDSerde + + info("iterations count: " + iterationsCount) + info("max messages count per batch: " + maxMessagesCountPerBatch) + info("max message size in bytes: " + maxMessageSizeBytes) + info("%12s%12s%12s%12s".format("Msg Count", "Bytes/Msg", "get ms", "getAll ms")) + + try { + (0 until iterationsCount).foreach(i => { + val messageSizeBytes = Random.nextInt(maxMessageSizeBytes) + val messagesCountPerBatch = Random.nextInt(maxMessagesCountPerBatch) + val keys = (0 until messagesCountPerBatch).map(k => 
uuidSerde.toBytes(UUID.randomUUID)).toList + val shuffledKeys = Random.shuffle(keys) // to reduce locality of reference -- sequential access may be unfair + + keys.foreach(k => store.put(k, Random.nextString(messageSizeBytes).getBytes(Encoding))) + store.flush() + + timer.reset().start() + assert(store.getAll(shuffledKeys).size == shuffledKeys.size) + val getAllTime = timer.stop().elapsed(TimeUnit.MILLISECONDS) + + // Restore cache, in case it's enabled, to a state similar to the one above when the getAll test started + keys.foreach(k => store.put(k, Random.nextString(messageSizeBytes).getBytes(Encoding))) + store.flush() + + timer.reset().start() + shuffledKeys.foreach(store.get) + val getTime = timer.stop().elapsed(TimeUnit.MILLISECONDS) + + info("%12d%12d%12d%12d".format(messagesCountPerBatch, messageSizeBytes, getTime, getAllTime)) + if (getAllTime > getTime) { + error("getAll was slower than get!") + } + }) + } finally { + store.close() + } + } + + /** + * Test that ::getAll performance is better than that of ::get (test when data are written once and read many times); + * load is usually greater than the storage engine's cache size (not to be confused with Samza's cache layer), + * and keys are randomly selected from the stored entries to perform a fair comparison of ::get vs. 
::getAll (in case + * the underlying storage engine caches data in blocks and ::getAll causes a block to be loaded into the cache -- + * one can argue that ::get should trigger the same behavior, but it's worth testing this WORM scenario regardless) + * @param store key-value store instance that is being tested + * @param config the test case's config + */ + def testGetAllVsGetWriteOnceReadMany(store: KeyValueStore[Array[Byte],Array[Byte]], config: Config): Unit = { + val iterationsCount = config.getInt("iterations.count", 100) + val maxMessagesCountPerBatch = config.getInt("message.max-count-per-batch", 10000 + Random.nextInt(20000)) + val maxMessageSizeBytes = config.getInt("message.max-size.bytes", 1024) + val totalMessagesCount = iterationsCount * maxMessagesCountPerBatch + val timer = Stopwatch.createUnstarted + val uuidSerde = new UUIDSerde + + info("write once -- putting %d messages in store".format(totalMessagesCount)) + val keys = (0 until totalMessagesCount).map(k => uuidSerde.toBytes(UUID.randomUUID)).toList + keys.foreach(k => store.put(k, Random.nextString(Random.nextInt(maxMessageSizeBytes)).getBytes(Encoding))) + store.flush() + + info("iterations count: " + iterationsCount) + info("max messages count per batch: " + maxMessagesCountPerBatch) + info("max message size in bytes: " + maxMessageSizeBytes) + info("%12s%12s%12s%12s".format("Msg Count", "Total Size", "get ms", "getAll ms")) + + try { + (0 until iterationsCount).foreach(i => { + val messagesCountPerBatch = Random.nextInt(maxMessagesCountPerBatch) + val shuffledKeys = Random.shuffle(keys).take(messagesCountPerBatch) + + // We want to measure ::getAll when called many times, so populate the cache because first call is a cache-miss + val totalSize = store.getAll(shuffledKeys).values.map(_.length).sum + timer.reset().start() + assert(store.getAll(shuffledKeys).size == shuffledKeys.size) + val getAllTime = timer.stop().elapsed(TimeUnit.MILLISECONDS) + + // We want to measure ::get when called many 
times, so populate the cache because first call is a cache-miss + shuffledKeys.foreach(store.get) + timer.reset().start() + shuffledKeys.foreach(store.get) + val getTime = timer.stop().elapsed(TimeUnit.MILLISECONDS) + + info("%12d%12d%12d%12d".format(messagesCountPerBatch, totalSize, getTime, getAllTime)) + if (getAllTime > getTime) { + error("getAll was slower than get!") + } + }) + } finally { + store.close() + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala index 50dfc10..9dee7be 100644 --- a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala +++ b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala @@ -97,28 +97,48 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) { } @Test - def getNonExistantIsNull() { + def getNonExistentIsNull() { assertNull(store.get(b("hello"))) } @Test - def putAndGet() { - store.put(b("k"), b("v")) - assertArrayEquals(b("v"), store.get(b("k"))) + def testGetAllWhenZeroMatch() { + store.put(b("hello"), b("world")) + val keys = List(b("foo"), b("bar")) + val actual = store.getAll(keys) + keys.foreach(k => assertNull("Key: " + k, actual.get(k))) } @Test - def putStessTest() { - for( a <- 0 to 1900000){ - store.put(b(a+"k"), b("v")) - } + def testGetAllWhenFullMatch() { + val expected = Map(b("k0") -> b("v0"), b("k1") -> b("v1")) + expected.foreach(e => store.put(e._1, e._2)) + val actual = store.getAll(expected.keys.toList) + assertEquals("Size", expected.size, actual.size) + expected.foreach(e => assertArrayEquals("Value at: " + s(e._1), e._2, actual.get(e._1))) + } + + @Test + def testGetAllWhenPartialMatch() { + val all = 
Map(b("k0") -> b("v0"), b("k1") -> b("v1"), b("k2") -> b("v2")) + val found = all.entrySet.head + val notFound = all.entrySet.last + store.put(found.getKey, found.getValue) + val actual = store.getAll(List(notFound.getKey, found.getKey)) + assertNull(actual.get(notFound.getKey)) + assertArrayEquals(found.getValue, actual.get(found.getKey)) + } + + @Test + def putAndGet() { + store.put(b("k"), b("v")) + assertArrayEquals(b("v"), store.get(b("k"))) } @Test def doublePutAndGet() { val k = b("k2") store.put(k, b("v1")) - store.put(k, b("v2")) store.put(k, b("v3")) assertArrayEquals(b("v3"), store.get(k)) } @@ -127,11 +147,13 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) { def testNullsWithSerde() { if (serde) { val a = b("a") - val keyMsg = Some(NullSafeKeyValueStore.KEY_ERROR_MSG) - val valMsg = Some(NullSafeKeyValueStore.VAL_ERROR_MSG) intercept[NullPointerException] { store.get(null) } + intercept[NullPointerException] { store.getAll(null) } + intercept[NullPointerException] { store.getAll(List(a, null)) } intercept[NullPointerException] { store.delete(null) } + intercept[NullPointerException] { store.deleteAll(null) } + intercept[NullPointerException] { store.deleteAll(List(a, null)) } intercept[NullPointerException] { store.put(null, a) } intercept[NullPointerException] { store.put(a, null) } intercept[NullPointerException] { store.putAll(List(new Entry(a, a), new Entry[Array[Byte], Array[Byte]](a, null))) } @@ -190,6 +212,41 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) { } @Test + def testDeleteAllWhenZeroMatch() { + val foo = b("foo") + store.put(foo, foo) + store.deleteAll(List(b("bar"))) + assertArrayEquals(foo, store.get(foo)) + } + + @Test + def testDeleteAllWhenFullMatch() { + val all = Map(b("k0") -> b("v0"), b("k1") -> b("v1")) + all.foreach(e => store.put(e._1, e._2)) + assertEquals(all.size, store.getAll(all.keys.toList).size) + store.deleteAll(all.keys.toList) + all.keys.foreach(key => assertNull("Value 
at: " + s(key), store.get(key))) + } + + @Test + def testDeleteAllWhenPartialMatch() { + val all = Map(b("k0") -> b("v0"), b("k1") -> b("v1")) + val found = all.entrySet.head + val leftAlone = all.entrySet.last + all.foreach(e => store.put(e._1, e._2)) + assertArrayEquals(found.getValue, store.get(found.getKey)) + store.deleteAll(List(b("not found"), found.getKey)) + store.flush() + val allIterator = store.all + try { + assertEquals(1, allIterator.size) + assertArrayEquals(leftAlone.getValue, store.get(leftAlone.getKey)) + } finally { + allIterator.close() + } + } + + @Test def testSimpleScenario() { val vals = letters.map(b(_)) for (v <- vals) { @@ -342,8 +399,8 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) { } object TestKeyValueStores { - val CacheSize = 1000000 - val BatchSize = 1000000 + val CacheSize = 1024 + val BatchSize = 1024 @Parameters def parameters: java.util.Collection[Array[String]] = Arrays.asList( //Inmemory
