Repository: kafka
Updated Branches:
  refs/heads/trunk 3e6669000 -> 475cc2544


KAFKA-5206: Use default values of keySerde if it is not specified by users in 
CachingSessionStore

CachingSessionStore wasn't properly using the default keySerde if no Serde was 
supplied. I saw the below error in the logs for one of my test cases.

ERROR stream-thread 
[cogroup-integration-test-3-5570fe48-d2a3-4271-80b1-81962295553d-StreamThread-6]
 Streams application error during processing:  
(org.apache.kafka.streams.processor.internals.StreamThread:335)
java.lang.NullPointerException
        at 
org.apache.kafka.streams.state.internals.CachingSessionStore.findSessions(CachingSessionStore.java:93)
        at 
org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(KStreamSessionWindowAggregate.java:94)
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
        at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
        at 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:69)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:206)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:657)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:728)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:327)

Author: Kyle Winkelman <[email protected]>

Reviewers: Damian Guy, Guozhang Wang

Closes #2963 from KyleWinkelman/CachingSessionStore-fix-keySerde-use


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/475cc254
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/475cc254
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/475cc254

Branch: refs/heads/trunk
Commit: 475cc2544e18b6b321e716691648024cdbbafb16
Parents: 3e66690
Author: Kyle Winkelman <[email protected]>
Authored: Mon May 15 11:51:10 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Mon May 15 11:51:10 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/state/internals/CachingSessionStore.java     | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/475cc254/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 00d4a4a..41e81eb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -90,7 +90,7 @@ class CachingSessionStore<K, AGG> extends 
WrappedStateStore.AbstractStateStore i
                                                            final long 
earliestSessionEndTime,
                                                            final long 
latestSessionStartTime) {
         validateStoreOpen();
-        final Bytes binarySessionId = 
Bytes.wrap(keySerde.serializer().serialize(topic, key));
+        final Bytes binarySessionId = Bytes.wrap(serdes.rawKey(key));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
cache.range(cacheName,
                                                                                
   keySchema.lowerRange(binarySessionId, earliestSessionEndTime),
                                                                                
   keySchema.upperRange(binarySessionId, latestSessionStartTime));
@@ -111,7 +111,7 @@ class CachingSessionStore<K, AGG> extends 
WrappedStateStore.AbstractStateStore i
     @Override
     public void put(final Windowed<K> key, AGG value) {
         validateStoreOpen();
-        final Bytes binaryKey = SessionKeySerde.toBinary(key, 
keySerde.serializer(), topic);
+        final Bytes binaryKey = SessionKeySerde.toBinary(key, 
serdes.keySerializer(), topic);
         final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), 
true, context.offset(),
                                                       key.window().end(), 
context.partition(), context.topic());
         cache.put(cacheName, binaryKey, entry);
@@ -127,7 +127,7 @@ class CachingSessionStore<K, AGG> extends 
WrappedStateStore.AbstractStateStore i
         final RecordContext current = context.recordContext();
         context.setRecordContext(entry.recordContext());
         try {
-            final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), 
keySerde.deserializer(), topic);
+            final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), 
serdes.keyDeserializer(), topic);
             final Bytes rawKey = Bytes.wrap(serdes.rawKey(key.key()));
             if (flushListener != null) {
                 final AGG newValue = serdes.valueFrom(entry.newValue());

Reply via email to