Repository: kafka Updated Branches: refs/heads/trunk e71dce89c -> e472ee7b6
KAFKA-5172: Fix fetchPrevious to find the correct session Change fetchPrevious to use findSessions with the proper key and timestamps rather than using fetch. Author: Kyle Winkelman <[email protected]> Reviewers: Damian Guy, Guozhang Wang Closes #2972 from KyleWinkelman/CachingSessionStore-fetchPrevious Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e472ee7b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e472ee7b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e472ee7b Branch: refs/heads/trunk Commit: e472ee7b613dbcab2ba1f5b6b384fa713f3906d0 Parents: e71dce8 Author: Kyle Winkelman <[email protected]> Authored: Sun May 7 22:15:40 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Sun May 7 22:15:40 2017 -0700 ---------------------------------------------------------------------- .../state/internals/CachingSessionStore.java | 12 ++++++---- .../internals/CachingSessionStoreTest.java | 25 ++++++++++++++++++++ 2 files changed, 32 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e472ee7b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index bebd118..00d4a4a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.CacheFlushListener; import org.apache.kafka.streams.kstream.internals.SessionKeySerde; @@ -56,7 +57,6 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i this.keySchema = new SessionKeySchema(); } - @SuppressWarnings("unchecked") public void init(final ProcessorContext context, final StateStore root) { topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name()); bytesStore.init(context, root); @@ -128,21 +128,23 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i context.setRecordContext(entry.recordContext()); try { final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer(), topic); + final Bytes rawKey = Bytes.wrap(serdes.rawKey(key.key())); if (flushListener != null) { final AGG newValue = serdes.valueFrom(entry.newValue()); - final AGG oldValue = fetchPrevious(binaryKey); + final AGG oldValue = fetchPrevious(rawKey, key.window()); if (!(newValue == null && oldValue == null)) { flushListener.apply(key, newValue, oldValue); } } - bytesStore.put(new Windowed<>(Bytes.wrap(serdes.rawKey(key.key())), key.window()), entry.newValue()); + bytesStore.put(new Windowed<>(rawKey, key.window()), entry.newValue()); } finally { context.setRecordContext(current); } } - private AGG fetchPrevious(final Bytes key) { - try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = bytesStore.fetch(key)) { + private AGG fetchPrevious(final Bytes rawKey, final Window window) { + try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = bytesStore + .findSessions(rawKey, window.start(), window.end())) { if (!iterator.hasNext()) { return null; } http://git-wip-us.apache.org/repos/asf/kafka/blob/e472ee7b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index d316ae2..f8eec1c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -22,6 +22,8 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.CacheFlushListener; +import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.SessionKeySerde; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; @@ -162,6 +164,29 @@ public class CachingSessionStoreTest { } @Test + public void shouldForwardChangedValuesDuringFlush() throws Exception { + final Windowed<String> a = new Windowed<>("a", new SessionWindow(0, 0)); + final List<KeyValue<Windowed<String>, Change<Long>>> flushed = new ArrayList<>(); + cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, Long>() { + @Override + public void apply(final Windowed<String> key, final Long newValue, final Long oldValue) { + flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue))); + } + }); + + cachingStore.put(a, 1L); + cachingStore.flush(); + + cachingStore.put(a, 2L); + cachingStore.flush(); + + cachingStore.remove(a); + cachingStore.flush(); + + assertEquals(flushed, Arrays.asList(KeyValue.pair(a, new Change<>(1L, null)), KeyValue.pair(a, new Change<>(2L, 1L)), KeyValue.pair(a, new Change<>(null, 2L)))); + } + + @Test public void shouldClearNamespaceCacheOnClose() throws Exception { final Windowed<String> a1 = new Windowed<>("a", new SessionWindow(0, 0)); cachingStore.put(a1, 1L);
