Repository: kafka
Updated Branches:
  refs/heads/trunk e71dce89c -> e472ee7b6


KAFKA-5172: Fix fetchPrevious to find the correct session

Change fetchPrevious to use findSessions with the proper key and timestamps 
rather than using fetch.

Author: Kyle Winkelman <[email protected]>

Reviewers: Damian Guy, Guozhang Wang

Closes #2972 from KyleWinkelman/CachingSessionStore-fetchPrevious


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e472ee7b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e472ee7b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e472ee7b

Branch: refs/heads/trunk
Commit: e472ee7b613dbcab2ba1f5b6b384fa713f3906d0
Parents: e71dce8
Author: Kyle Winkelman <[email protected]>
Authored: Sun May 7 22:15:40 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Sun May 7 22:15:40 2017 -0700

----------------------------------------------------------------------
 .../state/internals/CachingSessionStore.java    | 12 ++++++----
 .../internals/CachingSessionStoreTest.java      | 25 ++++++++++++++++++++
 2 files changed, 32 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e472ee7b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index bebd118..00d4a4a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
@@ -56,7 +57,6 @@ class CachingSessionStore<K, AGG> extends 
WrappedStateStore.AbstractStateStore i
         this.keySchema = new SessionKeySchema();
     }
 
-    @SuppressWarnings("unchecked")
     public void init(final ProcessorContext context, final StateStore root) {
         topic = 
ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name());
         bytesStore.init(context, root);
@@ -128,21 +128,23 @@ class CachingSessionStore<K, AGG> extends 
WrappedStateStore.AbstractStateStore i
         context.setRecordContext(entry.recordContext());
         try {
             final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), 
keySerde.deserializer(), topic);
+            final Bytes rawKey = Bytes.wrap(serdes.rawKey(key.key()));
             if (flushListener != null) {
                 final AGG newValue = serdes.valueFrom(entry.newValue());
-                final AGG oldValue = fetchPrevious(binaryKey);
+                final AGG oldValue = fetchPrevious(rawKey, key.window());
                 if (!(newValue == null && oldValue == null)) {
                     flushListener.apply(key, newValue, oldValue);
                 }
             }
-            bytesStore.put(new 
Windowed<>(Bytes.wrap(serdes.rawKey(key.key())), key.window()), 
entry.newValue());
+            bytesStore.put(new Windowed<>(rawKey, key.window()), 
entry.newValue());
         } finally {
             context.setRecordContext(current);
         }
     }
 
-    private AGG fetchPrevious(final Bytes key) {
-        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = 
bytesStore.fetch(key)) {
+    private AGG fetchPrevious(final Bytes rawKey, final Window window) {
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = 
bytesStore
+                .findSessions(rawKey, window.start(), window.end())) {
             if (!iterator.hasNext()) {
                 return null;
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e472ee7b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index d316ae2..f8eec1c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
+import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
@@ -162,6 +164,29 @@ public class CachingSessionStoreTest {
     }
 
     @Test
+    public void shouldForwardChangedValuesDuringFlush() throws Exception {
+        final Windowed<String> a = new Windowed<>("a", new SessionWindow(0, 
0));
+        final List<KeyValue<Windowed<String>, Change<Long>>> flushed = new 
ArrayList<>();
+        cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, 
Long>() {
+                @Override
+                public void apply(final Windowed<String> key, final Long 
newValue, final Long oldValue) {
+                    flushed.add(KeyValue.pair(key, new Change<>(newValue, 
oldValue)));
+                }
+            });
+        
+        cachingStore.put(a, 1L);
+        cachingStore.flush();
+        
+        cachingStore.put(a, 2L);
+        cachingStore.flush();
+
+        cachingStore.remove(a);
+        cachingStore.flush();
+
+        assertEquals(flushed, Arrays.asList(KeyValue.pair(a, new Change<>(1L, 
null)), KeyValue.pair(a, new Change<>(2L, 1L)), KeyValue.pair(a, new 
Change<>(null, 2L))));
+    }
+
+    @Test
     public void shouldClearNamespaceCacheOnClose() throws Exception {
         final Windowed<String> a1 = new Windowed<>("a", new SessionWindow(0, 
0));
         cachingStore.put(a1, 1L);

Reply via email to