Repository: kafka Updated Branches: refs/heads/trunk 84ca88729 -> 17668e81c
MINOR: Expose window store sequence number guozhangwang mjsax enothereska Currently, Kafka Streams does not have a util to get access to the sequence number added to the key of window state store changelogs. I'm interested in exposing it so that the contents of a changelog topic can be 1) inspected for debugging purposes and 2) saved to a text file and loaded from a text file Author: Roger Hoover <[email protected]> Reviewers: Eno Thereska <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #1501 from theduderog/expose-seq-num Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/17668e81 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/17668e81 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/17668e81 Branch: refs/heads/trunk Commit: 17668e81c95d89f6543657351abc0a18004ecde5 Parents: 84ca887 Author: Roger Hoover <[email protected]> Authored: Wed Jun 15 12:09:03 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Wed Jun 15 12:09:03 2016 -0700 ---------------------------------------------------------------------- .../state/internals/WindowStoreUtils.java | 4 ++ .../state/internals/WindowStoreUtilsTest.java | 44 ++++++++++++++++++++ 2 files changed, 48 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/17668e81/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java index 30693e7..309c9c0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java @@ -62,4 +62,8 @@ 
public class WindowStoreUtils { public static long timestampFromBinaryKey(byte[] binaryKey) { return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE); } + + public static int sequenceNumberFromBinaryKey(byte[] binaryKey) { + return ByteBuffer.wrap(binaryKey).getInt(binaryKey.length - SEQNUM_SIZE); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/17668e81/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreUtilsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreUtilsTest.java new file mode 100644 index 0000000..e0cb3ae --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreUtilsTest.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.state.StateSerdes; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class WindowStoreUtilsTest { + protected StateSerdes<String, String> serdes = new StateSerdes<>("dummy", new Serdes.StringSerde(), new Serdes.StringSerde()); + + @Test + public void testSerialization() throws Exception { + final String key = "key1"; + final long timestamp = 99L; + final int seqNum = 3; + byte[] bytes = WindowStoreUtils.toBinaryKey(key, timestamp, seqNum, serdes); + final String parsedKey = WindowStoreUtils.keyFromBinaryKey(bytes, serdes); + final long parsedTs = WindowStoreUtils.timestampFromBinaryKey(bytes); + final int parsedSeqNum = WindowStoreUtils.sequenceNumberFromBinaryKey(bytes); + assertEquals(key, parsedKey); + assertEquals(timestamp, parsedTs); + assertEquals(seqNum, parsedSeqNum); + } + +} \ No newline at end of file
