Repository: kafka Updated Branches: refs/heads/trunk 17668e81c -> 1ef7b494b
KAFKA-3753: Add approximateNumEntries() method to KeyValueStore interface See https://issues.apache.org/jira/browse/KAFKA-3753 This contribution is my original work and I license the work to the project under the project's open source license. cc guozhangwang kichristensen ijuma Author: Jeff Klukas <[email protected]> Reviewers: Ismael Juma, Guozhang Wang Closes #1486 from jklukas/kvstore-size Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1ef7b494 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1ef7b494 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1ef7b494 Branch: refs/heads/trunk Commit: 1ef7b494bbd937205ecd14dd30e625c2efdb3aa9 Parents: 17668e8 Author: Jeff Klukas <[email protected]> Authored: Wed Jun 15 17:08:33 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed Jun 15 17:08:33 2016 -0700 ---------------------------------------------------------------------- .../kafka/streams/state/KeyValueStore.java | 10 ++++++ .../internals/InMemoryKeyValueLoggedStore.java | 5 +++ .../InMemoryKeyValueStoreSupplier.java | 5 +++ .../streams/state/internals/MemoryLRUCache.java | 5 +++ .../state/internals/MeteredKeyValueStore.java | 5 +++ .../streams/state/internals/RocksDBStore.java | 37 ++++++++++++++++++++ .../internals/AbstractKeyValueStoreTest.java | 19 ++++++++++ 7 files changed, 86 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1ef7b494/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java index 908e116..1ee790d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java @@ -97,4 +97,14 @@ public interface KeyValueStore<K, V> extends StateStore { */ KeyValueIterator<K, V> all(); + /** + * Return an approximate count of key-value mappings in this store. + * + * The count is not guaranteed to be exact in order to accommodate stores + * where an exact count is expensive to calculate. + * + * @return an approximate count of key-value mappings in the store. + */ + long approximateNumEntries(); + } http://git-wip-us.apache.org/repos/asf/kafka/blob/1ef7b494/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java index efcdac7..e13bba3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java @@ -150,6 +150,11 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> { } @Override + public long approximateNumEntries() { + return this.inner.approximateNumEntries(); + } + + @Override public void close() { inner.close(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/1ef7b494/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java index a25153c..3b632cc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java @@ -147,6 +147,11 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier { } @Override + public long approximateNumEntries() { + return this.map.size(); + } + + @Override public void flush() { // do-nothing since it is in-memory } http://git-wip-us.apache.org/repos/asf/kafka/blob/1ef7b494/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java index d410e02..0697eda 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java @@ -154,6 +154,11 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> { } @Override + public long approximateNumEntries() { + return this.map.size(); + } + + @Override public void flush() { // do-nothing since it is in-memory } http://git-wip-us.apache.org/repos/asf/kafka/blob/1ef7b494/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 5e5b54a..c6e93cb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -154,6 +154,11 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> { } @Override + public long approximateNumEntries() { + return this.inner.approximateNumEntries(); + } + + @Override public void close() { inner.close(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/1ef7b494/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index a00de19..8634d68 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -354,6 +354,43 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { return new RocksDbIterator<>(innerIter, serdes); } + /** + * Return an approximate count of key-value mappings in this store. + * + * <code>RocksDB</code> cannot return an exact entry count without doing a + * full scan, so this method relies on the <code>rocksdb.estimate-num-keys</code> + * property to get an approximate count. The returned size also includes + * a count of dirty keys in the store's in-memory cache, which may lead to some + * double-counting of entries and inflate the estimate. + * + * @return an approximate count of key-value mappings in the store. + */ + @Override + public long approximateNumEntries() { + long value; + try { + value = this.db.getLongProperty("rocksdb.estimate-num-keys"); + } catch (RocksDBException e) { + throw new ProcessorStateException("Error fetching property from store " + this.name, e); + } + if (isOverflowing(value)) { + return Long.MAX_VALUE; + } + if (this.cacheDirtyKeys != null) { + value += this.cacheDirtyKeys.size(); + } + if (isOverflowing(value)) { + return Long.MAX_VALUE; + } + return value; + } + + private boolean isOverflowing(long value) { + // RocksDB returns an unsigned 8-byte integer, which could overflow long + // and manifest as a negative value. + return value < 0; + } + private void flushCache() { // flush of the cache entries if necessary if (cache != null) { http://git-wip-us.apache.org/repos/asf/kafka/blob/1ef7b494/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java index 2bfe644..8a22d37 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -227,4 +227,23 @@ public abstract class AbstractKeyValueStoreTest { store.close(); } } + + @Test + public void testSize() { + // Create the test driver ... + KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class); + KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true); + try { + assertEquals("A newly created store should have no entries", 0, store.approximateNumEntries()); + + store.put(0, "zero"); + store.put(1, "one"); + store.put(2, "two"); + store.put(4, "four"); + store.put(5, "five"); + assertEquals(5, store.approximateNumEntries()); + } finally { + store.close(); + } + } }
