This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 9a71468 KAFKA-10767: Adding test cases for all, reverseAll and reverseRange for ThreadCache (#9779) 9a71468 is described below commit 9a71468cb0e5fc9faeba39b56f36d0c93ca73c59 Author: vamossagar12 <sagarmeansoc...@gmail.com> AuthorDate: Wed May 5 15:56:51 2021 +0530 KAFKA-10767: Adding test cases for all, reverseAll and reverseRange for ThreadCache (#9779) The test cases for ThreadCache didn't have the corresponding unit tests for all, reverseAll and reverseRange methods. This PR aims to add the same. Reviewers: Bruno Cadonna <cado...@apache.org> --- .../streams/state/internals/ThreadCacheTest.java | 225 +++++++++++++++------ 1 file changed, 168 insertions(+), 57 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java index afd5449..c449de9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import static org.hamcrest.MatcherAssert.assertThat; import org.junit.Test; import java.util.ArrayList; @@ -29,7 +30,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; +import java.util.function.Supplier; +import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -42,19 +45,20 @@ public class ThreadCacheTest { final String namespace1 = "0.1-namespace"; final String namespace2 = "0.2-namespace"; private final LogContext 
logContext = new LogContext("testCache "); + private final byte[][] bytes = new byte[][]{{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}}; @Test public void basicPutGet() { final List<KeyValue<String, String>> toInsert = Arrays.asList( - new KeyValue<>("K1", "V1"), - new KeyValue<>("K2", "V2"), - new KeyValue<>("K3", "V3"), - new KeyValue<>("K4", "V4"), - new KeyValue<>("K5", "V5")); + new KeyValue<>("K1", "V1"), + new KeyValue<>("K2", "V2"), + new KeyValue<>("K3", "V3"), + new KeyValue<>("K4", "V4"), + new KeyValue<>("K5", "V5")); final KeyValue<String, String> kv = toInsert.get(0); final ThreadCache cache = new ThreadCache(logContext, - toInsert.size() * memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""), - new MockStreamsMetrics(new Metrics())); + toInsert.size() * memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""), + new MockStreamsMetrics(new Metrics())); for (final KeyValue<String, String> kvToInsert : toInsert) { final Bytes key = Bytes.wrap(kvToInsert.key.getBytes()); @@ -132,35 +136,35 @@ public class ThreadCacheTest { static int memoryCacheEntrySize(final byte[] key, final byte[] value, final String topic) { return key.length + - value.length + - 1 + // isDirty - 8 + // timestamp - 8 + // offset - 4 + - topic.length() + - // LRU Node entries - key.length + - 8 + // entry - 8 + // previous - 8; // next + value.length + + 1 + // isDirty + 8 + // timestamp + 8 + // offset + 4 + + topic.length() + + // LRU Node entries + key.length + + 8 + // entry + 8 + // previous + 8; // next } @Test public void evict() { final List<KeyValue<String, String>> received = new ArrayList<>(); final List<KeyValue<String, String>> expected = Collections.singletonList( - new KeyValue<>("K1", "V1")); + new KeyValue<>("K1", "V1")); final List<KeyValue<String, String>> toInsert = Arrays.asList( - new KeyValue<>("K1", "V1"), - new KeyValue<>("K2", "V2"), - new KeyValue<>("K3", "V3"), - new KeyValue<>("K4", "V4"), - new KeyValue<>("K5", "V5")); + new 
KeyValue<>("K1", "V1"), + new KeyValue<>("K2", "V2"), + new KeyValue<>("K3", "V3"), + new KeyValue<>("K4", "V4"), + new KeyValue<>("K5", "V5")); final KeyValue<String, String> kv = toInsert.get(0); final ThreadCache cache = new ThreadCache(logContext, - memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""), - new MockStreamsMetrics(new Metrics())); + memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""), + new MockStreamsMetrics(new Metrics())); cache.addDirtyEntryFlushListener(namespace, dirty -> { for (final ThreadCache.DirtyEntry dirtyEntry : dirty) { received.add(new KeyValue<>(dirtyEntry.key().toString(), new String(dirtyEntry.newValue()))); @@ -233,46 +237,88 @@ public class ThreadCacheTest { assertArrayEquals(name1Byte.get(), cache.get(namespace2, nameByte).value()); } + private ThreadCache setupThreadCache(final int first, final int last, final long entrySize, final boolean reverse) { + final ThreadCache cache = new ThreadCache(logContext, entrySize, new MockStreamsMetrics(new Metrics())); + cache.addDirtyEntryFlushListener(namespace, dirty -> { }); + int index = first; + while ((!reverse && index < last) || (reverse && index >= last)) { + cache.put(namespace, Bytes.wrap(bytes[index]), dirtyEntry(bytes[index])); + if (!reverse) + index++; + else + index--; + } + return cache; + } + @Test public void shouldPeekNextKey() { - final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = setupThreadCache(0, 1, 10000L, false); final Bytes theByte = Bytes.wrap(new byte[]{0}); - cache.put(namespace, theByte, dirtyEntry(theByte.get())); final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte, Bytes.wrap(new byte[]{1})); assertEquals(theByte, iterator.peekNextKey()); assertEquals(theByte, iterator.peekNextKey()); } @Test + public void shouldPeekNextKeyReverseRange() { + final ThreadCache cache = setupThreadCache(1, 1, 10000L, true); + final Bytes 
theByte = Bytes.wrap(new byte[]{1}); + final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), theByte); + assertThat(iterator.peekNextKey(), is(theByte)); + assertThat(iterator.peekNextKey(), is(theByte)); + } + + @Test public void shouldGetSameKeyAsPeekNext() { - final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = setupThreadCache(0, 1, 10000L, false); final Bytes theByte = Bytes.wrap(new byte[]{0}); - cache.put(namespace, theByte, dirtyEntry(theByte.get())); final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte, Bytes.wrap(new byte[]{1})); - assertEquals(iterator.peekNextKey(), iterator.next().key); + assertThat(iterator.peekNextKey(), is(iterator.next().key)); } @Test - public void shouldThrowIfNoPeekNextKey() { - final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics())); - final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1})); + public void shouldGetSameKeyAsPeekNextReverseRange() { + final ThreadCache cache = setupThreadCache(1, 1, 10000L, true); + final Bytes theByte = Bytes.wrap(new byte[]{1}); + final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), theByte); + assertThat(iterator.peekNextKey(), is(iterator.next().key)); + } + + private void shouldThrowIfNoPeekNextKey(final Supplier<ThreadCache.MemoryLRUCacheBytesIterator> methodUnderTest) { + final ThreadCache.MemoryLRUCacheBytesIterator iterator = methodUnderTest.get(); assertThrows(NoSuchElementException.class, iterator::peekNextKey); } @Test + public void shouldThrowIfNoPeekNextKeyRange() { + final ThreadCache cache = setupThreadCache(0, 0, 10000L, false); + shouldThrowIfNoPeekNextKey(() -> cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new 
byte[]{1}))); + } + + @Test + public void shouldThrowIfNoPeekNextKeyReverseRange() { + final ThreadCache cache = setupThreadCache(-1, 0, 10000L, true); + shouldThrowIfNoPeekNextKey(() -> cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1}))); + } + + @Test public void shouldReturnFalseIfNoNextKey() { - final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = setupThreadCache(0, 0, 10000L, false); final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1})); assertFalse(iterator.hasNext()); } @Test + public void shouldReturnFalseIfNoNextKeyReverseRange() { + final ThreadCache cache = setupThreadCache(-1, 0, 10000L, true); + final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1})); + assertFalse(iterator.hasNext()); + } + + @Test public void shouldPeekAndIterateOverRange() { - final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics())); - final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}}; - for (final byte[] aByte : bytes) { - cache.put(namespace, Bytes.wrap(aByte), dirtyEntry(aByte)); - } + final ThreadCache cache = setupThreadCache(0, 10, 10000L, false); final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{4})); int bytesIndex = 1; while (iterator.hasNext()) { @@ -286,12 +332,8 @@ public class ThreadCacheTest { } @Test - public void shouldSkipToEntryWhentoInclusiveIsFalseInRange() { - final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics())); - final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}}; - for (final byte[] aByte : bytes) { - cache.put(namespace, Bytes.wrap(aByte), dirtyEntry(aByte)); - 
} + public void shouldSkipToEntryWhenToInclusiveIsFalseInRange() { + final ThreadCache cache = setupThreadCache(0, 10, 10000L, false); final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{4}), false); int bytesIndex = 1; while (iterator.hasNext()) { @@ -305,26 +347,95 @@ public class ThreadCacheTest { } @Test + public void shouldPeekAndIterateOverReverseRange() { + final ThreadCache cache = setupThreadCache(10, 0, 10000L, true); + final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange(namespace, Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{4})); + int bytesIndex = 4; + while (iterator.hasNext()) { + final Bytes peekedKey = iterator.peekNextKey(); + final KeyValue<Bytes, LRUCacheEntry> next = iterator.next(); + assertArrayEquals(bytes[bytesIndex], peekedKey.get()); + assertArrayEquals(bytes[bytesIndex], next.key.get()); + bytesIndex--; + } + assertEquals(0, bytesIndex); + } + + @Test public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() { final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], ""); - final ThreadCache cache = new ThreadCache(logContext, entrySize * 5, new MockStreamsMetrics(new Metrics())); - cache.addDirtyEntryFlushListener(namespace, dirty -> { }); - - final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}}; - for (int i = 0; i < 5; i++) { - cache.put(namespace, Bytes.wrap(bytes[i]), dirtyEntry(bytes[i])); - } + final ThreadCache cache = setupThreadCache(0, 5, entrySize * 5, false); assertEquals(5, cache.size()); - // should evict byte[] {0} cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6})); - final ThreadCache.MemoryLRUCacheBytesIterator range = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{5})); + assertEquals(Bytes.wrap(new byte[]{1}), range.peekNextKey()); + } + + @Test + public void shouldSkipEntriesWhereValueHasBeenEvictedFromCacheReverseRange() { + 
final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], ""); + final ThreadCache cache = setupThreadCache(4, 0, entrySize * 5, true); + assertEquals(5, cache.size()); + // should evict byte[] {4} + cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6})); + final ThreadCache.MemoryLRUCacheBytesIterator range = cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{5})); + assertEquals(Bytes.wrap(new byte[]{3}), range.peekNextKey()); + } + + @Test + public void shouldFetchAllEntriesInCache() { + final ThreadCache cache = setupThreadCache(0, 11, 10000L, false); + final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.all(namespace); + int bytesIndex = 0; + while (iterator.hasNext()) { + final Bytes peekedKey = iterator.peekNextKey(); + final KeyValue<Bytes, LRUCacheEntry> next = iterator.next(); + assertArrayEquals(bytes[bytesIndex], peekedKey.get()); + assertArrayEquals(bytes[bytesIndex], next.key.get()); + bytesIndex++; + } + assertEquals(11, bytesIndex); + } + @Test + public void shouldFetchAllEntriesInCacheInReverseOrder() { + final ThreadCache cache = setupThreadCache(10, 0, 10000L, true); + final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseAll(namespace); + int bytesIndex = 10; + while (iterator.hasNext()) { + final Bytes peekedKey = iterator.peekNextKey(); + final KeyValue<Bytes, LRUCacheEntry> next = iterator.next(); + assertArrayEquals(bytes[bytesIndex], peekedKey.get()); + assertArrayEquals(bytes[bytesIndex], next.key.get()); + bytesIndex--; + } + assertEquals(-1, bytesIndex); + } + + @Test + public void shouldReturnAllUnevictedValuesFromCache() { + final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], ""); + final ThreadCache cache = setupThreadCache(0, 5, entrySize * 5, false); + assertEquals(5, cache.size()); + // should evict byte[] {0} + cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6})); + final 
ThreadCache.MemoryLRUCacheBytesIterator range = cache.all(namespace); assertEquals(Bytes.wrap(new byte[]{1}), range.peekNextKey()); } @Test + public void shouldReturnAllUnevictedValuesFromCacheInReverseOrder() { + final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], ""); + final ThreadCache cache = setupThreadCache(4, 0, entrySize * 5, true); + assertEquals(5, cache.size()); + // should evict byte[] {4} + cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6})); + final ThreadCache.MemoryLRUCacheBytesIterator range = cache.reverseAll(namespace); + assertEquals(Bytes.wrap(new byte[]{6}), range.peekNextKey()); + } + + @Test public void shouldFlushDirtyEntriesForNamespace() { final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics())); final List<byte[]> received = new ArrayList<>(); @@ -394,7 +505,7 @@ public class ThreadCacheTest { cache.addDirtyEntryFlushListener(namespace, received::addAll); cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})), - KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6})))); + KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6})))); assertEquals(cache.evicts(), 2); assertEquals(received.size(), 2); @@ -405,7 +516,7 @@ public class ThreadCacheTest { final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics())); cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})), - KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6})))); + KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6})))); assertArrayEquals(new byte[]{5}, cache.get(namespace, Bytes.wrap(new byte[]{0})).value()); assertArrayEquals(new byte[]{6}, cache.get(namespace, Bytes.wrap(new byte[]{1})).value());