Repository: kafka Updated Branches: refs/heads/1.0 42841c9f3 -> b113b4bd3
KAFKA-5576: RocksDB upgrade to 5.8, plus one bug fix on Bytes.wrap Author: Guozhang Wang <[email protected]> Reviewers: Ismael Juma <[email protected]>, Bill Bejeck <[email protected]>, Matthias J. Sax <[email protected]>, Damian Guy <[email protected]> Closes #3819 from guozhangwang/KMinor-rocksDB-573 (cherry picked from commit 196bcfca0c56420793f85514d1602bde564b0651) Signed-off-by: Guozhang Wang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b113b4bd Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b113b4bd Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b113b4bd Branch: refs/heads/1.0 Commit: b113b4bd3e3802e8e6974e6754e96bd0f1de32e2 Parents: 42841c9 Author: Guozhang Wang <[email protected]> Authored: Thu Oct 5 17:02:53 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Thu Oct 5 17:03:01 2017 -0700 ---------------------------------------------------------------------- clients/src/main/java/org/apache/kafka/common/utils/Bytes.java | 2 ++ gradle/dependencies.gradle | 2 +- .../streams/state/internals/ChangeLoggingWindowBytesStore.java | 6 ++---- .../org/apache/kafka/streams/state/internals/RocksDBStore.java | 6 +----- .../streams/state/internals/AbstractKeyValueStoreTest.java | 1 + 5 files changed, 7 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b113b4bd/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java index 3044020..e531d1f 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java @@ -33,6 +33,8 @@ public class Bytes implements Comparable<Bytes> { private int hashCode; public static Bytes wrap(byte[] bytes) { + if (bytes == null) + return null; return new Bytes(bytes); } http://git-wip-us.apache.org/repos/asf/kafka/blob/b113b4bd/gradle/dependencies.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index a7c99eb..c9983ce 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -64,7 +64,7 @@ versions += [ // PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta powermock: "2.0.0-beta.5", reflections: "0.9.11", - rocksDB: "5.3.6", + rocksDB: "5.8.0", scalatest: "3.0.4", scoverage: "1.3.1", slf4j: "1.7.25", http://git-wip-us.apache.org/repos/asf/kafka/blob/b113b4bd/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java index da99d55..0035019 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java @@ -64,10 +64,8 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore @Override public void put(final Bytes key, final byte[] value, final long timestamp) { - if (key != null) { - bytesStore.put(key, value, timestamp); - changeLogger.logChange(WindowStoreUtils.toBinaryKey(key, timestamp, maybeUpdateSeqnumForDups(), innerStateSerde), value); - } + bytesStore.put(key, value, timestamp); + changeLogger.logChange(WindowStoreUtils.toBinaryKey(key, timestamp, maybeUpdateSeqnumForDups(), innerStateSerde), value); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/b113b4bd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index d8a844c..c219314 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -269,11 +269,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { } catch (RocksDBException e) { throw new ProcessorStateException("Error while range compacting during restoring store " + this.name, e); } - - // we need to re-open with the old num.levels again, this is a workaround - // until https://github.com/facebook/rocksdb/pull/2740 is merged in rocksdb - close(); - openDB(internalProcessorContext); } } @@ -372,6 +367,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { // query rocksdb final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(name, db.newIterator(), serdes, from, to); openIterators.add(rocksDBRangeIterator); + return rocksDBRangeIterator; } http://git-wip-us.apache.org/repos/asf/kafka/blob/b113b4bd/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java index af917e6..65a9dec 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -162,6 +162,7 @@ public abstract class AbstractKeyValueStoreTest { // receive the restore entries ... store = createKeyValueStore(driver.context(), Integer.class, String.class, false); context.restore(store.name(), driver.restoredEntries()); + // Verify that the store's contents were properly restored ... assertEquals(0, driver.checkForRestoredEntries(store));
