This is an automated email from the ASF dual-hosted git repository.

saniljain15 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 3792d71ae SAMZA-2744 Update RocksDb version (#1609)
3792d71ae is described below

commit 3792d71ae5b0dac80dc233893d1ac5e9e83e073d
Author: Daniel Chen <[email protected]>
AuthorDate: Wed May 25 11:45:38 2022 -0700

    SAMZA-2744 Update RocksDb version (#1609)
    
    Updated the RocksDB dependency to a more recent release.
    
    6.6.4 -> 7.0.3
---
 gradle/dependency-versions.gradle                        |  2 +-
 .../apache/samza/storage/kv/RocksDbKeyValueStore.scala   | 16 +++++++++++-----
 .../samza/storage/kv/TestRocksDbKeyValueStore.scala      |  6 +++---
 3 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 21d7f2329..79f30f83a 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -42,7 +42,7 @@
   metricsVersion = "2.2.0"
   mockitoVersion = "1.10.19"
   powerMockVersion = "1.6.6"
-  rocksdbVersion = "6.6.4"
+  rocksdbVersion = "7.0.3"
   scalaTestVersion = "3.0.1"
   slf4jVersion = "1.7.7"
   yarnVersion = "2.10.1"
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 6f5a7f2f8..fa093da16 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -31,7 +31,9 @@ import org.apache.samza.checkpoint.CheckpointId
 import org.apache.samza.config.Config
 import org.apache.samza.storage.StorageManagerUtil
 import org.apache.samza.util.{FileUtil, Logging}
-import org.rocksdb.{TtlDB, _}
+import org.rocksdb.{Checkpoint, FlushOptions, Options, ReadOptions, RocksDB, RocksDBException, RocksIterator, TtlDB, WriteBatch, WriteOptions}
+
+import java.util
 
 object RocksDbKeyValueStore extends Logging {
 
@@ -151,10 +153,10 @@ class RocksDbKeyValueStore(
   override def getAll(keys: java.util.List[Array[Byte]]): java.util.Map[Array[Byte], Array[Byte]] = ifOpen {
     metrics.getAlls.inc
     require(keys != null, "Null keys not allowed.")
-    val map = db.multiGet(keys)
-    if (map != null) {
+    val values = db.multiGetAsList(keys)
+    if (values != null) {
       var bytesRead = 0L
-      val iterator = map.values().iterator
+      val iterator = values.iterator
       while (iterator.hasNext) {
         val value = iterator.next
         if (value != null) {
@@ -163,6 +165,10 @@ class RocksDbKeyValueStore(
       }
       metrics.bytesRead.inc(bytesRead)
     }
+    val map = new util.HashMap[Array[Byte], Array[Byte]]
+    for (i <- 0 until keys.size()) {
+      map.put(keys.get(i), values.get(i))
+    }
     map
   }
 
@@ -188,7 +194,7 @@ class RocksDbKeyValueStore(
       val curr = iter.next()
       if (curr.getValue == null) {
         deletes += 1
-        writeBatch.remove(curr.getKey)
+        writeBatch.delete(curr.getKey)
       } else {
         wrote += 1
         val key = curr.getKey
diff --git a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
index e6e1c6229..d0f37be1b 100644
--- a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
@@ -176,10 +176,10 @@ class TestRocksDbKeyValueStore
     while (iter.isValid) {
       iter.next()
     }
-    iter.dispose()
+    iter.close()
 
     lock.synchronized {
-      rocksDB.remove(key)
+      rocksDB.delete(key)
       iter = rocksDB.newIterator()
       iter.seek(key)
     }
@@ -187,7 +187,7 @@ class TestRocksDbKeyValueStore
     while (iter.isValid) {
       iter.next()
     }
-    iter.dispose()
+    iter.close()
 
     val dbDir = new File(System.getProperty("java.io.tmpdir")).toString
     val rocksDBReadOnly = RocksDB.openReadOnly(options, dbDir)

Reply via email to