Repository: kafka
Updated Branches:
  refs/heads/1.0 42841c9f3 -> b113b4bd3


KAFKA-5576: RocksDB upgrade to 5.8, plus one bug fix on Bytes.wrap

Author: Guozhang Wang <[email protected]>

Reviewers: Ismael Juma <[email protected]>, Bill Bejeck <[email protected]>, 
Matthias J. Sax <[email protected]>, Damian Guy <[email protected]>

Closes #3819 from guozhangwang/KMinor-rocksDB-573

(cherry picked from commit 196bcfca0c56420793f85514d1602bde564b0651)
Signed-off-by: Guozhang Wang <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b113b4bd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b113b4bd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b113b4bd

Branch: refs/heads/1.0
Commit: b113b4bd3e3802e8e6974e6754e96bd0f1de32e2
Parents: 42841c9
Author: Guozhang Wang <[email protected]>
Authored: Thu Oct 5 17:02:53 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Thu Oct 5 17:03:01 2017 -0700

----------------------------------------------------------------------
 clients/src/main/java/org/apache/kafka/common/utils/Bytes.java | 2 ++
 gradle/dependencies.gradle                                     | 2 +-
 .../streams/state/internals/ChangeLoggingWindowBytesStore.java | 6 ++----
 .../org/apache/kafka/streams/state/internals/RocksDBStore.java | 6 +-----
 .../streams/state/internals/AbstractKeyValueStoreTest.java     | 1 +
 5 files changed, 7 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b113b4bd/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java 
b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
index 3044020..e531d1f 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
@@ -33,6 +33,8 @@ public class Bytes implements Comparable<Bytes> {
     private int hashCode;
 
     public static Bytes wrap(byte[] bytes) {
+        if (bytes == null)
+            return null;
         return new Bytes(bytes);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b113b4bd/gradle/dependencies.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index a7c99eb..c9983ce 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -64,7 +64,7 @@ versions += [
   // PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta
   powermock: "2.0.0-beta.5",
   reflections: "0.9.11",
-  rocksDB: "5.3.6",
+  rocksDB: "5.8.0",
   scalatest: "3.0.4",
   scoverage: "1.3.1",
   slf4j: "1.7.25",

http://git-wip-us.apache.org/repos/asf/kafka/blob/b113b4bd/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index da99d55..0035019 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -64,10 +64,8 @@ class ChangeLoggingWindowBytesStore extends 
WrappedStateStore.AbstractStateStore
 
     @Override
     public void put(final Bytes key, final byte[] value, final long timestamp) 
{
-        if (key != null) {
-            bytesStore.put(key, value, timestamp);
-            changeLogger.logChange(WindowStoreUtils.toBinaryKey(key, 
timestamp, maybeUpdateSeqnumForDups(), innerStateSerde), value);
-        }
+        bytesStore.put(key, value, timestamp);
+        changeLogger.logChange(WindowStoreUtils.toBinaryKey(key, timestamp, 
maybeUpdateSeqnumForDups(), innerStateSerde), value);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/b113b4bd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index d8a844c..c219314 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -269,11 +269,6 @@ public class RocksDBStore<K, V> implements 
KeyValueStore<K, V> {
                 } catch (RocksDBException e) {
                     throw new ProcessorStateException("Error while range 
compacting during restoring  store " + this.name, e);
                 }
-
-                // we need to re-open with the old num.levels again, this is a 
workaround
-                // until https://github.com/facebook/rocksdb/pull/2740 is 
merged in rocksdb
-                close();
-                openDB(internalProcessorContext);
             }
         }
 
@@ -372,6 +367,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, 
V> {
         // query rocksdb
         final RocksDBRangeIterator rocksDBRangeIterator = new 
RocksDBRangeIterator(name, db.newIterator(), serdes, from, to);
         openIterators.add(rocksDBRangeIterator);
+
         return rocksDBRangeIterator;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b113b4bd/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index af917e6..65a9dec 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -162,6 +162,7 @@ public abstract class AbstractKeyValueStoreTest {
         // receive the restore entries ...
         store = createKeyValueStore(driver.context(), Integer.class, 
String.class, false);
         context.restore(store.name(), driver.restoredEntries());
+
         // Verify that the store's contents were properly restored ...
         assertEquals(0, driver.checkForRestoredEntries(store));
 

Reply via email to