Repository: kafka Updated Branches: refs/heads/trunk 088ab3eaa -> 9d37b9f4b
KAKFA-3599: Move WindowStoreUtils to package "internals" Author: Matthias J. Sax <[email protected]> Reviewers: Ismael Juma, Michael G. Noll, Guozhang Wang Closes #1266 from mjsax/kafka-3599-minorCodeCleanup Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9d37b9f4 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9d37b9f4 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9d37b9f4 Branch: refs/heads/trunk Commit: 9d37b9f4b6ba228ff7f7b99c8a0921a971fc03a6 Parents: 088ab3e Author: Matthias J. Sax <[email protected]> Authored: Tue Apr 26 10:53:49 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue Apr 26 10:53:49 2016 -0700 ---------------------------------------------------------------------- .../kafka/streams/state/WindowStoreUtils.java | 63 ------------------- .../streams/state/internals/RocksDBStore.java | 1 - .../state/internals/RocksDBWindowStore.java | 1 - .../state/internals/WindowStoreUtils.java | 65 ++++++++++++++++++++ .../state/internals/RocksDBWindowStoreTest.java | 1 - 5 files changed, 65 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9d37b9f4/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java deleted file mode 100644 index 2f99ad6..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.kafka.streams.state; - -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Bytes; - -import java.nio.ByteBuffer; - -public class WindowStoreUtils { - - private static final int SEQNUM_SIZE = 4; - private static final int TIMESTAMP_SIZE = 8; - - /** Inner byte array serde used for segments */ - public static final Serde<Bytes> INNER_KEY_SERDE = Serdes.Bytes(); - public static final Serde<byte[]> INNER_VALUE_SERDE = Serdes.ByteArray(); - public static final StateSerdes<Bytes, byte[]> INNER_SERDES = new StateSerdes<>("rocksDB-inner", INNER_KEY_SERDE, INNER_VALUE_SERDE); - - @SuppressWarnings("unchecked") - public static final KeyValueIterator<Bytes, byte[]>[] NO_ITERATORS = (KeyValueIterator<Bytes, byte[]>[]) new KeyValueIterator[0]; - - public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum, StateSerdes<K, ?> serdes) { - byte[] serializedKey = serdes.rawKey(key); - - ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE); - buf.put(serializedKey); - buf.putLong(timestamp); - buf.putInt(seqnum); - - return buf.array(); - } - - public static <K> K keyFromBinaryKey(byte[] binaryKey, StateSerdes<K, ?> serdes) { - byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE]; - - System.arraycopy(binaryKey, 0, bytes, 0, bytes.length); - - return serdes.keyFrom(bytes); - } - - public static long timestampFromBinaryKey(byte[] binaryKey) { - return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/9d37b9f4/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 3fef0ef..37609a0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -28,7 +28,6 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; -import org.apache.kafka.streams.state.WindowStoreUtils; import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.CompactionStyle; import org.rocksdb.CompressionType; http://git-wip-us.apache.org/repos/asf/kafka/blob/9d37b9f4/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index 4c964c6..803a089 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -30,7 +30,6 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; -import org.apache.kafka.streams.state.WindowStoreUtils; import java.io.File; import java.text.SimpleDateFormat; http://git-wip-us.apache.org/repos/asf/kafka/blob/9d37b9f4/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java new file mode 100644 index 0000000..30693e7 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.StateSerdes; + +import java.nio.ByteBuffer; + +public class WindowStoreUtils { + + private static final int SEQNUM_SIZE = 4; + private static final int TIMESTAMP_SIZE = 8; + + /** Inner byte array serde used for segments */ + public static final Serde<Bytes> INNER_KEY_SERDE = Serdes.Bytes(); + public static final Serde<byte[]> INNER_VALUE_SERDE = Serdes.ByteArray(); + public static final StateSerdes<Bytes, byte[]> INNER_SERDES = new StateSerdes<>("rocksDB-inner", INNER_KEY_SERDE, INNER_VALUE_SERDE); + + @SuppressWarnings("unchecked") + public static final KeyValueIterator<Bytes, byte[]>[] NO_ITERATORS = (KeyValueIterator<Bytes, byte[]>[]) new KeyValueIterator[0]; + + public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum, StateSerdes<K, ?> serdes) { + byte[] serializedKey = serdes.rawKey(key); + + ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE); + buf.put(serializedKey); + buf.putLong(timestamp); + buf.putInt(seqnum); + + return buf.array(); + } + + public static <K> K keyFromBinaryKey(byte[] binaryKey, StateSerdes<K, ?> serdes) { + byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE]; + + System.arraycopy(binaryKey, 0, bytes, 0, bytes.length); + + return serdes.keyFrom(bytes); + } + + public static long timestampFromBinaryKey(byte[] binaryKey) { + return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9d37b9f4/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 502870b..e9888ad 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -31,7 +31,6 @@ import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; -import org.apache.kafka.streams.state.WindowStoreUtils; import org.apache.kafka.test.MockProcessorContext; import org.junit.Test;
