Repository: kafka Updated Branches: refs/heads/0.10.2 1ca2b1aac -> c15b93fbe
KAFKA-5206: Use default aggSerde if no user-overridden Serde is provided in RocksDBSessionStore RocksDBSessionStore wasn't properly using the default aggSerde if no Serde was supplied. Author: Kyle Winkelman <[email protected]> Reviewers: Damian Guy, Guozhang Wang Closes #2971 from KyleWinkelman/RocksDBSessionStore-fix-aggSerde-use (cherry picked from commit e40e27b4eb99e8931488e86289453c5f14d5e886) Signed-off-by: Guozhang Wang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c15b93fb Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c15b93fb Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c15b93fb Branch: refs/heads/0.10.2 Commit: c15b93fbee7883852c0d9633f2279c29772ba2ba Parents: 1ca2b1a Author: Kyle Winkelman <[email protected]> Authored: Mon May 15 11:18:12 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Mon May 15 11:20:41 2017 -0700 ---------------------------------------------------------------------- .../apache/kafka/streams/state/internals/RocksDBSessionStore.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c15b93fb/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java index 22f4a9d..109a67e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java @@ -62,7 +62,7 @@ class RocksDBSessionStore<K, AGG> implements SessionStore<K, AGG> { @Override public void put(final Windowed<K> sessionKey, 
final AGG aggregate) { - bytesStore.put(SessionKeySerde.toBinary(sessionKey, serdes.keySerializer(), topic), aggSerde.serializer().serialize(bytesStore.name(), aggregate)); + bytesStore.put(SessionKeySerde.toBinary(sessionKey, serdes.keySerializer(), topic), serdes.rawValue(aggregate)); } @Override
