Repository: kafka Updated Branches: refs/heads/trunk 82e84fabf -> e40e27b4e
KAFKA-5206: Use default aggSerde if no user-overridden Serde is provided in RocksDBSessionStore. RocksDBSessionStore wasn't properly using the default aggSerde if no Serde was supplied. Author: Kyle Winkelman <[email protected]> Reviewers: Damian Guy, Guozhang Wang Closes #2971 from KyleWinkelman/RocksDBSessionStore-fix-aggSerde-use Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e40e27b4 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e40e27b4 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e40e27b4 Branch: refs/heads/trunk Commit: e40e27b4eb99e8931488e86289453c5f14d5e886 Parents: 82e84fa Author: Kyle Winkelman <[email protected]> Authored: Mon May 15 11:18:12 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Mon May 15 11:18:12 2017 -0700 ---------------------------------------------------------------------- .../apache/kafka/streams/state/internals/RocksDBSessionStore.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e40e27b4/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java index 5027781..103bb55 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java @@ -106,6 +106,6 @@ class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i @Override public void put(final Windowed<K> sessionKey, final AGG aggregate) { - bytesStore.put(SessionKeySerde.toBinary(sessionKey, serdes.keySerializer(), 
topic), aggSerde.serializer().serialize(topic, aggregate)); + bytesStore.put(SessionKeySerde.toBinary(sessionKey, serdes.keySerializer(), topic), serdes.rawValue(aggregate)); } }
