Repository: kafka
Updated Branches:
  refs/heads/trunk 475cc2544 -> c26545ea5 (forced update)
KAFKA-5205: Use default values of keySerde if it is not specified by users in CachingSessionStore

CachingSessionStore wasn't properly using the default keySerde if no Serde was supplied. I saw an error in the logs for one of my test cases (the stack trace is not reproduced in this message).

Author: Kyle Winkelman <[email protected]>

Reviewers: Damian Guy, Guozhang Wang

Closes #2963 from KyleWinkelman/CachingSessionStore-fix-keySerde-use

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c26545ea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c26545ea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c26545ea

Branch: refs/heads/trunk
Commit: c26545ea5701e02fc2667c81c719630ba14f500b
Parents: 3e66690
Author: Kyle Winkelman <[email protected]>
Authored: Mon May 15 11:51:10 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Mon May 15 11:57:52 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/state/internals/CachingSessionStore.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/c26545ea/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 00d4a4a..41e81eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -90,7 +90,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
                                                            final long earliestSessionEndTime,
                                                            final long latestSessionStartTime) {
         validateStoreOpen();
-        final Bytes binarySessionId = Bytes.wrap(keySerde.serializer().serialize(topic, key));
+        final Bytes binarySessionId = Bytes.wrap(serdes.rawKey(key));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName,
                                                                                   keySchema.lowerRange(binarySessionId, earliestSessionEndTime),
                                                                                   keySchema.upperRange(binarySessionId, latestSessionStartTime));
@@ -111,7 +111,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
     @Override
     public void put(final Windowed<K> key, AGG value) {
         validateStoreOpen();
-        final Bytes binaryKey = SessionKeySerde.toBinary(key, keySerde.serializer(), topic);
+        final Bytes binaryKey = SessionKeySerde.toBinary(key, serdes.keySerializer(), topic);
         final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(),
                                                       key.window().end(), context.partition(), context.topic());
         cache.put(cacheName, binaryKey, entry);
@@ -127,7 +127,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
         final RecordContext current = context.recordContext();
         context.setRecordContext(entry.recordContext());
         try {
-            final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer(), topic);
+            final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), serdes.keyDeserializer(), topic);
             final Bytes rawKey = Bytes.wrap(serdes.rawKey(key.key()));
             if (flushListener != null) {
                 final AGG newValue = serdes.valueFrom(entry.newValue());
