Repository: samza
Updated Branches:
  refs/heads/master fdb90e7e7 -> 9f30ef10b


SAMZA-647 add batch get API to KV-store


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9f30ef10
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9f30ef10
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9f30ef10

Branch: refs/heads/master
Commit: 9f30ef10b5be133cc33f797227f5274397ebd05d
Parents: fdb90e7
Author: Mohamed Mahmoud <[email protected]>
Authored: Mon May 4 13:35:37 2015 -0700
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Mon May 4 13:35:37 2015 -0700

----------------------------------------------------------------------
 README.md                                       |   2 +-
 .../kv/inmemory/InMemoryKeyValueStore.scala     |  26 ++-
 .../samza/storage/kv/RocksDbKeyValueStore.scala |  26 ++-
 .../apache/samza/storage/kv/KeyValueStore.java  | 166 ++++++++++++++-----
 .../apache/samza/storage/kv/CachedStore.scala   |  37 ++++-
 .../storage/kv/KeyValueStorageEngine.scala      |  10 ++
 .../samza/storage/kv/KeyValueStoreMetrics.scala |   4 +-
 .../apache/samza/storage/kv/LoggedStore.scala   |  17 ++
 .../storage/kv/NullSafeKeyValueStore.scala      |  35 ++--
 .../storage/kv/SerializedKeyValueStore.scala    |  34 +++-
 .../src/main/config/perf/kv-perf.properties     |  16 +-
 .../performance/TestKeyValuePerformance.scala   | 152 ++++++++++++++---
 .../samza/storage/kv/TestKeyValueStores.scala   |  83 ++++++++--
 13 files changed, 499 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 7f92020..0492790 100644
--- a/README.md
+++ b/README.md
@@ -44,7 +44,7 @@ To run a single test:
 
 To run key-value performance tests:
 
-    ./gradlew samza-shell:kvPerformanceTest -PconfigPath=file://$PWD/samza-test/src/main/resources/perf/kv-perf.properties
+    ./gradlew samza-shell:kvPerformanceTest -PconfigPath=file://$PWD/samza-test/src/main/config/perf/kv-perf.properties
 
 To run all integration tests:
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
 
b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
index 217333c..23d028b 100644
--- 
a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
+++ 
b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
@@ -35,12 +35,12 @@ class InMemoryKeyValueStore(val metrics: 
KeyValueStoreMetrics = new KeyValueStor
 
   val underlying = new util.TreeMap[Array[Byte], Array[Byte]] 
(UnsignedBytes.lexicographicalComparator())
 
-  override def flush(): Unit = {
+  def flush(): Unit = {
     // No-op for In memory store.
     metrics.flushes.inc
   }
 
-  override def close(): Unit = Unit
+  def close(): Unit = Unit
 
   private def getIter(tm:util.SortedMap[Array[Byte], Array[Byte]]) = {
     new KeyValueIterator[Array[Byte], Array[Byte]] {
@@ -64,23 +64,28 @@ class InMemoryKeyValueStore(val metrics: 
KeyValueStoreMetrics = new KeyValueStor
       override def hasNext: Boolean = iter.hasNext
     }
   }
-  override def all(): KeyValueIterator[Array[Byte], Array[Byte]] = {
+
+  def all(): KeyValueIterator[Array[Byte], Array[Byte]] = {
     metrics.alls.inc
     getIter(underlying)
   }
 
-  override def range(from: Array[Byte], to: Array[Byte]): 
KeyValueIterator[Array[Byte], Array[Byte]] = {
+  def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], 
Array[Byte]] = {
     metrics.ranges.inc
     require(from != null && to != null, "Null bound not allowed.")
     getIter(underlying.subMap(from, to))
   }
 
-  override def delete(key: Array[Byte]): Unit = {
+  def delete(key: Array[Byte]): Unit = {
     metrics.deletes.inc
     put(key, null)
   }
 
-  override def putAll(entries: util.List[Entry[Array[Byte], Array[Byte]]]): 
Unit = {
+  def deleteAll(keys: java.util.List[Array[Byte]]) = {
+    KeyValueStore.Extension.deleteAll(this, keys);
+  }
+
+  def putAll(entries: util.List[Entry[Array[Byte], Array[Byte]]]): Unit = {
     // TreeMap's putAll requires a map, so we'd need to iterate over all the 
entries anyway
     // to use it, in order to putAll here.  Therefore, just iterate here.
     val iter = entries.iterator()
@@ -90,7 +95,7 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics 
= new KeyValueStor
     }
   }
 
-  override def put(key: Array[Byte], value: Array[Byte]): Unit = {
+  def put(key: Array[Byte], value: Array[Byte]): Unit = {
     metrics.puts.inc
     require(key != null, "Null key not allowed.")
     if (value == null) {
@@ -102,7 +107,7 @@ class InMemoryKeyValueStore(val metrics: 
KeyValueStoreMetrics = new KeyValueStor
     }
   }
 
-  override def get(key: Array[Byte]): Array[Byte] = {
+  def get(key: Array[Byte]): Array[Byte] = {
     metrics.gets.inc
     require(key != null, "Null key not allowed.")
     val found = underlying.get(key)
@@ -111,5 +116,8 @@ class InMemoryKeyValueStore(val metrics: 
KeyValueStoreMetrics = new KeyValueStor
     }
     found
   }
-}
 
+  def getAll(keys: java.util.List[Array[Byte]]): java.util.Map[Array[Byte], 
Array[Byte]] = {
+    KeyValueStore.Extension.getAll(this, keys);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
 
b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 66c2a0d..1b44a51 100644
--- 
a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ 
b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -94,6 +94,24 @@ class RocksDbKeyValueStore(
     found
   }
 
+  def getAll(keys: java.util.List[Array[Byte]]): java.util.Map[Array[Byte], 
Array[Byte]] = {
+    metrics.getAlls.inc
+    require(keys != null, "Null keys not allowed.")
+    val map = db.multiGet(keys)
+    if (map != null) {
+      var bytesRead = 0L
+      val iterator = map.values().iterator
+      while (iterator.hasNext) {
+        val value = iterator.next
+        if (value != null) {
+          bytesRead += value.size
+        }
+      }
+      metrics.bytesRead.inc(bytesRead)
+    }
+    map
+  }
+
   def put(key: Array[Byte], value: Array[Byte]) {
     metrics.puts.inc
     require(key != null, "Null key not allowed.")
@@ -134,6 +152,10 @@ class RocksDbKeyValueStore(
     put(key, null)
   }
 
+  def deleteAll(keys: java.util.List[Array[Byte]]) = {
+    KeyValueStore.Extension.deleteAll(this, keys)
+  }
+
   def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], 
Array[Byte]] = {
     metrics.ranges.inc
     require(from != null && to != null, "Null bound not allowed.")
@@ -203,7 +225,7 @@ class RocksDbKeyValueStore(
 
     override def finalize() {
       if (open) {
-        trace("Leaked reference to level db iterator, forcing close.")
+        trace("Leaked reference to RocksDB iterator, forcing close.")
         close()
       }
     }
@@ -219,4 +241,4 @@ class RocksDbKeyValueStore(
       super.hasNext() && comparator.compare(peekKey(), to) < 0
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java 
b/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
index b708341..1278e23 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
@@ -19,63 +19,145 @@
 
 package org.apache.samza.storage.kv;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
- * A key-value store that supports put/get/delete and range queries.
- * 
- * @param <K> The key type
- * @param <V> The value type
+ * A key-value store that supports put, get, delete, and range queries.
+ *
+ * @param <K> the type of keys maintained by this key-value store.
+ * @param <V> the type of values maintained by this key-value store.
  */
 public interface KeyValueStore<K, V> {
-  
   /**
-   * Get the value corresponding to this key
-   * @param key The key to fetch
-   * @return The value or null if no value is found.
-   * @throws NullPointerException If null is used for key.
+   * Gets the value associated with the specified {@code key}.
+   *
+   * @param key the key with which the associated value is to be fetched.
+   * @return if found, the value associated with the specified {@code key}; 
otherwise, {@code null}.
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   */
+  V get(K key);
+
+  /**
+   * Gets the values with which the specified {@code keys} are associated.
+   *
+   * @param keys the keys with which the associated values are to be fetched.
+   * @return a map of the keys that were found and their respective values.
+   * @throws NullPointerException if the specified {@code keys} list, or any 
of the keys, is {@code null}.
+   */
+  Map<K, V> getAll(List<K> keys);
+
+  /**
+   * Updates the mapping of the specified key-value pair; Associates the 
specified {@code key} with the specified {@code value}.
+   *
+   * @param key the key with which the specified {@code value} is to be 
associated.
+   * @param value the value with which the specified {@code key} is to be 
associated.
+   * @throws NullPointerException if the specified {@code key} or {@code 
value} is {@code null}.
+   */
+  void put(K key, V value);
+
+  /**
+   * Updates the mappings of the specified key-value {@code entries}.
+   *
+   * @param entries the updated mappings to put into this key-value store.
+   * @throws NullPointerException if any of the specified {@code entries} has 
{@code null} as key or value.
+   */
+  void putAll(List<Entry<K, V>> entries);
+
+  /**
+   * Deletes the mapping for the specified {@code key} from this key-value 
store (if such mapping exists).
+   *
+   * @param key the key for which the mapping is to be deleted.
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
    */
-  public V get(K key);
-  
+  void delete(K key);
+
   /**
-   * Update the value associated with this key
-   * @param key They key to associate the value to
-   * @param value The value
-   * @throws NullPointerException If null is used for key or value.
+   * Deletes the mappings for the specified {@code keys} from this key-value 
store (if such mappings exist).
+   *
+   * @param keys the keys for which the mappings are to be deleted.
+   * @throws NullPointerException if the specified {@code keys} list, or any 
of the keys, is {@code null}.
    */
-  public void put(K key, V value);
-  
+  void deleteAll(List<K> keys);
+
   /**
-   * Update all the given key/value pairs
-   * @param entries A list of entries to put into the store.
-   * @throws NullPointerException If null is used for any key or value.
+   * Returns an iterator for a range of entries specified by [{@code from}, 
{@code to}), i.e. {@code from} inclusive and {@code to} exclusive.
+   *
+   * <p><b>API Note:</b> The returned iterator MUST be closed after use. The 
comparator used for finding entries that belong to the specified
+   * range compares the underlying serialized big-endian byte array 
representation of keys, lexicographically.
+   * @see <a 
href="http://en.wikipedia.org/wiki/Lexicographical_order">Lexicographical order 
article at Wikipedia</a></p>
+   * @param from the key specifying the low endpoint (inclusive) of the keys 
in the returned range.
+   * @param to the key specifying the high endpoint (exclusive) of the keys in 
the returned range.
+   * @return an iterator for the specified key range.
+   * @throws NullPointerException if null is used for {@code from} or {@code 
to}.
    */
-  public void putAll(List<Entry<K,V>> entries);
-  
+  KeyValueIterator<K, V> range(K from, K to);
+
   /**
-   * Delete the value from the store (if there is one)
-   * @param key The key
-   * @throws NullPointerException If null is used for key.
+   * Returns an iterator for all entries in this key-value store.
+   *
+   * <p><b>API Note:</b> The returned iterator MUST be closed after use.</p>
+   * @return an iterator for all entries in this key-value store.
    */
-  public void delete(K key);
-  
+  KeyValueIterator<K, V> all();
+
   /**
-   * Get an iterator over a given range of keys. This iterator MUST be closed 
after use.
-   * @param from The first key that could be in the range
-   * @param to The last key that could be in the range
-   * @return The iterator for this range.
-   * @throws NullPointerException If null is used for from or to.
+   * Closes this key-value store, if applicable, relinquishing any underlying 
resources.
    */
-  public KeyValueIterator<K,V> range(K from, K to);
-  
+  void close();
+
   /**
-   * Return an iterator over all keys in the database. This iterator MUST be 
closed after use. 
-   * @return An iterator of all key/value pairs in the store.
+   * Flushes this key-value store, if applicable.
    */
-  public KeyValueIterator<K,V> all();
-  
-  public void close();
-  
-  public void flush();
-  
+  void flush();
+
+  /**
+   * Represents an extension for classes that implement {@link KeyValueStore}.
+   */
+  // TODO replace with default interface methods when we can use Java 8 
features.
+  class Extension {
+    private Extension() {
+      // This class cannot be instantiated
+    }
+
+    /**
+     * Gets the values with which the specified {@code keys} are associated.
+     *
+     * @param store the key-value store for which this operation is to be 
performed.
+     * @param keys the keys with which the associated values are to be fetched.
+     * @param <K> the type of keys maintained by the specified {@code store}.
+     * @param <V> the type of values maintained by the specified {@code store}.
+     * @return a map of the keys that were found and their respective values.
+     * @throws NullPointerException if the specified {@code keys} list, or any 
of the keys, is {@code null}.
+     */
+    public static <K, V> Map<K, V> getAll(final KeyValueStore<K, V> store, 
final List<K> keys) {
+      final Map<K, V> map = new HashMap<>(keys.size());
+
+      for (final K key : keys) {
+        final V value = store.get(key);
+
+        if (value != null) {
+          map.put(key, value);
+        }
+      }
+
+      return map;
+    }
+
+    /**
+     * Deletes the mappings for the specified {@code keys} from this key-value 
store (if such mappings exist).
+     *
+     * @param store the key-value store for which this operation is to be 
performed.
+     * @param keys the keys for which the mappings are to be deleted.
+     * @param <K> the type of keys maintained by the specified {@code store}.
+     * @param <V> the type of values maintained by the specified {@code store}.
+     * @throws NullPointerException if the specified {@code keys} list, or any 
of the keys, is {@code null}.
+     */
+    public static <K, V> void deleteAll(final KeyValueStore<K, V> store, final 
List<K> keys) {
+      for (final K key : keys) {
+        store.delete(key);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 61bb3f6..479016d 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -97,6 +97,33 @@ class CachedStore[K, V](
     }
   }
 
+  def getAll(keys: java.util.List[K]): java.util.Map[K, V] = {
+    metrics.gets.inc(keys.size)
+    val returnValue = new java.util.HashMap[K, V](keys.size)
+    val misses = new java.util.ArrayList[K]
+    val keysIterator = keys.iterator
+    while (keysIterator.hasNext) {
+      val key = keysIterator.next
+      val cached = cache.get(key)
+      if (cached != null) {
+        metrics.cacheHits.inc
+        returnValue.put(key, cached.value)
+      } else {
+        misses.add(key)
+      }
+    }
+    if (!misses.isEmpty) {
+      val entryIterator = store.getAll(misses).entrySet.iterator
+      while (entryIterator.hasNext) {
+        val entry = entryIterator.next
+        returnValue.put(entry.getKey, entry.getValue)
+        cache.put(entry.getKey, new CacheEntry(entry.getValue, null))
+      }
+      cacheCount = cache.size // update outside the loop since it's used for 
metrics and not for time-sensitive logic
+    }
+    returnValue
+  }
+
   def range(from: K, to: K) = {
     metrics.ranges.inc
     flush()
@@ -172,9 +199,6 @@ class CachedStore[K, V](
     this.dirtyCount = 0
   }
 
-  /**
-   * Perform multiple local updates and log out all changes to the changelog
-   */
   def putAll(entries: java.util.List[Entry[K, V]]) {
     val iter = entries.iterator
     while (iter.hasNext) {
@@ -183,15 +207,16 @@ class CachedStore[K, V](
     }
   }
 
-  /**
-   * Perform the local delete and log it out to the changelog
-   */
   def delete(key: K) {
     metrics.deletes.inc
 
     put(key, null.asInstanceOf[V])
   }
 
+  def deleteAll(keys: java.util.List[K]) = {
+    KeyValueStore.Extension.deleteAll(this, keys);
+  }
+
   def close() {
     trace("Closing.")
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index 3a23daf..fc677b2 100644
--- 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -44,6 +44,11 @@ class KeyValueStorageEngine[K, V](
     db.get(key)
   }
 
+  def getAll(keys: java.util.List[K]): java.util.Map[K, V] = {
+    metrics.gets.inc(keys.size)
+    db.getAll(keys)
+  }
+
   def put(key: K, value: V) = {
     metrics.puts.inc
     db.put(key, value)
@@ -59,6 +64,11 @@ class KeyValueStorageEngine[K, V](
     db.delete(key)
   }
 
+  def deleteAll(keys: java.util.List[K]) = {
+    metrics.deletes.inc(keys.size)
+    db.deleteAll(keys)
+  }
+
   def range(from: K, to: K) = {
     metrics.ranges.inc
     db.range(from, to)

http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala
 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala
index 79092b9..967d509 100644
--- 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala
+++ 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala
@@ -26,13 +26,15 @@ class KeyValueStoreMetrics(
   val registry: MetricsRegistry = new MetricsRegistryMap) extends 
MetricsHelper {
 
   val gets = newCounter("gets")
+  val getAlls = newCounter("getAlls")
   val ranges = newCounter("ranges")
   val alls = newCounter("alls")
   val puts = newCounter("puts")
   val deletes = newCounter("deletes")
+  val deleteAlls = newCounter("deleteAlls")
   val flushes = newCounter("flushes")
   val bytesWritten = newCounter("bytes-written")
   val bytesRead = newCounter("bytes-read")
 
   override def getPrefix = storeName + "-"
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
index 26f4cd9..7bba6ff 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
@@ -41,6 +41,11 @@ class LoggedStore[K, V](
     store.get(key)
   }
 
+  def getAll(keys: java.util.List[K]): java.util.Map[K, V] = {
+    metrics.gets.inc(keys.size)
+    store.getAll(keys)
+  }
+
   def range(from: K, to: K) = {
     metrics.ranges.inc
     store.range(from, to)
@@ -82,6 +87,18 @@ class LoggedStore[K, V](
     collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, key, 
null))
   }
 
+  /**
+   * Perform the local deletes and log them out to the changelog
+   */
+  def deleteAll(keys: java.util.List[K]) = {
+    metrics.deletes.inc(keys.size)
+    store.deleteAll(keys)
+    val keysIterator = keys.iterator
+    while (keysIterator.hasNext) {
+      collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, 
keysIterator.next, null))
+    }
+  }
+
   def flush {
     trace("Flushing.")
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
index 4f48cf4..3de257c 100644
--- 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
+++ 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
@@ -24,40 +24,53 @@ import org.apache.samza.util.Util.notNull
 import scala.collection.JavaConversions._
 
 object NullSafeKeyValueStore {
-  val KEY_ERROR_MSG = "Null is not a valid key."
-  val VAL_ERROR_MSG = "Null is not a valid value."
+  val NullKeyErrorMessage = "Null is not a valid key."
+  val NullKeysErrorMessage = "Null is not a valid keys list."
+  val NullValueErrorMessage = "Null is not a valid value."
 }
 
 class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends 
KeyValueStore[K, V] {
   import NullSafeKeyValueStore._
 
   def get(key: K): V = {
-    notNull(key, KEY_ERROR_MSG)
+    notNull(key, NullKeyErrorMessage)
     store.get(key)
   }
 
+  def getAll(keys: java.util.List[K]): java.util.Map[K, V] = {
+    notNull(keys, NullKeysErrorMessage)
+    keys.foreach(key => notNull(key, NullKeyErrorMessage))
+    store.getAll(keys)
+  }
+
   def put(key: K, value: V) {
-    notNull(key, KEY_ERROR_MSG)
-    notNull(value, VAL_ERROR_MSG)
+    notNull(key, NullKeyErrorMessage)
+    notNull(value, NullValueErrorMessage)
     store.put(key, value)
   }
 
   def putAll(entries: java.util.List[Entry[K, V]]) {
     entries.foreach(entry => {
-      notNull(entry.getKey, KEY_ERROR_MSG)
-      notNull(entry.getValue, VAL_ERROR_MSG)
+      notNull(entry.getKey, NullKeyErrorMessage)
+      notNull(entry.getValue, NullValueErrorMessage)
     })
     store.putAll(entries)
   }
 
   def delete(key: K) {
-    notNull(key, KEY_ERROR_MSG)
+    notNull(key, NullKeyErrorMessage)
     store.delete(key)
   }
 
+  def deleteAll(keys: java.util.List[K]) = {
+    notNull(keys, NullKeysErrorMessage)
+    keys.foreach(key => notNull(key, NullKeyErrorMessage))
+    store.deleteAll(keys)
+  }
+
   def range(from: K, to: K): KeyValueIterator[K, V] = {
-    notNull(from, KEY_ERROR_MSG)
-    notNull(to, KEY_ERROR_MSG)
+    notNull(from, NullKeyErrorMessage)
+    notNull(to, NullKeyErrorMessage)
     store.range(from, to)
   }
 
@@ -72,4 +85,4 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) 
extends KeyValueSt
   def close {
     store.close
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index 531e8be..8e183ef 100644
--- 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -38,6 +38,22 @@ class SerializedKeyValueStore[K, V](
     fromBytesOrNull(found, msgSerde)
   }
 
+  def getAll(keys: java.util.List[K]): java.util.Map[K, V] = {
+    metrics.gets.inc(keys.size)
+    val mapBytes = store.getAll(serializeKeys(keys))
+    if (mapBytes != null) {
+      val map = new java.util.HashMap[K, V](mapBytes.size)
+      val entryIterator = mapBytes.entrySet.iterator
+      while (entryIterator.hasNext) {
+        val entry = entryIterator.next
+        map.put(fromBytesOrNull(entry.getKey, keySerde), 
fromBytesOrNull(entry.getValue, msgSerde))
+      }
+      map
+    } else {
+      null.asInstanceOf[java.util.Map[K, V]]
+    }
+  }
+
   def put(key: K, value: V) {
     metrics.puts.inc
     val keyBytes = toBytesOrNull(key, keySerde)
@@ -64,6 +80,11 @@ class SerializedKeyValueStore[K, V](
     store.delete(keyBytes)
   }
 
+  def deleteAll(keys: java.util.List[K]) = {
+    metrics.deletes.inc(keys.size)
+    store.deleteAll(serializeKeys(keys))
+  }
+
   def range(from: K, to: K): KeyValueIterator[K, V] = {
     metrics.ranges.inc
     val fromBytes = toBytesOrNull(from, keySerde)
@@ -102,7 +123,7 @@ class SerializedKeyValueStore[K, V](
     store.close
   }
 
-  def toBytesOrNull[T](t: T, serde: Serde[T]): Array[Byte] = if (t == null) {
+  private def toBytesOrNull[T](t: T, serde: Serde[T]): Array[Byte] = if (t == 
null) {
     null
   } else {
     val bytes = serde.toBytes(t)
@@ -110,11 +131,20 @@ class SerializedKeyValueStore[K, V](
     bytes
   }
 
-  def fromBytesOrNull[T](bytes: Array[Byte], serde: Serde[T]): T = if (bytes 
== null) {
+  private def fromBytesOrNull[T](bytes: Array[Byte], serde: Serde[T]): T = if 
(bytes == null) {
     null.asInstanceOf[T]
   } else {
     val obj = serde.fromBytes(bytes)
     metrics.bytesDeserialized.inc(bytes.size)
     obj
   }
+
+  private def serializeKeys(keys: java.util.List[K]): 
java.util.List[Array[Byte]] = {
+    val bytes = new java.util.ArrayList[Array[Byte]](keys.size)
+    val keysIterator = keys.iterator
+    while (keysIterator.hasNext) {
+      bytes.add(toBytesOrNull(keysIterator.next, keySerde))
+    }
+    bytes
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-test/src/main/config/perf/kv-perf.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/perf/kv-perf.properties 
b/samza-test/src/main/config/perf/kv-perf.properties
index 33fcd8d..7339052 100644
--- a/samza-test/src/main/config/perf/kv-perf.properties
+++ b/samza-test/src/main/config/perf/kv-perf.properties
@@ -37,5 +37,19 @@ test.rocksdb-write-performance.set-2.message.count=1000000
 test.rocksdb-write-performance.set-3.message.size=1024
 test.rocksdb-write-performance.set-3.message.count=1000000
 
+# Config for get-all-vs-get
+test.get-all-vs-get-write-many-read-many.stores.test-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+# Disable caching
+test.get-all-vs-get-write-many-read-many.stores.test-store.object.cache.size=0
+test.get-all-vs-get-write-many-read-many.partition.count=4
+test.get-all-vs-get-write-many-read-many.set.count=1
+
+# Config for get-all-vs-get-write-once-read-many
+test.get-all-vs-get-write-once-read-many.stores.test-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+# Disable caching
+test.get-all-vs-get-write-once-read-many.stores.test-store.object.cache.size=0
+test.get-all-vs-get-write-once-read-many.partition.count=4
+test.get-all-vs-get-write-once-read-many.set.count=3
+
 # List of tests to execute
-test.methods=rocksdb-write-performance
+test.methods=rocksdb-write-performance,get-all-vs-get-write-many-read-many,get-all-vs-get-write-once-read-many

http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
 
b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index 0858b98..1ce7d25 100644
--- 
a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ 
b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -19,25 +19,26 @@
 
 package org.apache.samza.test.performance
 
-import org.apache.samza.util.Logging
+import java.io.File
+import java.util
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+
+import com.google.common.base.Stopwatch
 import org.apache.samza.config.Config
 import org.apache.samza.config.StorageConfig._
-import org.apache.samza.container.{TaskName, SamzaContainerContext}
+import org.apache.samza.container.{SamzaContainerContext, TaskName}
 import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.storage.kv.KeyValueStore
-import org.apache.samza.storage.kv.KeyValueStorageEngine
+import org.apache.samza.serializers.{ByteSerde, SerdeManager, UUIDSerde}
 import org.apache.samza.storage.StorageEngineFactory
-import org.apache.samza.util.CommandLine
-import org.apache.samza.util.Util
-import org.apache.samza.serializers.{StringSerde, ByteSerde, SerdeManager}
-import org.apache.samza.Partition
-import org.apache.samza.SamzaException
+import org.apache.samza.storage.kv.{KeyValueStorageEngine, KeyValueStore}
+import org.apache.samza.system.{SystemProducer, SystemProducers}
 import org.apache.samza.task.TaskInstanceCollector
-import org.apache.samza.system.SystemProducers
-import org.apache.samza.system.SystemProducer
-import java.io.File
-import java.util.UUID
-import java.util
+import org.apache.samza.util.{CommandLine, Logging, Util}
+import org.apache.samza.{Partition, SamzaException}
+
+import scala.collection.JavaConversions._
+import scala.util.Random
 
 /**
  * A simple CLI-based tool for running various key-value performance tests.
@@ -60,14 +61,15 @@ object TestKeyValuePerformance extends Logging {
 
   val testMethods: Map[String, (KeyValueStorageEngine[Array[Byte], 
Array[Byte]], Config) => Unit] = Map(
     "all-with-deletes" -> runTestAllWithDeletes,
-    "rocksdb-write-performance" -> runTestMsgWritePerformance
-  )
+    "rocksdb-write-performance" -> runTestMsgWritePerformance,
+    "get-all-vs-get-write-many-read-many" -> 
runTestGetAllVsGetWriteManyReadMany,
+    "get-all-vs-get-write-once-read-many" -> 
runTestGetAllVsGetWriteOnceReadMany)
 
   def main(args: Array[String]) {
     val cmdline = new CommandLine
     val options = cmdline.parser.parse(args: _*)
     val config = cmdline.loadConfig(options)
-    val tests = config.get("test.methods", 
"rocksdb-write-performance,all-with-deletes").split(",")
+    val tests = config.get("test.methods").split(",")
 
     tests.foreach{ test =>
       info("Running test: %s" format test)
@@ -81,7 +83,7 @@ object TestKeyValuePerformance extends Logging {
   }
 
   def invokeTest(testName: String, testMethod: (KeyValueStorageEngine[Array[Byte], Array[Byte]], Config) => Unit, config: Config) {
-    val taskNames = new util.ArrayList[TaskName]()
+    val taskNames = new java.util.ArrayList[TaskName]()
     val partitionCount = config.getInt("partition.count", 1)
     (0 until partitionCount).map(p => taskNames.add(new TaskName(new Partition(p).toString)))
 
@@ -139,7 +141,6 @@ object TestKeyValuePerformance extends Logging {
 
     info("Using (num loops, messages per batch, message size in bytes) => (%s, %s, %s)" format (numLoops, messagesPerBatch, messageSizeBytes))
     new TestKeyValuePerformance().testAllWithDeletes(db, numLoops, messagesPerBatch, messageSizeBytes)
-
   }
 
   def runTestMsgWritePerformance(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config) {
@@ -150,6 +151,13 @@ object TestKeyValuePerformance extends Logging {
     new TestKeyValuePerformance().testMsgWritePerformance(db, messageCount, messageSizeBytes)
   }
 
+  def runTestGetAllVsGetWriteManyReadMany(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config) {
+    new TestKeyValuePerformance().testGetAllVsGetWriteManyReadMany(db, config)
+  }
+
+  def runTestGetAllVsGetWriteOnceReadMany(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config) {
+    new TestKeyValuePerformance().testGetAllVsGetWriteOnceReadMany(db, config)
+  }
 }
 
 class TestKeyValuePerformance extends Logging {
@@ -200,7 +208,6 @@ class TestKeyValuePerformance extends Logging {
     info("Total time: %ss" format ((System.currentTimeMillis - start) * .001))
   }
 
-
   /**
    * Test that successively writes a set of fixed-size messages to the KV store
    * and computes the total time for the operations
@@ -222,4 +229,107 @@ class TestKeyValuePerformance extends Logging {
     val timeTaken = System.currentTimeMillis - start
     info("Total time to write %d msgs of size %d bytes : %s s" format (numMsgs, msgSizeInBytes, timeTaken * .001))
   }
-}
\ No newline at end of file
+
+  /**
+   * Test that ::getAll performance is better than that of ::get (test when there are many writes and many reads).
+   * @param store key-value store instance that is being tested
+   * @param config the test case's config
+   */
+  def testGetAllVsGetWriteManyReadMany(store: KeyValueStore[Array[Byte],Array[Byte]], config: Config): Unit = {
+    val iterationsCount = config.getInt("iterations.count", 100)
+    val maxMessagesCountPerBatch = config.getInt("message.max-count-per-batch", 100000)
+    val maxMessageSizeBytes = config.getInt("message.max-size.bytes", 1024)
+    val timer = Stopwatch.createUnstarted
+    val uuidSerde = new UUIDSerde
+
+    info("iterations count: " + iterationsCount)
+    info("max messages count per batch: " + maxMessagesCountPerBatch)
+    info("max message size in bytes: " + maxMessageSizeBytes)
+    info("%12s%12s%12s%12s".format("Msg Count", "Bytes/Msg", "get ms", "getAll ms"))
+
+    try {
+      (0 until iterationsCount).foreach(i => {
+        val messageSizeBytes = Random.nextInt(maxMessageSizeBytes)
+        val messagesCountPerBatch = Random.nextInt(maxMessagesCountPerBatch)
+        val keys = (0 until messagesCountPerBatch).map(k => uuidSerde.toBytes(UUID.randomUUID)).toList
+        val shuffledKeys = Random.shuffle(keys) // to reduce locality of reference -- sequential access may be unfair
+
+        keys.foreach(k => store.put(k, Random.nextString(messageSizeBytes).getBytes(Encoding)))
+        store.flush()
+
+        timer.reset().start()
+        assert(store.getAll(shuffledKeys).size == shuffledKeys.size)
+        val getAllTime = timer.stop().elapsed(TimeUnit.MILLISECONDS)
+
+        // Restore cache, in case it's enabled, to a state similar to the one above when the getAll test started
+        keys.foreach(k => store.put(k, Random.nextString(messageSizeBytes).getBytes(Encoding)))
+        store.flush()
+
+        timer.reset().start()
+        shuffledKeys.foreach(store.get)
+        val getTime = timer.stop().elapsed(TimeUnit.MILLISECONDS)
+
+        info("%12d%12d%12d%12d".format(messagesCountPerBatch, messageSizeBytes, getTime, getAllTime))
+        if (getAllTime > getTime) {
+          error("getAll was slower than get!")
+        }
+      })
+    } finally {
+      store.close()
+    }
+  }
+
+  /**
+   * Test that ::getAll performance is better than that of ::get (test when data are written once and read many times);
+   * load is usually greater than the storage engine's cache size (not to be confused with Samza's cache layer),
+   * and keys are randomly selected from the stored entries to perform a fair comparison of ::get vs. ::getAll (in case
+   * the underlying storage engine caches data in blocks and ::getAll causes a block to be loaded into the cache --
+   * one can argue that ::get should trigger the same behavior, but it's worth testing this WORM scenario regardless)
+   * @param store key-value store instance that is being tested
+   * @param config the test case's config
+   */
+  def testGetAllVsGetWriteOnceReadMany(store: KeyValueStore[Array[Byte],Array[Byte]], config: Config): Unit = {
+    val iterationsCount = config.getInt("iterations.count", 100)
+    val maxMessagesCountPerBatch = config.getInt("message.max-count-per-batch", 10000 + Random.nextInt(20000))
+    val maxMessageSizeBytes = config.getInt("message.max-size.bytes", 1024)
+    val totalMessagesCount = iterationsCount * maxMessagesCountPerBatch
+    val timer = Stopwatch.createUnstarted
+    val uuidSerde = new UUIDSerde
+
+    info("write once -- putting %d messages in store".format(totalMessagesCount))
+    val keys = (0 until totalMessagesCount).map(k => uuidSerde.toBytes(UUID.randomUUID)).toList
+    keys.foreach(k => store.put(k, Random.nextString(Random.nextInt(maxMessageSizeBytes)).getBytes(Encoding)))
+    store.flush()
+
+    info("iterations count: " + iterationsCount)
+    info("max messages count per batch: " + maxMessagesCountPerBatch)
+    info("max message size in bytes: " + maxMessageSizeBytes)
+    info("%12s%12s%12s%12s".format("Msg Count", "Total Size", "get ms", "getAll ms"))
+
+    try {
+      (0 until iterationsCount).foreach(i => {
+        val messagesCountPerBatch = Random.nextInt(maxMessagesCountPerBatch)
+        val shuffledKeys = Random.shuffle(keys).take(messagesCountPerBatch)
+
+        // We want to measure ::getAll when called many times, so populate the cache because first call is a cache-miss
+        val totalSize = store.getAll(shuffledKeys).values.map(_.length).sum
+        timer.reset().start()
+        assert(store.getAll(shuffledKeys).size == shuffledKeys.size)
+        val getAllTime = timer.stop().elapsed(TimeUnit.MILLISECONDS)
+
+        // We want to measure ::get when called many times, so populate the cache because first call is a cache-miss
+        shuffledKeys.foreach(store.get)
+        timer.reset().start()
+        shuffledKeys.foreach(store.get)
+        val getTime = timer.stop().elapsed(TimeUnit.MILLISECONDS)
+
+        info("%12d%12d%12d%12d".format(messagesCountPerBatch, totalSize, getTime, getAllTime))
+        if (getAllTime > getTime) {
+          error("getAll was slower than get!")
+        }
+      })
+    } finally {
+      store.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9f30ef10/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
index 50dfc10..9dee7be 100644
--- a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
+++ b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
@@ -97,28 +97,48 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
   }
 
   @Test
-  def getNonExistantIsNull() {
+  def getNonExistentIsNull() {
     assertNull(store.get(b("hello")))
   }
 
   @Test
-  def putAndGet() {
-    store.put(b("k"), b("v"))
-    assertArrayEquals(b("v"), store.get(b("k")))
+  def testGetAllWhenZeroMatch() {
+    store.put(b("hello"), b("world"))
+    val keys = List(b("foo"), b("bar"))
+    val actual = store.getAll(keys)
+    keys.foreach(k => assertNull("Key: " + k, actual.get(k)))
   }
 
   @Test
-  def putStessTest() {
-    for( a <- 0 to 1900000){
-        store.put(b(a+"k"), b("v"))
-      }
+  def testGetAllWhenFullMatch() {
+    val expected = Map(b("k0") -> b("v0"), b("k1") -> b("v1"))
+    expected.foreach(e => store.put(e._1, e._2))
+    val actual = store.getAll(expected.keys.toList)
+    assertEquals("Size", expected.size, actual.size)
+    expected.foreach(e => assertArrayEquals("Value at: " + s(e._1), e._2, actual.get(e._1)))
+  }
+
+  @Test
+  def testGetAllWhenPartialMatch() {
+    val all = Map(b("k0") -> b("v0"), b("k1") -> b("v1"), b("k2") -> b("v2"))
+    val found = all.entrySet.head
+    val notFound = all.entrySet.last
+    store.put(found.getKey, found.getValue)
+    val actual = store.getAll(List(notFound.getKey, found.getKey))
+    assertNull(actual.get(notFound.getKey))
+    assertArrayEquals(found.getValue, actual.get(found.getKey))
+  }
+
+  @Test
+  def putAndGet() {
+    store.put(b("k"), b("v"))
+    assertArrayEquals(b("v"), store.get(b("k")))
   }
 
   @Test
   def doublePutAndGet() {
     val k = b("k2")
     store.put(k, b("v1"))
-    store.put(k, b("v2"))
     store.put(k, b("v3"))
     assertArrayEquals(b("v3"), store.get(k))
   }
@@ -127,11 +147,13 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
   def testNullsWithSerde() {
     if (serde) {
       val a = b("a")
-      val keyMsg = Some(NullSafeKeyValueStore.KEY_ERROR_MSG)
-      val valMsg = Some(NullSafeKeyValueStore.VAL_ERROR_MSG)
 
       intercept[NullPointerException] { store.get(null) }
+      intercept[NullPointerException] { store.getAll(null) }
+      intercept[NullPointerException] { store.getAll(List(a, null)) }
       intercept[NullPointerException] { store.delete(null) }
+      intercept[NullPointerException] { store.deleteAll(null) }
+      intercept[NullPointerException] { store.deleteAll(List(a, null)) }
       intercept[NullPointerException] { store.put(null, a) }
       intercept[NullPointerException] { store.put(a, null) }
       intercept[NullPointerException] { store.putAll(List(new Entry(a, a), new Entry[Array[Byte], Array[Byte]](a, null))) }
@@ -190,6 +212,41 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
   }
 
   @Test
+  def testDeleteAllWhenZeroMatch() {
+    val foo = b("foo")
+    store.put(foo, foo)
+    store.deleteAll(List(b("bar")))
+    assertArrayEquals(foo, store.get(foo))
+  }
+
+  @Test
+  def testDeleteAllWhenFullMatch() {
+    val all = Map(b("k0") -> b("v0"), b("k1") -> b("v1"))
+    all.foreach(e => store.put(e._1, e._2))
+    assertEquals(all.size, store.getAll(all.keys.toList).size)
+    store.deleteAll(all.keys.toList)
+    all.keys.foreach(key => assertNull("Value at: " + s(key), store.get(key)))
+  }
+
+  @Test
+  def testDeleteAllWhenPartialMatch() {
+    val all = Map(b("k0") -> b("v0"), b("k1") -> b("v1"))
+    val found = all.entrySet.head
+    val leftAlone = all.entrySet.last
+    all.foreach(e => store.put(e._1, e._2))
+    assertArrayEquals(found.getValue, store.get(found.getKey))
+    store.deleteAll(List(b("not found"), found.getKey))
+    store.flush()
+    val allIterator = store.all
+    try {
+      assertEquals(1, allIterator.size)
+      assertArrayEquals(leftAlone.getValue, store.get(leftAlone.getKey))
+    } finally {
+      allIterator.close()
+    }
+  }
+
+  @Test
   def testSimpleScenario() {
     val vals = letters.map(b(_))
     for (v <- vals) {
@@ -342,8 +399,8 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
 }
 
 object TestKeyValueStores {
-  val CacheSize = 1000000
-  val BatchSize = 1000000
+  val CacheSize = 1024
+  val BatchSize = 1024
   @Parameters
   def parameters: java.util.Collection[Array[String]] = Arrays.asList(
       //Inmemory

Reply via email to