Repository: samza Updated Branches: refs/heads/master 727a3c19a -> 89beb1fcc
SAMZA-1705: Switch to use snapshot in iterable impl of RocksDb We should use rocksDb.snapshot() method to keep the snapshot and creates a new iterator with it all the time. The perf shows a little bit more expensive but mostly on par with range iterator query. Author: xinyuiscool <[email protected]> Reviewers: Jagadish V <[email protected]> Closes #510 from xinyuiscool/SAMZA-1705 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/89beb1fc Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/89beb1fc Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/89beb1fc Branch: refs/heads/master Commit: 89beb1fccb01c781a4de905d57a4bd99df25577a Parents: 727a3c1 Author: xinyuiscool <[email protected]> Authored: Tue May 8 17:48:55 2018 -0700 Committer: xiliu <[email protected]> Committed: Tue May 8 17:48:55 2018 -0700 ---------------------------------------------------------------------- .../samza/storage/kv/KeyValueIterable.java | 24 --------- .../samza/storage/kv/KeyValueSnapshot.java | 42 ++++++++++++++++ .../apache/samza/storage/kv/KeyValueStore.java | 12 ++--- .../operators/util/InternalInMemoryStore.java | 6 +-- .../operators/impl/store/TestInMemoryStore.java | 10 ++-- .../kv/inmemory/InMemoryKeyValueStore.scala | 8 +-- .../kv/inmemory/TestInMemoryKeyValueStore.java | 10 ++-- .../samza/storage/kv/RocksDbKeyValueStore.scala | 27 ++++------- .../kv/TestRocksDbKeyValueStoreJava.java | 51 ++++++++++++++++++-- .../samza/storage/kv/AccessLoggedStore.scala | 6 +-- .../apache/samza/storage/kv/CachedStore.scala | 4 +- .../storage/kv/KeyValueStorageEngine.scala | 8 +-- .../kv/KeyValueStorageEngineMetrics.scala | 4 +- .../apache/samza/storage/kv/LoggedStore.scala | 4 +- .../storage/kv/NullSafeKeyValueStore.scala | 4 +- .../storage/kv/SerializedKeyValueStore.scala | 12 +++-- .../samza/storage/kv/MockKeyValueStore.scala | 2 +- 17 files changed, 148 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/89beb1fc/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java deleted file mode 100644 index 8fd00ed..0000000 --- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.storage.kv; - -public interface KeyValueIterable<K, V> extends Iterable<Entry<K, V>> { - KeyValueIterator<K, V> iterator(); -} http://git-wip-us.apache.org/repos/asf/samza/blob/89beb1fc/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueSnapshot.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueSnapshot.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueSnapshot.java new file mode 100644 index 0000000..52cb7fa --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueSnapshot.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.kv; + +/** + * An immutable view of the {@link KeyValueStore} at a point-in-time. + * The snapshot MUST be closed after use. + * + * @param <K> key type + * @param <V> value type + */ +public interface KeyValueSnapshot<K, V> extends Iterable<Entry<K, V>> { + /** + * Creates a new iterator for this snapshot. The iterator MUST be + * closed after its execution by invoking {@link KeyValueIterator#close}. + * @return an iterator + */ + KeyValueIterator<K, V> iterator(); + + /** + * Closes this snapshot releasing any associated resources. Once a + * snapshot is closed, no new iterators can be created for it. + */ + void close(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/89beb1fc/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java index 3f216bd..67d7fb3 100644 --- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java +++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java @@ -111,17 +111,17 @@ public interface KeyValueStore<K, V> { KeyValueIterator<K, V> range(K from, K to); /** - * Returns an iterable for a sorted range of entries specified by [{@code from}, {@code to}). - * Note that we snapshot the iterator when the iterable is created from this function, and - * the iteration results is guaranteed to reflect the snapshot if only one iterator is in use at a time. + * Returns a snapshot of this store for a sorted range of entries specified by [{@code from}, {@code to}). + * The snapshot is immutable - ie., any mutations to the store are not reflected in the snapshot after it is created. * + * <p><b>API Note:</b> The returned snapshot MUST be closed after use. * @param from the key specifying the low endpoint (inclusive) of the keys in the returned range. * @param to the key specifying the high endpoint (exclusive) of the keys in the returned range. - * @return an iterable for the specified key range. + * @return a snapshot for the specified key range. * @throws NullPointerException if null is used for {@code from} or {@code to}. */ - default KeyValueIterable<K, V> iterate(K from, K to) { - return () -> range(from, to); + default KeyValueSnapshot<K, V> snapshot(K from, K to) { + throw new UnsupportedOperationException("snapshot() is not supported in " + this.getClass().getName()); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/89beb1fc/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java index 7360474..10a92f8 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java +++ b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java @@ -20,7 +20,7 @@ package org.apache.samza.operators.util; import org.apache.samza.storage.kv.Entry; -import org.apache.samza.storage.kv.KeyValueIterable; +import org.apache.samza.storage.kv.KeyValueSnapshot; import org.apache.samza.storage.kv.KeyValueIterator; import org.apache.samza.storage.kv.KeyValueStore; @@ -95,8 +95,8 @@ public class InternalInMemoryStore<K, V> implements KeyValueStore<K, V> { } @Override - public KeyValueIterable<K, V> iterate(K from, K to) { - throw new UnsupportedOperationException("iterate() is not supported in " + InternalInMemoryStore.class.getName()); + public KeyValueSnapshot<K, V> snapshot(K from, K to) { + throw new UnsupportedOperationException("snapshot() is not supported in " + InternalInMemoryStore.class.getName()); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/89beb1fc/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java index e331703..9c2306a 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java @@ -21,7 +21,7 @@ package org.apache.samza.operators.impl.store; import com.google.common.primitives.UnsignedBytes; import org.apache.samza.serializers.Serde; import org.apache.samza.storage.kv.Entry; -import org.apache.samza.storage.kv.KeyValueIterable; +import org.apache.samza.storage.kv.KeyValueSnapshot; import org.apache.samza.storage.kv.KeyValueIterator; import org.apache.samza.storage.kv.KeyValueStore; @@ -100,13 +100,17 @@ public class TestInMemoryStore<K, V> implements KeyValueStore<K, V> { } @Override - public KeyValueIterable<K, V> iterate(K from, K to) { + public KeyValueSnapshot<K, V> snapshot(K from, K to) { final ConcurrentNavigableMap<byte[], byte[]> values = map.subMap(keySerde.toBytes(from), keySerde.toBytes(to)); - return new KeyValueIterable<K, V>() { + return new KeyValueSnapshot<K, V>() { @Override public KeyValueIterator<K, V> iterator() { return new InMemoryIterator<>(values.entrySet().iterator(), keySerde, valSerde); } + + @Override + public void close() { + } }; } http://git-wip-us.apache.org/repos/asf/samza/blob/89beb1fc/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala index decaee0..988d1c9 100644 --- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala +++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala @@ -113,13 +113,15 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor found } - override def iterate(from: Array[Byte], to: Array[Byte]): KeyValueIterable[Array[Byte], Array[Byte]] = { - // snapshot the iterable + override def snapshot(from: Array[Byte], to: Array[Byte]): KeyValueSnapshot[Array[Byte], Array[Byte]] = { + // snapshot the underlying map val entries = underlying.subMap(from, to).entrySet() - new KeyValueIterable[Array[Byte], Array[Byte]] { + new KeyValueSnapshot[Array[Byte], Array[Byte]] { override def iterator(): KeyValueIterator[Array[Byte], Array[Byte]] = { new InMemoryIterator(entries.iterator()) } + + override def close() { } } } } http://git-wip-us.apache.org/repos/asf/samza/blob/89beb1fc/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java ---------------------------------------------------------------------- diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java index 0fa5807..7ee588c 100644 --- a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java +++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java @@ -23,7 +23,7 @@ import com.google.common.collect.Iterators; import com.google.common.primitives.Ints; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.storage.kv.Entry; -import org.apache.samza.storage.kv.KeyValueIterable; +import org.apache.samza.storage.kv.KeyValueSnapshot; import org.apache.samza.storage.kv.KeyValueStoreMetrics; import org.junit.Test; @@ -40,7 +40,7 @@ import static org.junit.Assert.assertTrue; public class TestInMemoryKeyValueStore { @Test - public void testIterate() throws Exception { + public void testSnapshot() throws Exception { InMemoryKeyValueStore store = new InMemoryKeyValueStore( new KeyValueStoreMetrics("testInMemory", new MetricsRegistryMap())); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); @@ -51,13 +51,13 @@ public class TestInMemoryKeyValueStore { byte[] firstKey = genKey(outputStream, prefix, 0); byte[] lastKey = genKey(outputStream, prefix, 100); - KeyValueIterable<byte[], byte[]> iterable = store.iterate(firstKey, lastKey); + KeyValueSnapshot<byte[], byte[]> snapshot = store.snapshot(firstKey, lastKey); // Make sure the cached Iterable won't change when new elements are added store.put(genKey(outputStream, prefix, 200), genValue()); - assertTrue(Iterators.size(iterable.iterator()) == 100); + assertTrue(Iterators.size(snapshot.iterator()) == 100); List<Integer> keys = new ArrayList<>(); - for (Entry<byte[], byte[]> entry : iterable) { + for (Entry<byte[], byte[]> entry : snapshot) { int key = Ints.fromByteArray(Arrays.copyOfRange(entry.getKey(), prefix.getBytes().length, entry.getKey().length)); keys.add(key); } http://git-wip-us.apache.org/repos/asf/samza/blob/89beb1fc/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index e0ee576..6f3794d 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -204,26 +204,17 @@ class RocksDbKeyValueStore( new RocksDbIterator(iter) } - override def iterate(from: Array[Byte], to: Array[Byte]): KeyValueIterable[Array[Byte], Array[Byte]] = { - //snapshot the iterator - val snapshotIter : RocksDbRangeIterator = range(from, to).asInstanceOf[RocksDbRangeIterator] - new KeyValueIterable[Array[Byte], Array[Byte]] { - var iter:RocksDbRangeIterator = null + override def snapshot(from: Array[Byte], to: Array[Byte]): KeyValueSnapshot[Array[Byte], Array[Byte]] = { + val readOptions = new ReadOptions() + readOptions.setSnapshot(db.getSnapshot) + new KeyValueSnapshot[Array[Byte], Array[Byte]] { def iterator(): KeyValueIterator[Array[Byte], Array[Byte]] = { - this.synchronized { - if (iter == null) { - iter = snapshotIter - iter - } else if(iter.isOpen() && !iter.hasNext()) { - // use the cached iterator and reset the position to the beginning - iter.seek(from) - iter - } else { - // we need to create a new iterator since the cached one is still in use or already closed - range(from, to) - } - } + new RocksDbRangeIterator(db.newIterator(readOptions), from, to) + } + + def close() = { + db.releaseSnapshot(readOptions.snapshot()) } } } http://git-wip-us.apache.org/repos/asf/samza/blob/89beb1fc/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java index 98688c6..672beac 100644 --- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java +++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java @@ -34,6 +34,7 @@ import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Random; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -60,22 +61,64 @@ public class TestRocksDbKeyValueStoreJava { byte[] firstKey = genKey(outputStream, prefix, 0); byte[] lastKey = genKey(outputStream, prefix, 1000); - KeyValueIterable<byte[], byte[]> iterable = store.iterate(firstKey, lastKey); + KeyValueSnapshot<byte[], byte[]> snapshot = store.snapshot(firstKey, lastKey); // Make sure the cached Iterable won't change when new elements are added store.put(genKey(outputStream, prefix, 200), genValue()); - assertTrue(Iterators.size(iterable.iterator()) == 100); + assertTrue(Iterators.size(snapshot.iterator()) == 100); List<Integer> keys = new ArrayList<>(); - for (Entry<byte[], byte[]> entry : iterable) { + for (Entry<byte[], byte[]> entry : snapshot) { int key = Ints.fromByteArray(Arrays.copyOfRange(entry.getKey(), prefix.getBytes().length, entry.getKey().length)); keys.add(key); } assertEquals(keys, IntStream.rangeClosed(0, 99).boxed().collect(Collectors.toList())); outputStream.close(); + snapshot.close(); store.close(); } + @Test + public void testPerf() throws Exception { + Config config = new MapConfig(); + Options options = new Options(); + options.setCreateIfMissing(true); + + File dbDir = new File(System.getProperty("java.io.tmpdir") + "/dbStore" + System.currentTimeMillis()); + RocksDbKeyValueStore store = new RocksDbKeyValueStore(dbDir, options, config, false, "dbStore", + new WriteOptions(), new FlushOptions(), new KeyValueStoreMetrics("dbStore", new MetricsRegistryMap())); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + String prefix = "this is the key prefix"; + Random r = new Random(); + for(int i = 0; i < 100000; i++) { + store.put(genKey(outputStream, prefix, r.nextInt()), genValue()); + } + + byte[] firstKey = genKey(outputStream, prefix, 0); + byte[] lastKey = genKey(outputStream, prefix, Integer.MAX_VALUE); + + long start; + KeyValueIterator iter; + + start = System.currentTimeMillis(); + iter = store.range(firstKey, lastKey); + long rangeTime = System.currentTimeMillis() - start; + start = System.currentTimeMillis(); + Iterators.size(iter); + long rangeIterTime = System.currentTimeMillis() - start; + System.out.println("range iter create time: " + rangeTime + ", iterate time: " + rangeIterTime); + + // Please comment out range query part in order to do an accurate perf test for snapshot + start = System.currentTimeMillis(); + iter = store.snapshot(firstKey, lastKey).iterator(); + long snapshotTime = System.currentTimeMillis() - start; + start = System.currentTimeMillis(); + Iterators.size(iter); + long snapshotIterTime = System.currentTimeMillis() - start; + System.out.println("snapshot iter create time: " + snapshotTime + ", iterate time: " + snapshotIterTime); + } + private byte[] genKey(ByteArrayOutputStream outputStream, String prefix, int i) throws Exception { outputStream.reset(); outputStream.write(prefix.getBytes()); @@ -84,7 +127,7 @@ public class TestRocksDbKeyValueStoreJava { } private byte[] genValue() { - int randomVal = ThreadLocalRandom.current().nextInt(0, 100000); + int randomVal = ThreadLocalRandom.current().nextInt(); return Ints.toByteArray(randomVal); } } http://git-wip-us.apache.org/repos/asf/samza/blob/89beb1fc/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala index f6fca15..39136db 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala @@ -41,7 +41,7 @@ class AccessLoggedStore[K, V]( val WRITE = 2 val DELETE = 3 val RANGE = 4 - val ITERATE = 5 + val SNAPSHOT = 5 } val streamName = storageConfig.getAccessLogStream(changelogSystemStreamPartition.getSystemStream.getStream) @@ -92,11 +92,11 @@ class AccessLoggedStore[K, V]( store.all() } - override def iterate(from: K, to: K): KeyValueIterable[K, V] = { + override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = { val list : util.ArrayList[K] = new util.ArrayList[K]() list.add(from) list.add(to) - logAccess(DBOperation.ITERATE, serializeKeys(list), store.iterate(from, to)) + logAccess(DBOperation.SNAPSHOT, serializeKeys(list), store.snapshot(from, to)) } def close(): Unit = { http://git-wip-us.apache.org/repos/asf/samza/blob/89beb1fc/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala index 29efacb..fa8b1b2 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala @@ -287,8 +287,8 @@ class CachedStore[K, V]( def hasArrayKeys = containsArrayKeys - override def iterate(from: K, to: K): KeyValueIterable[K, V] = { - store.iterate(from, to) + override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = { + store.snapshot(from, to) } } http://git-wip-us.apache.org/repos/asf/samza/blob/89beb1fc/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala index b055ca5..157c1bc 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala @@ -161,10 +161,10 @@ class KeyValueStorageEngine[K, V]( override def getStoreProperties: StoreProperties = storeProperties - override def iterate(from: K, to: K): KeyValueIterable[K, V] = { - updateTimer(metrics.iterateNs) { - metrics.iterates.inc - wrapperStore.iterate(from, to) + override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = { + updateTimer(metrics.snapshotNs) { + metrics.snapshots.inc + wrapperStore.snapshot(from, to) } } } http://git-wip-us.apache.org/repos/asf/samza/blob/89beb1fc/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala index 4162292..a2c812e 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala @@ -33,7 +33,7 @@ class KeyValueStorageEngineMetrics( val puts = newCounter("puts") val deletes = newCounter("deletes") val flushes = newCounter("flushes") - val iterates = newCounter("iterates") + val snapshots = newCounter("snapshots") val restoredMessages = newCounter("messages-restored") //Deprecated val restoredMessagesGauge = newGauge("restored-messages", 0) @@ -48,7 +48,7 @@ class KeyValueStorageEngineMetrics( val flushNs = newTimer("flush-ns") val allNs = newTimer("all-ns") val rangeNs = newTimer("range-ns") - val iterateNs = newTimer("iterate-ns") + val snapshotNs = newTimer("snapshot-ns") override def getPrefix = storeName + "-" } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/89beb1fc/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala index 0d013f8..e5f4ca4 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala @@ -114,7 +114,7 @@ class LoggedStore[K, V]( store.close } - override def iterate(from: K, to: K): KeyValueIterable[K, V] = { - store.iterate(from, to) + override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = { + store.snapshot(from, to) } } http://git-wip-us.apache.org/repos/asf/samza/blob/89beb1fc/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala index 1978710..6be0575 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala @@ -90,9 +90,9 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt } } - override def iterate(from: K, to: K): KeyValueIterable[K, V] = { + override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = { notNull(from, NullKeyErrorMessage) notNull(to, NullKeyErrorMessage) - store.iterate(from, to) + store.snapshot(from, to) } } http://git-wip-us.apache.org/repos/asf/samza/blob/89beb1fc/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala index 5f59143..567e7b8 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala @@ -149,13 +149,17 @@ class SerializedKeyValueStore[K, V]( bytes } - override def iterate(from: K, to: K): KeyValueIterable[K, V] = { + override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = { val fromBytes = toBytesOrNull(from, keySerde) val toBytes = toBytesOrNull(to, keySerde) - val iterable = store.iterate(fromBytes, toBytes) - new KeyValueIterable[K, V] { + val snapshot = store.snapshot(fromBytes, toBytes) + new KeyValueSnapshot[K, V] { override def iterator(): KeyValueIterator[K, V] = { - new DeserializingIterator(iterable.iterator()) + new DeserializingIterator(snapshot.iterator()) + } + + override def close() = { + snapshot.close() } } } http://git-wip-us.apache.org/repos/asf/samza/blob/89beb1fc/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala index 8affd5e..4526641 100644 --- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala +++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala @@ -70,7 +70,7 @@ class MockKeyValueStore extends KeyValueStore[String, String] { override def close() { kvMap.clear() } - override def iterate(from: String, to: String): KeyValueIterable[String, String] = { + override def snapshot(from: String, to: String): KeyValueSnapshot[String, String] = { throw new UnsupportedOperationException("iterator() not supported") } } \ No newline at end of file
