Repository: kafka Updated Branches: refs/heads/trunk efeaf1298 -> 21d7e6f19
KAFKA-4516: When a CachingStateStore is closed it should clear its associated NamedCache. Clear and remove the NamedCache from the ThreadCache when a CachingKeyValueStore or CachingWindowStore is closed. Validate that the store is open when doing any queries or writes to Caching State Stores. Author: Damian Guy <[email protected]> Reviewers: Eno Thereska, Guozhang Wang Closes #2235 from dguy/kafka-4516 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/21d7e6f1 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/21d7e6f1 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/21d7e6f1 Branch: refs/heads/trunk Commit: 21d7e6f19bd36a7ad16291294fc933f9abfac9b7 Parents: efeaf12 Author: Damian Guy <[email protected]> Authored: Mon Dec 12 18:00:00 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Mon Dec 12 18:00:00 2016 -0800 ---------------------------------------------------------------------- .../state/internals/CachingKeyValueStore.java | 15 +++++ .../state/internals/CachingWindowStore.java | 10 ++++ .../streams/state/internals/NamedCache.java | 8 +++ .../streams/state/internals/ThreadCache.java | 9 +++ .../internals/CachingKeyValueStoreTest.java | 62 +++++++++++++++++++- .../state/internals/CachingWindowStoreTest.java | 22 +++++++ .../state/internals/ThreadCacheTest.java | 11 ++++ 7 files changed, 135 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/21d7e6f1/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index ab050b6..cfe6bd3 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.internals.CacheFlushListener; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -115,6 +116,7 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor public void close() { flush(); underlying.close(); + cache.close(name); } @Override @@ -129,10 +131,17 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor @Override public synchronized V get(final K key) { + validateStoreOpen(); final byte[] rawKey = serdes.rawKey(key); return get(rawKey); } + private void validateStoreOpen() { + if (!isOpen()) { + throw new InvalidStateStoreException("Store " + this.name + " is currently closed"); + } + } + private V get(final byte[] rawKey) { final LRUCacheEntry entry = cache.get(name, rawKey); if (entry == null) { @@ -157,6 +166,7 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor @Override public KeyValueIterator<K, V> range(final K from, final K to) { + validateStoreOpen(); final byte[] origFrom = serdes.rawKey(from); final byte[] origTo = serdes.rawKey(to); final PeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(underlying.range(Bytes.wrap(origFrom), Bytes.wrap(origTo))); @@ -166,6 +176,7 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor @Override public KeyValueIterator<K, V> all() { + validateStoreOpen(); final PeekingKeyValueIterator<Bytes, byte[]> 
storeIterator = new DelegatingPeekingKeyValueIterator<>(underlying.all()); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(name); return new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes); @@ -173,11 +184,13 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor @Override public synchronized long approximateNumEntries() { + validateStoreOpen(); return underlying.approximateNumEntries(); } @Override public synchronized void put(final K key, final V value) { + validateStoreOpen(); put(serdes.rawKey(key), value); } @@ -189,6 +202,7 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor @Override public synchronized V putIfAbsent(final K key, final V value) { + validateStoreOpen(); final byte[] rawKey = serdes.rawKey(key); final V v = get(rawKey); if (v == null) { @@ -206,6 +220,7 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor @Override public synchronized V delete(final K key) { + validateStoreOpen(); final byte[] rawKey = serdes.rawKey(key); final V v = get(rawKey); put(rawKey, null); http://git-wip-us.apache.org/repos/asf/kafka/blob/21d7e6f1/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 304a206..71856fa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; +import 
org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.CacheFlushListener; import org.apache.kafka.streams.kstream.internals.TimeWindow; @@ -123,6 +124,7 @@ class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Wi public void close() { flush(); underlying.close(); + cache.close(name); } @Override @@ -143,6 +145,7 @@ class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Wi @Override public synchronized void put(final K key, final V value, final long timestamp) { + validateStoreOpen(); final byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, 0, serdes); final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(), timestamp, context.partition(), context.topic()); @@ -151,6 +154,7 @@ class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Wi @Override public synchronized WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) { + validateStoreOpen(); byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes); byte[] binaryTo = WindowStoreUtils.toBinaryKey(key, timeTo, 0, serdes); @@ -167,4 +171,10 @@ class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Wi return serdes.valueFrom(iterator.next().value); } } + + private void validateStoreOpen() { + if (!isOpen()) { + throw new InvalidStateStoreException("Store " + this.name + " is currently closed"); + } + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/21d7e6f1/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java index 07968a9..4272f2b 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java @@ -296,6 +296,14 @@ class NamedCache { return dirtyKeys.size(); } + synchronized void close() { + head = tail = null; + listener = null; + currentSizeBytes = 0; + dirtyKeys.clear(); + cache.clear(); + } + /** * A simple wrapper class to implement a doubly-linked list around MemoryLRUCacheBytesEntry */ http://git-wip-us.apache.org/repos/asf/kafka/blob/21d7e6f1/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java index 3d9d0b8..c6c3030 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java @@ -49,6 +49,8 @@ public class ThreadCache { private long numEvicts = 0; private long numFlushes = 0; + + public interface DirtyEntryFlushListener { void apply(final List<DirtyEntry> dirty); } @@ -191,6 +193,13 @@ public class ThreadCache { return sizeInBytes; } + synchronized void close(final String namespace) { + final NamedCache removed = caches.remove(namespace); + if (removed != null) { + removed.close(); + } + } + private void maybeEvict(final String namespace) { while (sizeBytes() > maxCacheSizeBytes) { final NamedCache cache = getOrCreateCache(namespace); http://git-wip-us.apache.org/repos/asf/kafka/blob/21d7e6f1/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java index 23f8a6a..60eed96 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java @@ -19,6 +19,8 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.internals.CacheFlushListener; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; @@ -30,6 +32,7 @@ import org.junit.Test; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -42,10 +45,10 @@ import static org.junit.Assert.assertNull; public class CachingKeyValueStoreTest { + private final int maxCacheSizeBytes = 150; private CachingKeyValueStore<String, String> store; private InMemoryKeyValueStore<Bytes, byte[]> underlyingStore; private ThreadCache cache; - private int maxCacheSizeBytes; private CacheFlushListenerStub<String> cacheFlushListener; private String topic; @@ -56,7 +59,6 @@ public class CachingKeyValueStoreTest { cacheFlushListener = new CacheFlushListenerStub<>(); store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String()); store.setFlushListener(cacheFlushListener); - maxCacheSizeBytes = 150; cache = new ThreadCache(maxCacheSizeBytes); final MockProcessorContext context = new MockProcessorContext(null, null, null, null, (RecordCollector) null, cache); topic = "topic"; @@ -149,6 +151,62 @@ public class CachingKeyValueStoreTest { assertFalse(store.all().hasNext()); } + @Test + public void 
shouldClearNamespaceCacheOnClose() throws Exception { + store.put("a", "a"); + assertEquals(1, cache.size()); + store.close(); + assertEquals(0, cache.size()); + } + + @Test(expected = InvalidStateStoreException.class) + public void shouldThrowIfTryingToGetFromClosedCachingStore() throws Exception { + store.close(); + store.get("a"); + } + + @Test(expected = InvalidStateStoreException.class) + public void shouldThrowIfTryingToWriteToClosedCachingStore() throws Exception { + store.close(); + store.put("a", "a"); + } + + @Test(expected = InvalidStateStoreException.class) + public void shouldThrowIfTryingToDoRangeQueryOnClosedCachingStore() throws Exception { + store.close(); + store.range("a", "b"); + } + + @Test(expected = InvalidStateStoreException.class) + public void shouldThrowIfTryingToDoAllQueryOnClosedCachingStore() throws Exception { + store.close(); + store.all(); + } + + @Test(expected = InvalidStateStoreException.class) + public void shouldThrowIfTryingToDoGetApproxSizeOnClosedCachingStore() throws Exception { + store.close(); + store.approximateNumEntries(); + } + + @Test(expected = InvalidStateStoreException.class) + public void shouldThrowIfTryingToDoPutAllClosedCachingStore() throws Exception { + store.close(); + store.putAll(Collections.singletonList(KeyValue.pair("a", "a"))); + } + + @Test(expected = InvalidStateStoreException.class) + public void shouldThrowIfTryingToDoPutIfAbsentClosedCachingStore() throws Exception { + store.close(); + store.putIfAbsent("b", "c"); + } + + @Test(expected = InvalidStateStoreException.class) + public void shouldThrowIfTryingToDeleteFromClosedCachingStore() throws Exception { + store.close(); + store.delete("key"); + } + private int addItemsToCache() throws IOException { int cachedSize = 0; int i = 0; http://git-wip-us.apache.org/repos/asf/kafka/blob/21d7e6f1/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java 
---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index 49e2db3..023fea6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -20,6 +20,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; @@ -150,6 +151,27 @@ public class CachingWindowStoreTest { assertFalse(fetch.hasNext()); } + @Test + public void shouldClearNamespaceCacheOnClose() throws Exception { + cachingStore.put("a", "a"); + assertEquals(1, cache.size()); + cachingStore.close(); + assertEquals(0, cache.size()); + } + + @Test(expected = InvalidStateStoreException.class) + public void shouldThrowIfTryingToFetchFromClosedCachingStore() throws Exception { + cachingStore.close(); + cachingStore.fetch("a", 0, 10); + } + + @Test(expected = InvalidStateStoreException.class) + public void shouldThrowIfTryingToWriteToClosedCachingStore() throws Exception { + cachingStore.close(); + cachingStore.put("a", "a"); + } + + private int addItemsToCache() throws IOException { int cachedSize = 0; int i = 0; http://git-wip-us.apache.org/repos/asf/kafka/blob/21d7e6f1/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java ---------------------------------------------------------------------- diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java index 1049b91..6c446ec 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java @@ -493,6 +493,17 @@ public class ThreadCacheTest { threadCache.put("name", new byte[]{2}, dirtyEntry(new byte[remaining + 100])); } + @Test + public void shouldCleanupNamedCacheOnClose() throws Exception { + final ThreadCache cache = new ThreadCache(100000); + cache.put("one", new byte[]{1}, cleanEntry(new byte[] {1})); + cache.put("two", new byte[]{1}, cleanEntry(new byte[] {1})); + assertEquals(cache.size(), 2); + cache.close("two"); + assertEquals(cache.size(), 1); + assertNull(cache.get("two", new byte[] {1})); + } + private LRUCacheEntry dirtyEntry(final byte[] key) { return new LRUCacheEntry(key, true, -1, -1, -1, ""); }
