Repository: samza Updated Branches: refs/heads/master 2e461a880 -> fa49e7228
SAMZA-1691: Support get iterable from KeyValueStore Right now for KeyValueStore we have a range query to return an iterator. For usage in BEAM, we need a iterable which will 1) create the snapshot when called, and 2) create an iterator when needed. Add the iterate() function in KeyValueStore to support it. It's implemented as follows: 1) for rocksDb, it will create the iterator when it's called, which will has a snapshot of the elements. Then every time when the iterator is needed, we will seek the iterator from beginning; 2) for inMemoryDb, it will create the snapshot submap when iterate() is called. The submap is an iterable and it can return a new iterator when needed. Author: xinyuiscool <[email protected]> Reviewers: Boris S <[email protected]> Closes #492 from xinyuiscool/SAMZA-1691 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fa49e722 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fa49e722 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fa49e722 Branch: refs/heads/master Commit: fa49e7228319e845bb81336b97af8e28e04524c9 Parents: 2e461a8 Author: xinyuiscool <[email protected]> Authored: Mon May 7 09:51:28 2018 -0700 Committer: xiliu <[email protected]> Committed: Mon May 7 09:51:28 2018 -0700 ---------------------------------------------------------------------- .../samza/storage/kv/KeyValueIterable.java | 24 ++++++ .../apache/samza/storage/kv/KeyValueStore.java | 14 +++ .../operators/util/InternalInMemoryStore.java | 6 ++ .../operators/impl/store/TestInMemoryStore.java | 12 +++ .../kv/inmemory/InMemoryKeyValueStore.scala | 12 ++- .../kv/inmemory/TestInMemoryKeyValueStore.java | 81 ++++++++++++++++++ .../samza/storage/kv/RocksDbKeyValueStore.scala | 33 +++++++ .../kv/TestRocksDbKeyValueStoreJava.java | 90 ++++++++++++++++++++ .../samza/storage/kv/AccessLoggedStore.scala | 9 +- .../apache/samza/storage/kv/CachedStore.scala | 4 + .../storage/kv/KeyValueStorageEngine.scala | 7 ++ .../kv/KeyValueStorageEngineMetrics.scala | 2 + .../apache/samza/storage/kv/LoggedStore.scala | 3 + .../storage/kv/NullSafeKeyValueStore.scala | 6 ++ .../storage/kv/SerializedKeyValueStore.scala | 11 +++ .../samza/storage/kv/MockKeyValueStore.scala | 4 + 16 files changed, 316 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java new file mode 100644 index 0000000..8fd00ed --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.kv; + +public interface KeyValueIterable<K, V> extends Iterable<Entry<K, V>> { + KeyValueIterator<K, V> iterator(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java index 18a89ec..3f216bd 100644 --- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java +++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java @@ -111,6 +111,20 @@ public interface KeyValueStore<K, V> { KeyValueIterator<K, V> range(K from, K to); /** + * Returns an iterable for a sorted range of entries specified by [{@code from}, {@code to}). + * Note that we snapshot the iterator when the iterable is created from this function, and + * the iteration results is guaranteed to reflect the snapshot if only one iterator is in use at a time. + * + * @param from the key specifying the low endpoint (inclusive) of the keys in the returned range. + * @param to the key specifying the high endpoint (exclusive) of the keys in the returned range. + * @return an iterable for the specified key range. + * @throws NullPointerException if null is used for {@code from} or {@code to}. + */ + default KeyValueIterable<K, V> iterate(K from, K to) { + return () -> range(from, to); + } + + /** * Returns an iterator for all entries in this key-value store. * * <p><b>API Note:</b> The returned iterator MUST be closed after use.</p> http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java index b8672c6..7360474 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java +++ b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java @@ -20,6 +20,7 @@ package org.apache.samza.operators.util; import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.KeyValueIterable; import org.apache.samza.storage.kv.KeyValueIterator; import org.apache.samza.storage.kv.KeyValueStore; @@ -94,6 +95,11 @@ public class InternalInMemoryStore<K, V> implements KeyValueStore<K, V> { } @Override + public KeyValueIterable<K, V> iterate(K from, K to) { + throw new UnsupportedOperationException("iterate() is not supported in " + InternalInMemoryStore.class.getName()); + } + + @Override public KeyValueIterator<K, V> all() { final Iterator<Map.Entry<K, V>> iterator = map.entrySet().iterator(); return new KeyValueIterator<K, V>() { http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java index d16954d..e331703 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java @@ -21,6 +21,7 @@ package org.apache.samza.operators.impl.store; import com.google.common.primitives.UnsignedBytes; import org.apache.samza.serializers.Serde; import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.KeyValueIterable; import org.apache.samza.storage.kv.KeyValueIterator; import org.apache.samza.storage.kv.KeyValueStore; @@ -99,6 +100,17 @@ public class TestInMemoryStore<K, V> implements KeyValueStore<K, V> { } @Override + public KeyValueIterable<K, V> iterate(K from, K to) { + final ConcurrentNavigableMap<byte[], byte[]> values = map.subMap(keySerde.toBytes(from), keySerde.toBytes(to)); + return new KeyValueIterable<K, V>() { + @Override + public KeyValueIterator<K, V> iterator() { + return new InMemoryIterator<>(values.entrySet().iterator(), keySerde, valSerde); + } + }; + } + + @Override public KeyValueIterator<K, V> all() { return new InMemoryIterator(map.entrySet().iterator(), keySerde, valSerde); } http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala index 7b83163..decaee0 100644 --- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala +++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala @@ -20,7 +20,7 @@ package org.apache.samza.storage.kv.inmemory import com.google.common.primitives.UnsignedBytes import org.apache.samza.util.Logging -import org.apache.samza.storage.kv.{KeyValueStoreMetrics, KeyValueIterator, Entry, KeyValueStore} +import org.apache.samza.storage.kv._ import java.util /** @@ -112,4 +112,14 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor } found } + + override def iterate(from: Array[Byte], to: Array[Byte]): KeyValueIterable[Array[Byte], Array[Byte]] = { + // snapshot the iterable + val entries = underlying.subMap(from, to).entrySet() + new KeyValueIterable[Array[Byte], Array[Byte]] { + override def iterator(): KeyValueIterator[Array[Byte], Array[Byte]] = { + new InMemoryIterator(entries.iterator()) + } + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java ---------------------------------------------------------------------- diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java new file mode 100644 index 0000000..0fa5807 --- /dev/null +++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.kv.inmemory; + +import com.google.common.collect.Iterators; +import com.google.common.primitives.Ints; +import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.KeyValueIterable; +import org.apache.samza.storage.kv.KeyValueStoreMetrics; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestInMemoryKeyValueStore { + @Test + public void testIterate() throws Exception { + InMemoryKeyValueStore store = new InMemoryKeyValueStore( + new KeyValueStoreMetrics("testInMemory", new MetricsRegistryMap())); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + String prefix = "prefix"; + for(int i = 0; i < 100; i++) { + store.put(genKey(outputStream, prefix, i), genValue()); + } + + byte[] firstKey = genKey(outputStream, prefix, 0); + byte[] lastKey = genKey(outputStream, prefix, 100); + KeyValueIterable<byte[], byte[]> iterable = store.iterate(firstKey, lastKey); + // Make sure the cached Iterable won't change when new elements are added + store.put(genKey(outputStream, prefix, 200), genValue()); + assertTrue(Iterators.size(iterable.iterator()) == 100); + + List<Integer> keys = new ArrayList<>(); + for (Entry<byte[], byte[]> entry : iterable) { + int key = Ints.fromByteArray(Arrays.copyOfRange(entry.getKey(), prefix.getBytes().length, entry.getKey().length)); + keys.add(key); + } + assertEquals(keys, IntStream.rangeClosed(0, 99).boxed().collect(Collectors.toList())); + + outputStream.close(); + store.close(); + } + + private byte[] genKey(ByteArrayOutputStream outputStream, String prefix, int i) throws Exception { + outputStream.reset(); + outputStream.write(prefix.getBytes()); + outputStream.write(Ints.toByteArray(i)); + return outputStream.toByteArray(); + } + + private byte[] genValue() { + int randomVal = ThreadLocalRandom.current().nextInt(0, 100000); + return Ints.toByteArray(randomVal); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index 856cc4e..06fb584 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -20,6 +20,7 @@ package org.apache.samza.storage.kv import java.io.File +import java.util import java.util.Comparator import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantReadWriteLock @@ -203,6 +204,30 @@ class RocksDbKeyValueStore( new RocksDbIterator(iter) } + def iterate(from: Array[Byte], to: Array[Byte]): KeyValueIterable[Array[Byte], Array[Byte]] = { + //snapshot the iterator + val snapshotIter : RocksDbRangeIterator = range(from, to).asInstanceOf[RocksDbRangeIterator] + new KeyValueIterable[Array[Byte], Array[Byte]] { + var iter:RocksDbRangeIterator = null + + def iterator(): KeyValueIterator[Array[Byte], Array[Byte]] = { + this.synchronized { + if (iter == null) { + iter = snapshotIter + iter + } else if(iter.isOpen() && !iter.hasNext()) { + // use the cached iterator and reset the position to the beginning + iter.seek(from) + iter + } else { + // we need to create a new iterator since the cached one is still in use or already closed + range(from, to) + } + } + } + } + } + def flush(): Unit = ifOpen { metrics.flushes.inc trace("Flushing store: %s" format storeName) @@ -248,6 +273,10 @@ class RocksDbKeyValueStore( iter.close() } + def isOpen() = ifOpen { + open + } + override def remove() = throw new UnsupportedOperationException("RocksDB iterator doesn't support remove") override def hasNext() = ifOpen(iter.isValid) @@ -301,6 +330,10 @@ class RocksDbKeyValueStore( override def hasNext() = ifOpen { super.hasNext() && comparator.compare(peekKey(), to) < 0 } + + def seek(key: Array[Byte]) = { + iter.seek(key) + } } /** http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java new file mode 100644 index 0000000..98688c6 --- /dev/null +++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.kv; + +import com.google.common.collect.Iterators; +import com.google.common.primitives.Ints; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.metrics.MetricsRegistryMap; +import org.junit.Test; +import org.rocksdb.FlushOptions; +import org.rocksdb.Options; +import org.rocksdb.WriteOptions; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestRocksDbKeyValueStoreJava { + @Test + public void testIterate() throws Exception { + Config config = new MapConfig(); + Options options = new Options(); + options.setCreateIfMissing(true); + + File dbDir = new File(System.getProperty("java.io.tmpdir") + "/dbStore" + System.currentTimeMillis()); + RocksDbKeyValueStore store = new RocksDbKeyValueStore(dbDir, options, config, false, "dbStore", + new WriteOptions(), new FlushOptions(), new KeyValueStoreMetrics("dbStore", new MetricsRegistryMap())); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + String prefix = "prefix"; + for(int i = 0; i < 100; i++) { + store.put(genKey(outputStream, prefix, i), genValue()); + } + + byte[] firstKey = genKey(outputStream, prefix, 0); + byte[] lastKey = genKey(outputStream, prefix, 1000); + KeyValueIterable<byte[], byte[]> iterable = store.iterate(firstKey, lastKey); + // Make sure the cached Iterable won't change when new elements are added + store.put(genKey(outputStream, prefix, 200), genValue()); + assertTrue(Iterators.size(iterable.iterator()) == 100); + + List<Integer> keys = new ArrayList<>(); + for (Entry<byte[], byte[]> entry : iterable) { + int key = Ints.fromByteArray(Arrays.copyOfRange(entry.getKey(), prefix.getBytes().length, entry.getKey().length)); + keys.add(key); + } + assertEquals(keys, IntStream.rangeClosed(0, 99).boxed().collect(Collectors.toList())); + + outputStream.close(); + store.close(); + } + + private byte[] genKey(ByteArrayOutputStream outputStream, String prefix, int i) throws Exception { + outputStream.reset(); + outputStream.write(prefix.getBytes()); + outputStream.write(Ints.toByteArray(i)); + return outputStream.toByteArray(); + } + + private byte[] genValue() { + int randomVal = ThreadLocalRandom.current().nextInt(0, 100000); + return Ints.toByteArray(randomVal); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala index 879a144..67fd011 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala @@ -41,6 +41,7 @@ class AccessLoggedStore[K, V]( val WRITE = 2 val DELETE = 3 val RANGE = 4 + val ITERATE = 5 } val streamName = storageConfig.getAccessLogStream(changelogSystemStreamPartition.getSystemStream.getStream) @@ -91,6 +92,13 @@ class AccessLoggedStore[K, V]( store.all() } + def iterate(from: K, to: K): KeyValueIterable[K, V] = { + val list : util.ArrayList[K] = new util.ArrayList[K]() + list.add(from) + list.add(to) + logAccess(DBOperation.ITERATE, serializeKeys(list), store.iterate(from, to)) + } + def close(): Unit = { trace("Closing accessLogged store.") @@ -151,5 +159,4 @@ class AccessLoggedStore[K, V]( val bytes = keySerde.toBytes(key) bytes } - } http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala index d40999a..29efacb 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala @@ -286,6 +286,10 @@ class CachedStore[K, V]( } def hasArrayKeys = containsArrayKeys + + override def iterate(from: K, to: K): KeyValueIterable[K, V] = { + store.iterate(from, to) + } } private case class CacheEntry[K, V](var value: V, var dirty: mutable.DoubleLinkedList[K]) http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala index 5f7bbd8..b055ca5 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala @@ -160,4 +160,11 @@ class KeyValueStorageEngine[K, V]( } override def getStoreProperties: StoreProperties = storeProperties + + override def iterate(from: K, to: K): KeyValueIterable[K, V] = { + updateTimer(metrics.iterateNs) { + metrics.iterates.inc + wrapperStore.iterate(from, to) + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala index 8c42c7c..4162292 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala @@ -33,6 +33,7 @@ class KeyValueStorageEngineMetrics( val puts = newCounter("puts") val deletes = newCounter("deletes") val flushes = newCounter("flushes") + val iterates = newCounter("iterates") val restoredMessages = newCounter("messages-restored") //Deprecated val restoredMessagesGauge = newGauge("restored-messages", 0) @@ -47,6 +48,7 @@ class KeyValueStorageEngineMetrics( val flushNs = newTimer("flush-ns") val allNs = newTimer("all-ns") val rangeNs = newTimer("range-ns") + val iterateNs = newTimer("iterate-ns") override def getPrefix = storeName + "-" } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala index e0c7a31..0d013f8 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala @@ -114,4 +114,7 @@ class LoggedStore[K, V]( store.close } + override def iterate(from: K, to: K): KeyValueIterable[K, V] = { + store.iterate(from, to) + } } http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala index 7adffa9..1978710 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala @@ -89,4 +89,10 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt throw new NullPointerException(msg) } } + + override def iterate(from: K, to: K): KeyValueIterable[K, V] = { + notNull(from, NullKeyErrorMessage) + notNull(to, NullKeyErrorMessage) + store.iterate(from, to) + } } http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala index 16dd980..5f59143 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala @@ -148,4 +148,15 @@ class SerializedKeyValueStore[K, V]( } bytes } + + override def iterate(from: K, to: K): KeyValueIterable[K, V] = { + val fromBytes = toBytesOrNull(from, keySerde) + val toBytes = toBytesOrNull(to, keySerde) + val iterable = store.iterate(fromBytes, toBytes) + new KeyValueIterable[K, V] { + override def iterator(): KeyValueIterator[K, V] = { + new DeserializingIterator(iterable.iterator()) + } + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala index f66dc04..8affd5e 100644 --- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala +++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala @@ -69,4 +69,8 @@ class MockKeyValueStore extends KeyValueStore[String, String] { override def flush() {} // no-op override def close() { kvMap.clear() } + + override def iterate(from: String, to: String): KeyValueIterable[String, String] = { + throw new UnsupportedOperationException("iterator() not supported") + } } \ No newline at end of file
