Repository: kafka

Updated Branches:
  refs/heads/trunk e7d04c251 -> a02c8aaec
KAFKA-3499: prevent array typed keys in KeyValueStore

Author: Guozhang Wang <[email protected]>

Reviewers: Ismael Juma, Josh Gruenberg, Michael G. Noll, Ewen Cheslack-Postava

Closes #1229 from guozhangwang/K3499

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a02c8aae
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a02c8aae
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a02c8aae

Branch: refs/heads/trunk
Commit: a02c8aaecfbd13838c2a062bac1455da352028fe
Parents: e7d04c2
Author: Guozhang Wang <[email protected]>
Authored: Tue Apr 26 07:32:21 2016 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Tue Apr 26 07:32:21 2016 -0700

----------------------------------------------------------------------
 .../common/serialization/BytesDeserializer.java |  35 ++++
 .../common/serialization/BytesSerializer.java   |  36 ++++
 .../kafka/common/serialization/Serdes.java      |  39 +++-
 .../org/apache/kafka/common/utils/Bytes.java    | 178 +++++++++++++++++++
 .../kafka/streams/state/WindowStoreUtils.java   |  10 +-
 .../InMemoryKeyValueStoreSupplier.java          |  13 +-
 .../streams/state/internals/MemoryLRUCache.java |  13 ++
 .../state/internals/MeteredKeyValueStore.java   |   8 +-
 .../state/internals/RawStoreChangeLogger.java   |  56 ------
 .../streams/state/internals/RocksDBStore.java   |  73 ++++----
 .../state/internals/RocksDBWindowStore.java     |  48 ++---
 .../state/internals/StoreChangeLogger.java      |  10 ++
 .../streams/state/KeyValueStoreTestDriver.java  |  15 +-
 .../internals/AbstractKeyValueStoreTest.java    |   2 -
 .../state/internals/StoreChangeLoggerTest.java  |  41 -----
 15 files changed, 386 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java
new file mode 100644
index 0000000..ee6a57c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.common.utils.Bytes;
+
+import java.util.Map;
+
+public class BytesDeserializer implements Deserializer<Bytes> {
+
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // nothing to do
+    }
+
+    public Bytes deserialize(String topic, byte[] data) {
+        if (data == null)
+            return null;
+
+        return new Bytes(data);
+    }
+
+    public void close() {
+        // nothing to do
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java
new file mode 100644
index 0000000..3d04446
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.common.utils.Bytes;
+
+import java.util.Map;
+
+public class BytesSerializer implements Serializer<Bytes> {
+
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // nothing to do
+    }
+
+    public byte[] serialize(String topic, Bytes data) {
+        if (data == null)
+            return null;
+
+        return data.get();
+    }
+
+    public void close() {
+        // nothing to do
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
index f27f74f..d744522 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
@@ -13,6 +13,8 @@
 
 package org.apache.kafka.common.serialization;
 
+import org.apache.kafka.common.utils.Bytes;
+
 import java.nio.ByteBuffer;
 
 /**
@@ -80,6 +82,18 @@ public class Serdes {
         }
     }
 
+    static public final class BytesSerde implements Serde<Bytes> {
+        @Override
+        public Serializer<Bytes> serializer() {
+            return new BytesSerializer();
+        }
+
+        @Override
+        public Deserializer<Bytes> deserializer() {
+            return new BytesDeserializer();
+        }
+    }
+
     static public final class ByteArraySerde implements Serde<byte[]> {
         @Override
         public Serializer<byte[]> serializer() {
@@ -114,10 +128,14 @@ public class Serdes {
             return (Serde<T>) ByteArray();
         }
 
-        if (ByteBufferSerde.class.isAssignableFrom(type)) {
+        if (ByteBuffer.class.isAssignableFrom(type)) {
             return (Serde<T>) ByteBuffer();
         }
 
+        if (Bytes.class.isAssignableFrom(type)) {
+            return (Serde<T>) Bytes();
+        }
+
         // TODO: we can also serialize objects of type T using generic Java serialization by default
         throw new IllegalArgumentException("Unknown class for built-in serializer");
     }
@@ -150,42 +168,49 @@ public class Serdes {
     }
 
     /*
-     * A serde for nullable long type.
+     * A serde for nullable {@code Long} type.
      */
    static public Serde<Long> Long() {
        return new LongSerde();
    }

    /*
-     * A serde for nullable int type.
+     * A serde for nullable {@code Integer} type.
     */
    static public Serde<Integer> Integer() {
        return new IntegerSerde();
    }

    /*
-     * A serde for nullable long type.
+     * A serde for nullable {@code Double} type.
     */
    static public Serde<Double> Double() {
        return new DoubleSerde();
    }

    /*
-     * A serde for nullable string type.
+     * A serde for nullable {@code String} type.
     */
    static public Serde<String> String() {
        return new StringSerde();
    }

    /*
-     * A serde for nullable byte array type.
+     * A serde for nullable {@code ByteBuffer} type.
     */
    static public Serde<ByteBuffer> ByteBuffer() {
        return new ByteBufferSerde();
    }

    /*
-     * A serde for nullable byte array type.
+     * A serde for nullable {@code Bytes} type.
+     */
+    static public Serde<Bytes> Bytes() {
+        return new BytesSerde();
+    }
+
+    /*
+     * A serde for nullable {@code byte[]} type.
     */
    static public Serde<byte[]> ByteArray() {
        return new ByteArraySerde();
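As a quick orientation for the new serde, a minimal round-trip sketch (illustrative only, not part of the patch; the class name and topic string are made up):

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;

    public class BytesSerdeRoundTrip {
        public static void main(String[] args) {
            // Serdes.Bytes() and the new Bytes.class branch of serdeFrom() both resolve to BytesSerde
            Serde<Bytes> serde = Serdes.serdeFrom(Bytes.class);

            Bytes key = Bytes.wrap(new byte[] {1, 2, 3});
            byte[] wire = serde.serializer().serialize("topic", key);      // unwraps to the backing array
            Bytes back = serde.deserializer().deserialize("topic", wire);  // wraps it again

            System.out.println(key.equals(back));                            // true: Bytes compares by content
            System.out.println(serde.serializer().serialize("topic", null)); // null passes through both ways
        }
    }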
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
new file mode 100644
index 0000000..78340e5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.Arrays;
+import java.util.Comparator;
+
+/**
+ * Utility class that handles immutable byte arrays.
+ */
+public class Bytes implements Comparable<Bytes> {
+
+    private static final char[] HEX_CHARS_UPPER = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
+
+    private final byte[] bytes;
+
+    // cache the hash code for the bytes, default to 0
+    private int hashCode;
+
+    public static Bytes wrap(byte[] bytes) {
+        return new Bytes(bytes);
+    }
+
+    /**
+     * Create a Bytes using the byte array.
+     *
+     * @param bytes This array becomes the backing storage for the object.
+     */
+    public Bytes(byte[] bytes) {
+        this.bytes = bytes;
+
+        // initialize hash code to 0
+        hashCode = 0;
+    }
+
+    /**
+     * Get the data from the Bytes.
+     *
+     * @return the underlying byte array
+     */
+    public byte[] get() {
+        return this.bytes;
+    }
+
+    /**
+     * The hashcode is cached except for the case where it is computed as 0, in which
+     * case we compute the hashcode on every call.
+     *
+     * @return the hashcode
+     */
+    @Override
+    public int hashCode() {
+        if (hashCode == 0) {
+            hashCode = Arrays.hashCode(bytes);
+        }
+
+        return hashCode;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other)
+            return true;
+
+        // the null check also guards the hashCode() call below
+        if (other == null)
+            return false;
+
+        // we intentionally use the function to compute hashcode here
+        if (this.hashCode() != other.hashCode())
+            return false;
+
+        if (other instanceof Bytes)
+            return Arrays.equals(this.bytes, ((Bytes) other).get());
+
+        return false;
+    }
+
+    @Override
+    public int compareTo(Bytes that) {
+        return BYTES_LEXICO_COMPARATOR.compare(this.bytes, that.bytes);
+    }
+
+    @Override
+    public String toString() {
+        return Bytes.toString(bytes, 0, bytes.length);
+    }
+
+    /**
+     * Write a printable representation of a byte array. Non-printable
+     * characters are hex escaped in the format \\x%02X, e.g.:
+     * \x00 \x05 etc.
+     *
+     * This function is adapted from org.apache.hadoop.hbase.util.Bytes
+     *
+     * @param b array to write out
+     * @param off offset to start at
+     * @param len length to write
+     * @return string output
+     */
+    private static String toString(final byte[] b, int off, int len) {
+        StringBuilder result = new StringBuilder();
+
+        if (b == null)
+            return result.toString();
+
+        // just in case we are passed a 'len' that is > buffer length...
+        if (off >= b.length)
+            return result.toString();
+
+        if (off + len > b.length)
+            len = b.length - off;
+
+        for (int i = off; i < off + len; ++i) {
+            int ch = b[i] & 0xFF;
+            if (ch >= ' ' && ch <= '~' && ch != '\\') {
+                result.append((char) ch);
+            } else {
+                result.append("\\x");
+                result.append(HEX_CHARS_UPPER[ch / 0x10]);
+                result.append(HEX_CHARS_UPPER[ch % 0x10]);
+            }
+        }
+        return result.toString();
+    }
+
+    /**
+     * A byte array comparator based on lexicographic ordering.
+     */
+    public final static Comparator<byte[]> BYTES_LEXICO_COMPARATOR = new LexicographicByteArrayComparator();
+
+    private interface ByteArrayComparator extends Comparator<byte[]> {
+
+        int compare(final byte[] buffer1, int offset1, int length1,
+                    final byte[] buffer2, int offset2, int length2);
+    }
+
+    private static class LexicographicByteArrayComparator implements ByteArrayComparator {
+
+        @Override
+        public int compare(byte[] buffer1, byte[] buffer2) {
+            return compare(buffer1, 0, buffer1.length, buffer2, 0, buffer2.length);
+        }
+
+        public int compare(final byte[] buffer1, int offset1, int length1,
+                           final byte[] buffer2, int offset2, int length2) {
+
+            // short circuit equal case
+            if (buffer1 == buffer2 &&
+                    offset1 == offset2 &&
+                    length1 == length2) {
+                return 0;
+            }
+
+            // similar to Arrays.compare() but considers offset and length
+            int end1 = offset1 + length1;
+            int end2 = offset2 + length2;
+            for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
+                int a = buffer1[i] & 0xff;
+                int b = buffer2[j] & 0xff;
+                if (a != b) {
+                    return a - b;
+                }
+            }
+            return length1 - length2;
+        }
+    }
+}
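The point of this wrapper is easiest to see with a hash-based collection. A minimal sketch (illustrative only, not part of the patch) of the lookup failure that the javadoc notes in the streams changes below warn about:

    import org.apache.kafka.common.utils.Bytes;

    import java.util.HashMap;
    import java.util.Map;

    public class ArrayKeyPitfall {
        public static void main(String[] args) {
            // byte[] uses identity-based hashCode/equals, so a second array
            // with the same content misses the entry entirely.
            Map<byte[], String> raw = new HashMap<>();
            raw.put(new byte[] {1, 2, 3}, "v");
            System.out.println(raw.get(new byte[] {1, 2, 3}));  // null

            // Bytes hashes and compares by content, so the lookup succeeds.
            Map<Bytes, String> wrapped = new HashMap<>();
            wrapped.put(Bytes.wrap(new byte[] {1, 2, 3}), "v");
            System.out.println(wrapped.get(Bytes.wrap(new byte[] {1, 2, 3})));  // v
        }
    }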
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
index fdf3269..2f99ad6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
@@ -21,6 +21,7 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 
 import java.nio.ByteBuffer;
 
@@ -30,13 +31,12 @@ public class WindowStoreUtils {
     private static final int TIMESTAMP_SIZE = 8;
 
     /** Inner byte array serde used for segments */
-    public static final Serde<byte[]> INNER_SERDE = Serdes.ByteArray();
-
-    /** Inner byte array state serde used for segments */
-    public static final StateSerdes<byte[], byte[]> INNER_SERDES = new StateSerdes<>("", INNER_SERDE, INNER_SERDE);
+    public static final Serde<Bytes> INNER_KEY_SERDE = Serdes.Bytes();
+    public static final Serde<byte[]> INNER_VALUE_SERDE = Serdes.ByteArray();
+    public static final StateSerdes<Bytes, byte[]> INNER_SERDES = new StateSerdes<>("rocksDB-inner", INNER_KEY_SERDE, INNER_VALUE_SERDE);
 
     @SuppressWarnings("unchecked")
-    public static final KeyValueIterator<byte[], byte[]>[] NO_ITERATORS = (KeyValueIterator<byte[], byte[]>[]) new KeyValueIterator[0];
+    public static final KeyValueIterator<Bytes, byte[]>[] NO_ITERATORS = (KeyValueIterator<Bytes, byte[]>[]) new KeyValueIterator[0];
 
     public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum, StateSerdes<K, ?> serdes) {
         byte[] serializedKey = serdes.rawKey(key);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index 3a5819c..a25153c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -35,6 +35,10 @@ import java.util.TreeMap;
 /**
  * An in-memory key-value store based on a TreeMap.
  *
+ * Note that the use of array-typed keys is discouraged because they result in incorrect ordering behavior.
+ * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
+ * i.e. use a {@code KeyValueStore<Bytes, ...>} rather than a {@code KeyValueStore<byte[], ...>}.
+ *
  * @param <K> The key type
  * @param <V> The value type
  *
@@ -63,7 +67,7 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
     }
 
     public StateStore get() {
-        return new MeteredKeyValueStore<>(new MemoryStore<K, V>(name, keySerde, valueSerde).enableLogging(), "in-memory-state", time);
+        return new MeteredKeyValueStore<>(new MemoryStore<>(name, keySerde, valueSerde).enableLogging(), "in-memory-state", time);
     }
 
     private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
@@ -76,6 +80,9 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
             this.name = name;
             this.keySerde = keySerde;
             this.valueSerde = valueSerde;
+
+            // TODO: when we have serde associated with class types, we can
+            // improve this situation by passing the comparator here.
             this.map = new TreeMap<>();
         }
 
@@ -131,12 +138,12 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
 
         @Override
         public KeyValueIterator<K, V> range(K from, K to) {
-            return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator());
+            return new MemoryStoreIterator<>(this.map.subMap(from, true, to, false).entrySet().iterator());
         }
 
         @Override
         public KeyValueIterator<K, V> all() {
-            return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator());
+            return new MemoryStoreIterator<>(this.map.entrySet().iterator());
         }
 
         @Override
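The TreeMap backing this store makes the problem even sharper: byte[] implements neither Comparable nor content-based equality, so a naturally-ordered sorted map cannot accept it at all, while Bytes sorts lexicographically. A sketch (again illustrative, not part of the patch):

    import org.apache.kafka.common.utils.Bytes;

    import java.util.TreeMap;

    public class TreeMapKeyOrdering {
        public static void main(String[] args) {
            // byte[] is not Comparable, so a natural-ordering TreeMap
            // throws ClassCastException on the very first insertion.
            try {
                new TreeMap<byte[], String>().put(new byte[] {1}, "v");
            } catch (ClassCastException e) {
                System.out.println("byte[] keys rejected: " + e);
            }

            // Bytes implements Comparable via BYTES_LEXICO_COMPARATOR,
            // so sorted maps and range queries behave as expected.
            TreeMap<Bytes, String> map = new TreeMap<>();
            map.put(Bytes.wrap(new byte[] {2}), "b");
            map.put(Bytes.wrap(new byte[] {1}), "a");
            System.out.println(map.firstKey());  // the {0x01} key
        }
    }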
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 76dd744..d410e02 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -29,6 +29,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+/**
+ * An in-memory LRU cache store based on HashSet and HashMap.
+ *
+ * Note that the use of array-typed keys is discouraged because they result in incorrect hash-based lookup behavior.
+ * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
+ * i.e. use a {@code KeyValueStore<Bytes, ...>} rather than a {@code KeyValueStore<byte[], ...>}.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ *
+ * @see org.apache.kafka.streams.state.Stores#create(String)
+ */
 public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
 
     public interface EldestEntryRemovalListener<K, V> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 9808c04..5e5b54a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -137,9 +137,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     public V delete(K key) {
         long startNs = time.nanoseconds();
         try {
-            V value = this.inner.delete(key);
-
-            return value;
+            return this.inner.delete(key);
         } finally {
             this.metrics.recordLatency(this.deleteTime, startNs, time.nanoseconds());
         }
@@ -147,12 +145,12 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
     @Override
     public KeyValueIterator<K, V> range(K from, K to) {
-        return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime);
+        return new MeteredKeyValueIterator<>(this.inner.range(from, to), this.rangeTime);
     }
 
     @Override
     public KeyValueIterator<K, V> all() {
-        return new MeteredKeyValueIterator<K, V>(this.inner.all(), this.allTime);
+        return new MeteredKeyValueIterator<>(this.inner.all(), this.allTime);
     }
 
     @Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
deleted file mode 100644
index 4d99b59..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.WindowStoreUtils;
-
-import java.util.Comparator;
-import java.util.TreeSet;
-
-public class RawStoreChangeLogger extends StoreChangeLogger<byte[], byte[]> {
-
-    private class ByteArrayComparator implements Comparator<byte[]> {
-        @Override
-        public int compare(byte[] left, byte[] right) {
-            for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
-                int a = left[i] & 0xff;
-                int b = right[j] & 0xff;
-
-                if (a != b)
-                    return a - b;
-            }
-            return left.length - right.length;
-        }
-    }
-
-    public RawStoreChangeLogger(String storeName, ProcessorContext context) {
-        this(storeName, context, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
-    }
-
-    public RawStoreChangeLogger(String storeName, ProcessorContext context, int maxDirty, int maxRemoved) {
-        super(storeName, context, context.taskId().partition, WindowStoreUtils.INNER_SERDES, maxDirty, maxRemoved);
-        init();
-    }
-
-    @Override
-    public void init() {
-        this.dirty = new TreeSet<>(new ByteArrayComparator());
-        this.removed = new TreeSet<>(new ByteArrayComparator());
-    }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 944d408..3fef0ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -27,6 +28,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.WindowStoreUtils;
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.CompactionStyle;
 import org.rocksdb.CompressionType;
@@ -46,6 +48,18 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Set;
 
+/**
+ * A persistent key-value store based on RocksDB.
+ *
+ * Note that the use of array-typed keys is discouraged because they result in incorrect caching behavior.
+ * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
+ * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ *
+ * @see org.apache.kafka.streams.state.Stores#create(String)
+ */
 public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     private static final int TTL_NOT_USED = -1;
@@ -80,8 +94,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     private Set<K> cacheDirtyKeys;
     private MemoryLRUCache<K, RocksDBCacheEntry> cache;
 
-    private StoreChangeLogger<byte[], byte[]> changeLogger;
-    private StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
+    private StoreChangeLogger<Bytes, byte[]> changeLogger;
+    private StoreChangeLogger.ValueGetter<Bytes, byte[]> getter;
 
     public KeyValueStore<K, V> enableLogging() {
         loggingEnabled = true;
@@ -156,7 +170,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         // open the DB dir
         openDB(context);
 
-        this.changeLogger = this.loggingEnabled ? new RawStoreChangeLogger(name, context) : null;
+        this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, WindowStoreUtils.INNER_SERDES) : null;
 
         if (this.cacheSize > 0) {
             this.cache = new MemoryLRUCache<K, RocksDBCacheEntry>(name, cacheSize)
@@ -170,7 +184,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
                     }
                 });
 
-
             this.cacheDirtyKeys = new HashSet<>();
         } else {
             this.cache = null;
@@ -179,10 +192,10 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
         // value getter should always read directly from rocksDB
         // since it is only for values that are already flushed
-        this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
+        this.getter = new StoreChangeLogger.ValueGetter<Bytes, byte[]>() {
             @Override
-            public byte[] get(byte[] key) {
-                return getInternal(key);
+            public byte[] get(Bytes key) {
+                return getInternal(key.get());
             }
         };
 
@@ -258,7 +271,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         putInternal(rawKey, rawValue);
 
         if (loggingEnabled) {
-            changeLogger.add(rawKey);
+            changeLogger.add(Bytes.wrap(rawKey));
             changeLogger.maybeLogChange(this.getter);
         }
     }
@@ -325,7 +338,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         if (cache != null)
             flushCache();
 
-        return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
+        return new RocksDBRangeIterator<>(db.newIterator(), serdes, from, to);
     }
 
     @Override
@@ -336,7 +349,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         RocksIterator innerIter = db.newIterator();
         innerIter.seekToFirst();
 
-        return new RocksDbIterator<K, V>(innerIter, serdes);
+        return new RocksDbIterator<>(innerIter, serdes);
     }
 
     private void flushCache() {
@@ -348,14 +361,16 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         for (K key : cacheDirtyKeys) {
             RocksDBCacheEntry entry = cache.get(key);
 
-            entry.isDirty = false;
+            if (entry != null) {
+                entry.isDirty = false;
 
-            byte[] rawKey = serdes.rawKey(key);
+                byte[] rawKey = serdes.rawKey(key);
 
-            if (entry.value != null) {
-                putBatch.add(new KeyValue<>(rawKey, serdes.rawValue(entry.value)));
-            } else {
-                deleteBatch.add(rawKey);
+                if (entry.value != null) {
+                    putBatch.add(new KeyValue<>(rawKey, serdes.rawValue(entry.value)));
+                } else {
+                    deleteBatch.add(rawKey);
+                }
             }
         }
 
@@ -363,7 +378,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
         if (loggingEnabled) {
             for (KeyValue<byte[], byte[]> kv : putBatch)
-                changeLogger.add(kv.key);
+                changeLogger.add(Bytes.wrap(kv.key));
         }
 
         // check all removed entries and remove them in rocksDB
@@ -376,7 +391,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             }
 
             if (loggingEnabled) {
-                changeLogger.delete(removedKey);
+                changeLogger.delete(Bytes.wrap(removedKey));
             }
         }
 
@@ -464,30 +479,14 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     }
 
-    private static class LexicographicComparator implements Comparator<byte[]> {
-
-        @Override
-        public int compare(byte[] left, byte[] right) {
-            for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
-                int leftByte = left[i] & 0xff;
-                int rightByte = right[j] & 0xff;
-                if (leftByte != rightByte) {
-                    return leftByte - rightByte;
-                }
-            }
-            return left.length - right.length;
-        }
-    }
-
     private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
         // RocksDB's JNI interface does not expose getters/setters that allow the
        // comparator to be pluggable, and the default is lexicographic, so it's
        // safe to just force lexicographic comparator here for now.
-        private final Comparator<byte[]> comparator = new LexicographicComparator();
-        byte[] rawToKey;
+        private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
+        private byte[] rawToKey;
 
-        public RocksDBRangeIterator(RocksIterator iter, StateSerdes<K, V> serdes,
-                                    K from, K to) {
+        public RocksDBRangeIterator(RocksIterator iter, StateSerdes<K, V> serdes, K from, K to) {
             super(iter, serdes);
             iter.seek(serdes.rawKey(from));
             this.rawToKey = serdes.rawKey(to);
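The comment inside RocksDBRangeIterator is the key constraint here: RocksDB's JNI interface pins the comparator to lexicographic byte order, so the now-shared Bytes.BYTES_LEXICO_COMPARATOR must agree with it, comparing bytes unsigned and ordering a proper prefix first. A quick illustrative check (not part of the patch):

    import org.apache.kafka.common.utils.Bytes;

    public class LexicoOrderDemo {
        public static void main(String[] args) {
            // bytes compare unsigned: 0xFF (-1 as a signed byte) sorts after 0x01
            System.out.println(Bytes.BYTES_LEXICO_COMPARATOR.compare(
                    new byte[] {(byte) 0xFF}, new byte[] {0x01}) > 0);  // true

            // a proper prefix sorts before any longer key sharing that prefix
            System.out.println(Bytes.BYTES_LEXICO_COMPARATOR.compare(
                    new byte[] {1, 2}, new byte[] {1, 2, 3}) < 0);      // true
        }
    }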
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 5955d21..4c964c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -31,7 +32,6 @@ import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.WindowStoreUtils;
 
-
 import java.io.File;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -48,11 +48,12 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     private static final long USE_CURRENT_TIMESTAMP = -1L;
 
-    private static class Segment extends RocksDBStore<byte[], byte[]> {
+    // use the Bytes wrapper for underlying rocksDB keys since they are used for hashing data structures
+    private static class Segment extends RocksDBStore<Bytes, byte[]> {
         public final long id;
 
         Segment(String segmentName, String windowName, long id) {
-            super(segmentName, windowName, WindowStoreUtils.INNER_SERDE, WindowStoreUtils.INNER_SERDE);
+            super(segmentName, windowName, WindowStoreUtils.INNER_KEY_SERDE, WindowStoreUtils.INNER_VALUE_SERDE);
             this.id = id;
         }
 
@@ -63,14 +64,14 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     private static class RocksDBWindowStoreIterator<V> implements WindowStoreIterator<V> {
         private final StateSerdes<?, V> serdes;
-        private final KeyValueIterator<byte[], byte[]>[] iterators;
+        private final KeyValueIterator<Bytes, byte[]>[] iterators;
         private int index = 0;
 
         RocksDBWindowStoreIterator(StateSerdes<?, V> serdes) {
             this(serdes, WindowStoreUtils.NO_ITERATORS);
         }
 
-        RocksDBWindowStoreIterator(StateSerdes<?, V> serdes, KeyValueIterator<byte[], byte[]>[] iterators) {
+        RocksDBWindowStoreIterator(StateSerdes<?, V> serdes, KeyValueIterator<Bytes, byte[]>[] iterators) {
             this.serdes = serdes;
             this.iterators = iterators;
         }
@@ -94,9 +95,9 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
             if (index >= iterators.length)
                 throw new NoSuchElementException();
 
-            KeyValue<byte[], byte[]> kv = iterators[index].next();
+            KeyValue<Bytes, byte[]> kv = iterators[index].next();
 
-            return new KeyValue<>(WindowStoreUtils.timestampFromBinaryKey(kv.key),
+            return new KeyValue<>(WindowStoreUtils.timestampFromBinaryKey(kv.key.get()),
                                   serdes.valueFrom(kv.value));
         }
 
@@ -108,7 +109,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
         @Override
         public void close() {
-            for (KeyValueIterator<byte[], byte[]> iterator : iterators) {
+            for (KeyValueIterator<Bytes, byte[]> iterator : iterators) {
                 iterator.close();
             }
         }
@@ -121,7 +122,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
     private final SimpleDateFormat formatter;
-    private final StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
+    private final StoreChangeLogger.ValueGetter<Bytes, byte[]> getter;
 
     private ProcessorContext context;
     private int seqnum = 0;
@@ -130,7 +131,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     private StateSerdes<K, V> serdes;
 
     private boolean loggingEnabled = false;
-    private StoreChangeLogger<byte[], byte[]> changeLogger = null;
+    private StoreChangeLogger<Bytes, byte[]> changeLogger = null;
 
     public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde) {
         this.name = name;
@@ -144,9 +145,9 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
         this.retainDuplicates = retainDuplicates;
 
-        this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
-            public byte[] get(byte[] key) {
-                return getInternal(key);
+        this.getter = new StoreChangeLogger.ValueGetter<Bytes, byte[]>() {
+            public byte[] get(Bytes key) {
+                return getInternal(key.get());
             }
         };
 
@@ -178,13 +179,16 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
         openExistingSegments();
 
-        this.changeLogger = this.loggingEnabled ? new RawStoreChangeLogger(name, context) : null;
+        this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, WindowStoreUtils.INNER_SERDES) : null;
 
         // register and possibly restore the state from the logs
         context.register(root, loggingEnabled, new StateRestoreCallback() {
             @Override
             public void restore(byte[] key, byte[] value) {
-                putInternal(key, value);
+                // if the value is null, it means that this record has already been
+                // deleted while it was captured in the changelog, hence we do not need to put it any more.
+                if (value != null)
+                    putInternal(key, value);
             }
         });
 
@@ -249,7 +253,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         byte[] rawKey = putAndReturnInternalKey(key, value, USE_CURRENT_TIMESTAMP);
 
         if (rawKey != null && loggingEnabled) {
-            changeLogger.add(rawKey);
+            changeLogger.add(Bytes.wrap(rawKey));
             changeLogger.maybeLogChange(this.getter);
         }
     }
@@ -259,7 +263,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         byte[] rawKey = putAndReturnInternalKey(key, value, timestamp);
 
         if (rawKey != null && loggingEnabled) {
-            changeLogger.add(rawKey);
+            changeLogger.add(Bytes.wrap(rawKey));
             changeLogger.maybeLogChange(this.getter);
         }
     }
@@ -281,7 +285,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
             if (retainDuplicates)
                 seqnum = (seqnum + 1) & 0x7FFFFFFF;
             byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes);
-            segment.put(binaryKey, serdes.rawValue(value));
+            segment.put(Bytes.wrap(binaryKey), serdes.rawValue(value));
             return binaryKey;
         } else {
             return null;
@@ -300,7 +304,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         // If the record is within the retention period, put it in the store.
         Segment segment = getSegment(segmentId);
         if (segment != null)
-            segment.put(binaryKey, binaryValue);
+            segment.put(Bytes.wrap(binaryKey), binaryValue);
     }
 
     private byte[] getInternal(byte[] binaryKey) {
@@ -308,7 +312,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         Segment segment = getSegment(segmentId);
 
         if (segment != null) {
-            return segment.get(binaryKey);
+            return segment.get(Bytes.wrap(binaryKey));
         } else {
             return null;
         }
@@ -323,12 +327,12 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes);
         byte[] binaryTo = WindowStoreUtils.toBinaryKey(key, timeTo, Integer.MAX_VALUE, serdes);
 
-        ArrayList<KeyValueIterator<byte[], byte[]>> iterators = new ArrayList<>();
+        ArrayList<KeyValueIterator<Bytes, byte[]>> iterators = new ArrayList<>();
 
         for (long segmentId = segFrom; segmentId <= segTo; segmentId++) {
             Segment segment = getSegment(segmentId);
             if (segment != null)
-                iterators.add(segment.range(binaryFrom, binaryTo));
+                iterators.add(segment.range(Bytes.wrap(binaryFrom), Bytes.wrap(binaryTo)));
         }
 
         if (iterators.size() > 0) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index a439117..3f848fe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -27,6 +27,16 @@ import org.apache.kafka.streams.state.StateSerdes;
 import java.util.HashSet;
 import java.util.Set;
 
+/**
+ * Store change log collector that batches updates before sending to Kafka.
+ *
+ * Note that the use of array-typed keys is discouraged because they result in incorrect caching behavior.
+ * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
+ * i.e. use {@code StoreChangeLogger<Bytes, ...>} rather than {@code StoreChangeLogger<byte[], ...>}.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
 public class StoreChangeLogger<K, V> {
 
     public interface ValueGetter<K, V> {
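For orientation, a hedged sketch of driving the now-generic StoreChangeLogger with Bytes keys. It uses only members visible in this patch and its tests (the 5-argument constructor, add, delete, maybeLogChange, and ValueGetter); the ProcessorContext would come from a mock like StoreChangeLoggerTest's, and `flushed` is a hypothetical stand-in for reading the store's already-flushed values:

    // assumed to live in org.apache.kafka.streams.state.internals, like the test
    package org.apache.kafka.streams.state.internals;

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.WindowStoreUtils;

    import java.util.Map;

    public class ChangeLoggerSketch {
        static void logSomeChanges(ProcessorContext context, final Map<Bytes, byte[]> flushed) {
            StoreChangeLogger<Bytes, byte[]> logger =
                    new StoreChangeLogger<>("my-store", context, WindowStoreUtils.INNER_SERDES, 3, 3);

            StoreChangeLogger.ValueGetter<Bytes, byte[]> getter = new StoreChangeLogger.ValueGetter<Bytes, byte[]>() {
                @Override
                public byte[] get(Bytes key) {
                    return flushed.get(key);  // works because Bytes hashes by content, unlike byte[]
                }
            };

            logger.add(Bytes.wrap(new byte[] {1}));    // mark a key dirty
            logger.delete(Bytes.wrap(new byte[] {2})); // mark a key removed
            logger.maybeLogChange(getter);             // presumably flushes once the (3, 3) thresholds are crossed
        }
    }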
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 0468f49..3a35d75 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -198,19 +198,8 @@ public class KeyValueStoreTestDriver<K, V> {
             public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                 // for byte arrays we need to wrap it for comparison
-                K key;
-                if (record.key() instanceof byte[]) {
-                    key = serdes.keyFrom((byte[]) record.key());
-                } else {
-                    key = (K) record.key();
-                }
-
-                V value;
-                if (record.key() instanceof byte[]) {
-                    value = serdes.valueFrom((byte[]) record.value());
-                } else {
-                    value = (V) record.value();
-                }
+                K key = serdes.keyFrom(keySerializer.serialize(record.topic(), record.key()));
+                V value = serdes.valueFrom(valueSerializer.serialize(record.topic(), record.value()));
 
                 recordFlushed(key, value);
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index fb0efc9..2bfe644 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -192,8 +192,6 @@ public abstract class AbstractKeyValueStoreTest {
         }
     }
 
-
-
     @Test
     public void testPutIfAbsent() {
         // Create the test driver ...
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 9a477df..09f12fb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -24,10 +24,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -63,8 +60,6 @@ public class StoreChangeLoggerTest {
     private final StoreChangeLogger<Integer, String> changeLogger =
             new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class), 3, 3);
 
-    private final StoreChangeLogger<byte[], byte[]> rawChangeLogger = new RawStoreChangeLogger(topic, context, 3, 3);
-
     private final StoreChangeLogger.ValueGetter<Integer, String> getter = new StoreChangeLogger.ValueGetter<Integer, String>() {
         @Override
         public String get(Integer key) {
@@ -72,16 +67,6 @@ public class StoreChangeLoggerTest {
         }
     };
 
-    private final StoreChangeLogger.ValueGetter<byte[], byte[]> rawGetter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
-        private IntegerDeserializer deserializer = new IntegerDeserializer();
-        private StringSerializer serializer = new StringSerializer();
-
-        @Override
-        public byte[] get(byte[] key) {
-            return serializer.serialize(topic, written.get(deserializer.deserialize(topic, key)));
-        }
-    };
-
     @Test
     public void testAddRemove() {
         written.put(0, "zero");
@@ -117,30 +102,4 @@ public class StoreChangeLoggerTest {
         assertEquals("three", logged.get(3));
         assertEquals("four", logged.get(4));
     }
-
-    @Test
-    public void testRaw() {
-        IntegerSerializer serializer = new IntegerSerializer();
-
-        written.put(0, "zero");
-        rawChangeLogger.add(serializer.serialize(topic, 0));
-        written.put(1, "one");
-        rawChangeLogger.add(serializer.serialize(topic, 1));
-        written.put(2, "two");
-        rawChangeLogger.add(serializer.serialize(topic, 2));
-        assertEquals(3, rawChangeLogger.numDirty());
-        assertEquals(0, rawChangeLogger.numRemoved());
-
-        rawChangeLogger.delete(serializer.serialize(topic, 0));
-        rawChangeLogger.delete(serializer.serialize(topic, 1));
-        written.put(3, "three");
-        rawChangeLogger.add(serializer.serialize(topic, 3));
-        assertEquals(2, rawChangeLogger.numDirty());
-        assertEquals(2, rawChangeLogger.numRemoved());
-
-        written.put(0, "zero-again");
-        rawChangeLogger.add(serializer.serialize(topic, 0));
-        assertEquals(3, rawChangeLogger.numDirty());
-        assertEquals(1, rawChangeLogger.numRemoved());
-    }
 }
