Repository: kafka Updated Branches: refs/heads/0.10.2 c15b93fbe -> 9668b6bf6
KAFKA-5206: Use default values of keySerde if it is not specified by users in CachingSessionStore CachingSessionStore wasn't properly using the default keySerde if no Serde was supplied. I saw the below error in the logs for one of my test cases. Author: Kyle Winkelman <[email protected]> Reviewers: Damian Guy, Guozhang Wang Closes #2963 from KyleWinkelman/CachingSessionStore-fix-keySerde-use (cherry picked from commit 475cc2544e18b6b321e716691648024cdbbafb16) Signed-off-by: Guozhang Wang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9668b6bf Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9668b6bf Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9668b6bf Branch: refs/heads/0.10.2 Commit: 9668b6bf68a9fb5aedf04dfc4989ab46040838c3 Parents: c15b93f Author: Kyle Winkelman <[email protected]> Authored: Mon May 15 11:51:10 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Mon May 15 11:53:56 2017 -0700 ---------------------------------------------------------------------- .../kafka/streams/state/internals/CachingSessionStore.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9668b6bf/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index ed64246..80160b0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -60,7 +60,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState final long earliestSessionEndTime, final long latestSessionStartTime) { validateStoreOpen(); - final Bytes binarySessionId = Bytes.wrap(keySerde.serializer().serialize(topic, key)); + final Bytes binarySessionId = Bytes.wrap(serdes.rawKey(key)); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, keySchema.lowerRange(binarySessionId, earliestSessionEndTime).get(), @@ -81,7 +81,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState public void put(final Windowed<K> key, AGG value) { validateStoreOpen(); - final Bytes binaryKey = SessionKeySerde.toBinary(key, keySerde.serializer(), topic); + final Bytes binaryKey = SessionKeySerde.toBinary(key, serdes.keySerializer(), topic); final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(), key.window().end(), context.partition(), context.topic()); cache.put(cacheName, binaryKey.get(), entry); @@ -127,7 +127,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState final RecordContext current = context.recordContext(); context.setRecordContext(entry.recordContext()); try { - final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer(), topic); + final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), serdes.keyDeserializer(), topic); if (flushListener != null) { final AGG newValue = serdes.valueFrom(entry.newValue()); final AGG oldValue = fetchPrevious(binaryKey);
