Repository: kafka Updated Branches: refs/heads/trunk 56e5627da -> e9a67a8da
KAFKA-4492: Make the streams cache eviction policy tolerant of reentrant puts. The NamedCache wasn't correctly dealing with its re-entrant nature. This would result in the LRU becoming corrupted, and an IllegalStateException occurring during eviction. For example: (1) Cache A has dirty key 1; (2) eviction runs on Cache A; (3) the node for key 1 gets marked as clean; (4) the entry for key 1 gets flushed downstream; (5) downstream, there is a processor that also refers to the table fronted by Cache A; (6) that downstream processor puts key 2 into Cache A; (7) this triggers an eviction of key 1 again (it is still the oldest node, as it hasn't been removed from the LRU); (8) as the node for key 1 is now clean, flush doesn't run and the node is immediately removed from the cache. So now the dirtyKeys set contains key = 1, but the value doesn't exist in the cache. When the downstream processor tries to put key = 1 into the cache, it fails because key = 1 is in the dirtyKeys set. Author: Damian Guy <[email protected]> Reviewers: Eno Thereska, Guozhang Wang Closes #2226 from dguy/cache-bug Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e9a67a8d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e9a67a8d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e9a67a8d Branch: refs/heads/trunk Commit: e9a67a8daaf4be35f292dd06ae3c7797200658fa Parents: 56e5627 Author: Damian Guy <[email protected]> Authored: Wed Dec 7 12:08:34 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Dec 7 12:08:34 2016 -0800 ---------------------------------------------------------------------- .../streams/state/internals/NamedCache.java | 55 ++++++++++++--- .../streams/state/internals/ThreadCache.java | 12 +++- .../internals/KTableKTableLeftJoinTest.java | 72 ++++++++++++++++++++ .../streams/state/internals/NamedCacheTest.java | 62 +++++++++++++++++ .../state/internals/ThreadCacheTest.java | 34 +++++++++ 5 files changed, 222 insertions(+), 13 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e9a67a8d/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java index ab771df..07968a9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java @@ -92,10 +92,14 @@ class NamedCache { } synchronized void flush() { + flush(null); + } + + private void flush(final LRUNode evicted) { numFlushes++; log.debug("Named cache {} stats on flush: #hits={}, #misses={}, #overwrites={}, #flushes={}", - name, hits(), misses(), overwrites(), flushes()); + name, hits(), misses(), overwrites(), flushes()); if (listener == null) { throw new IllegalArgumentException("No listener for namespace " + name + " registered with cache"); @@ -106,6 +110,14 @@ class NamedCache { } final List<ThreadCache.DirtyEntry> entries = new ArrayList<>(); + + // evicted already been removed from the cache so add it to the list of + // flushed entries and remove from dirtyKeys. + if (evicted != null) { + entries.add(new ThreadCache.DirtyEntry(evicted.key, evicted.entry.value, evicted.entry)); + dirtyKeys.remove(evicted.key); + } + for (Bytes key : dirtyKeys) { final LRUNode node = getInternal(key); if (node == null) { @@ -114,11 +126,14 @@ class NamedCache { entries.add(new ThreadCache.DirtyEntry(key, node.entry.value, node.entry)); node.entry.markClean(); } - listener.apply(entries); + // clear dirtyKeys before the listener is applied as it may be re-entrant. 
dirtyKeys.clear(); + listener.apply(entries); } + + synchronized void put(final Bytes key, final LRUCacheEntry value) { if (!value.isDirty && dirtyKeys.contains(key)) { throw new IllegalStateException(String.format("Attempting to put a clean entry for key [%s] " + @@ -201,11 +216,11 @@ class NamedCache { } final LRUNode eldest = tail; currentSizeBytes -= eldest.size(); - if (eldest.entry.isDirty()) { - flush(); - } remove(eldest); cache.remove(eldest.key); + if (eldest.entry.isDirty()) { + flush(eldest); + } } synchronized LRUCacheEntry putIfAbsent(final Bytes key, final LRUCacheEntry value) { @@ -269,6 +284,14 @@ class NamedCache { return tail.entry; } + synchronized LRUNode head() { + return head; + } + + synchronized LRUNode tail() { + return tail; + } + synchronized long dirtySize() { return dirtyKeys.size(); } @@ -276,7 +299,7 @@ class NamedCache { /** * A simple wrapper class to implement a doubly-linked list around MemoryLRUCacheBytesEntry */ - private class LRUNode { + class LRUNode { private final Bytes key; private LRUCacheEntry entry; private LRUNode previous; @@ -287,21 +310,33 @@ class NamedCache { this.entry = entry; } - public LRUCacheEntry entry() { + LRUCacheEntry entry() { return entry; } - public void update(LRUCacheEntry entry) { - this.entry = entry; + Bytes key() { + return key; } - public long size() { + long size() { return key.get().length + 8 + // entry 8 + // previous 8 + // next entry.size(); } + + LRUNode next() { + return next; + } + + LRUNode previous() { + return previous; + } + + private void update(LRUCacheEntry entry) { + this.entry = entry; + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e9a67a8d/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java 
index 57ebfc7..3d9d0b8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java @@ -100,7 +100,7 @@ public class ThreadCache { cache.flush(); log.debug("Thread {} cache stats on flush: #puts={}, #gets={}, #evicts={}, #flushes={}", - name, puts(), gets(), evicts(), flushes()); + name, puts(), gets(), evicts(), flushes()); } public LRUCacheEntry get(final String namespace, byte[] key) { @@ -115,7 +115,6 @@ public class ThreadCache { public void put(final String namespace, byte[] key, LRUCacheEntry value) { numPuts++; - final NamedCache cache = getOrCreateCache(namespace); cache.put(Bytes.wrap(key), value); maybeEvict(namespace); @@ -195,9 +194,15 @@ public class ThreadCache { private void maybeEvict(final String namespace) { while (sizeBytes() > maxCacheSizeBytes) { final NamedCache cache = getOrCreateCache(namespace); + // we abort here as the put on this cache may have triggered + // a put on another cache. So even though the sizeInBytes() is + // still > maxCacheSizeBytes there is nothing to evict from this + // namespaced cache. 
+ if (cache.size() == 0) { + return; + } log.trace("Thread {} evicting cache {}", name, namespace); cache.evict(); - numEvicts++; } } @@ -324,4 +329,5 @@ public class ThreadCache { } } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/e9a67a8d/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java index 816979a..9549869 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java @@ -22,8 +22,11 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockReducer; import org.apache.kafka.test.MockValueJoiner; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -35,6 +38,8 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; +import java.util.Locale; +import java.util.Random; import java.util.Set; import static org.junit.Assert.assertEquals; @@ -322,6 +327,73 @@ public class KTableKTableLeftJoinTest { proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)"); } + /** + * This test was written to reproduce https://issues.apache.org/jira/browse/KAFKA-4492 + * It is based on a fairly 
complicated join used by the developer that reported the bug. + * Before the fix this would trigger an IllegalStateException. + */ + @Test + public void shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions() throws Exception { + final String agg = "agg"; + final String tableOne = "tableOne"; + final String tableTwo = "tableTwo"; + final String tableThree = "tableThree"; + final String tableFour = "tableFour"; + final String tableFive = "tableFive"; + final String tableSix = "tableSix"; + final String[] inputs = {agg, tableOne, tableTwo, tableThree, tableFour, tableFive, tableSix}; + + final KStreamBuilder builder = new KStreamBuilder(); + final KTable<Long, String> aggTable = builder.table(Serdes.Long(), Serdes.String(), agg, agg) + .groupBy(new KeyValueMapper<Long, String, KeyValue<Long, String>>() { + @Override + public KeyValue<Long, String> apply(final Long key, final String value) { + return new KeyValue<>(key, value); + } + }, Serdes.Long(), Serdes.String()).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_ADDER, "agg-store"); + + final KTable<Long, String> one = builder.table(Serdes.Long(), Serdes.String(), tableOne, tableOne); + final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo); + final KTable<Long, String> three = builder.table(Serdes.Long(), Serdes.String(), tableThree, tableThree); + final KTable<Long, String> four = builder.table(Serdes.Long(), Serdes.String(), tableFour, tableFour); + final KTable<Long, String> five = builder.table(Serdes.Long(), Serdes.String(), tableFive, tableFive); + final KTable<Long, String> six = builder.table(Serdes.Long(), Serdes.String(), tableSix, tableSix); + + final ValueMapper<String, String> mapper = new ValueMapper<String, String>() { + @Override + public String apply(final String value) { + return value.toUpperCase(Locale.ROOT); + } + }; + final KTable<Long, String> seven = one.mapValues(mapper); + + + final KTable<Long, String> eight = six.leftJoin(seven, 
MockValueJoiner.STRING_JOINER); + + aggTable.leftJoin(one, MockValueJoiner.STRING_JOINER) + .leftJoin(two, MockValueJoiner.STRING_JOINER) + .leftJoin(three, MockValueJoiner.STRING_JOINER) + .leftJoin(four, MockValueJoiner.STRING_JOINER) + .leftJoin(five, MockValueJoiner.STRING_JOINER) + .leftJoin(eight, MockValueJoiner.STRING_JOINER) + .mapValues(mapper); + + final KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, 250); + + final String[] values = {"a", "AA", "BBB", "CCCC", "DD", "EEEEEEEE", "F", "GGGGGGGGGGGGGGG", "HHH", "IIIIIIIIII", + "J", "KK", "LLLL", "MMMMMMMMMMMMMMMMMMMMMM", "NNNNN", "O", "P", "QQQQQ", "R", "SSSS", + "T", "UU", "VVVVVVVVVVVVVVVVVVV"}; + + final Random random = new Random(); + for (int i = 0; i < 1000; i++) { + for (String input : inputs) { + final Long key = Long.valueOf(random.nextInt(1000)); + final String value = values[random.nextInt(values.length)]; + driver.process(input, key, value); + } + } + } + private KeyValue<Integer, String> kv(Integer key, String value) { return new KeyValue<>(key, value); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e9a67a8d/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java index 5c0d511..0a782d5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java @@ -32,6 +32,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; public class NamedCacheTest { @@ -197,4 +198,65 @@ public class NamedCacheTest { 
cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, true, 0, 0, 0, "")); cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, false, 0, 0, 0, "")); } + + @Test + public void shouldBeReentrantAndNotBreakLRU() throws Exception { + final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, true, 0, 0, 0, ""); + final LRUCacheEntry clean = new LRUCacheEntry(new byte[]{3}); + cache.put(Bytes.wrap(new byte[]{0}), dirty); + cache.put(Bytes.wrap(new byte[]{1}), clean); + cache.put(Bytes.wrap(new byte[]{2}), clean); + assertEquals(3 * cache.head().size(), cache.sizeInBytes()); + cache.setListener(new ThreadCache.DirtyEntryFlushListener() { + @Override + public void apply(final List<ThreadCache.DirtyEntry> dirty) { + cache.put(Bytes.wrap(new byte[]{3}), clean); + // evict key 1 + cache.evict(); + // evict key 2 + cache.evict(); + } + }); + + assertEquals(3 * cache.head().size(), cache.sizeInBytes()); + // Evict key 0 + cache.evict(); + final Bytes entryFour = Bytes.wrap(new byte[]{4}); + cache.put(entryFour, dirty); + + // check that the LRU is still correct + final NamedCache.LRUNode head = cache.head(); + final NamedCache.LRUNode tail = cache.tail(); + assertEquals(2, cache.size()); + assertEquals(2 * head.size(), cache.sizeInBytes()); + // dirty should be the newest + assertEquals(entryFour, head.key()); + assertEquals(Bytes.wrap(new byte[] {3}), tail.key()); + assertSame(tail, head.next()); + assertNull(head.previous()); + assertSame(head, tail.previous()); + assertNull(tail.next()); + + // evict key 3 + cache.evict(); + assertSame(cache.head(), cache.tail()); + assertEquals(entryFour, cache.head().key()); + assertNull(cache.head().next()); + assertNull(cache.head().previous()); + } + + @Test + public void shouldNotThrowIllegalArgumentAfterEvictingDirtyRecordAndThenPuttingNewRecordWithSameKey() throws Exception { + final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, true, 0, 0, 0, ""); + final LRUCacheEntry clean = new 
LRUCacheEntry(new byte[]{3}); + final Bytes key = Bytes.wrap(new byte[] {3}); + cache.setListener(new ThreadCache.DirtyEntryFlushListener() { + @Override + public void apply(final List<ThreadCache.DirtyEntry> dirty) { + cache.put(key, clean); + } + }); + cache.put(key, dirty); + cache.evict(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/e9a67a8d/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java index b07da6e..1049b91 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java @@ -459,6 +459,40 @@ public class ThreadCacheTest { assertEquals(cache.evicts(), 3); } + @Test + public void shouldNotLoopForEverWhenEvictingAndCurrentCacheIsEmpty() throws Exception { + final int maxCacheSizeInBytes = 100; + final ThreadCache threadCache = new ThreadCache(maxCacheSizeInBytes); + // trigger a put into another cache on eviction from "name" + threadCache.addDirtyEntryFlushListener("name", new ThreadCache.DirtyEntryFlushListener() { + @Override + public void apply(final List<ThreadCache.DirtyEntry> dirty) { + // put an item into an empty cache when the total cache size + // is already > than maxCacheSizeBytes + threadCache.put("other", new byte[]{0}, dirtyEntry(new byte[2])); + } + }); + threadCache.addDirtyEntryFlushListener("other", new ThreadCache.DirtyEntryFlushListener() { + @Override + public void apply(final List<ThreadCache.DirtyEntry> dirty) { + // + } + }); + threadCache.addDirtyEntryFlushListener("another", new ThreadCache.DirtyEntryFlushListener() { + @Override + public void apply(final List<ThreadCache.DirtyEntry> dirty) 
{ + + } + }); + + threadCache.put("another", new byte[]{1}, dirtyEntry(new byte[1])); + threadCache.put("name", new byte[]{1}, dirtyEntry(new byte[1])); + // Put a large item such that when the eldest item is removed + // cache sizeInBytes() > maxCacheSizeBytes + int remaining = (int) (maxCacheSizeInBytes - threadCache.sizeBytes()); + threadCache.put("name", new byte[]{2}, dirtyEntry(new byte[remaining + 100])); + } + private LRUCacheEntry dirtyEntry(final byte[] key) { return new LRUCacheEntry(key, true, -1, -1, -1, ""); }
