This is an automated email from the ASF dual-hosted git repository.
saniljain15 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 3792d71ae SAMZA-2744 Update RocksDb version (#1609)
3792d71ae is described below
commit 3792d71ae5b0dac80dc233893d1ac5e9e83e073d
Author: Daniel Chen <[email protected]>
AuthorDate: Wed May 25 11:45:38 2022 -0700
SAMZA-2744 Update RocksDb version (#1609)
Updated the rocksDB version to be more up to date
6.6.4 -> 7.0.3
---
gradle/dependency-versions.gradle | 2 +-
.../apache/samza/storage/kv/RocksDbKeyValueStore.scala | 16 +++++++++++-----
.../samza/storage/kv/TestRocksDbKeyValueStore.scala | 6 +++---
3 files changed, 15 insertions(+), 9 deletions(-)
diff --git a/gradle/dependency-versions.gradle
b/gradle/dependency-versions.gradle
index 21d7f2329..79f30f83a 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -42,7 +42,7 @@
metricsVersion = "2.2.0"
mockitoVersion = "1.10.19"
powerMockVersion = "1.6.6"
- rocksdbVersion = "6.6.4"
+ rocksdbVersion = "7.0.3"
scalaTestVersion = "3.0.1"
slf4jVersion = "1.7.7"
yarnVersion = "2.10.1"
diff --git
a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 6f5a7f2f8..fa093da16 100644
---
a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++
b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -31,7 +31,9 @@ import org.apache.samza.checkpoint.CheckpointId
import org.apache.samza.config.Config
import org.apache.samza.storage.StorageManagerUtil
import org.apache.samza.util.{FileUtil, Logging}
-import org.rocksdb.{TtlDB, _}
+import org.rocksdb.{Checkpoint, FlushOptions, Options, ReadOptions, RocksDB,
RocksDBException, RocksIterator, TtlDB, WriteBatch, WriteOptions}
+
+import java.util
object RocksDbKeyValueStore extends Logging {
@@ -151,10 +153,10 @@ class RocksDbKeyValueStore(
override def getAll(keys: java.util.List[Array[Byte]]):
java.util.Map[Array[Byte], Array[Byte]] = ifOpen {
metrics.getAlls.inc
require(keys != null, "Null keys not allowed.")
- val map = db.multiGet(keys)
- if (map != null) {
+ val values = db.multiGetAsList(keys)
+ if (values != null) {
var bytesRead = 0L
- val iterator = map.values().iterator
+ val iterator = values.iterator
while (iterator.hasNext) {
val value = iterator.next
if (value != null) {
@@ -163,6 +165,10 @@ class RocksDbKeyValueStore(
}
metrics.bytesRead.inc(bytesRead)
}
+ val map = new util.HashMap[Array[Byte], Array[Byte]]
+ for (i <- 0 until keys.size()) {
+ map.put(keys.get(i), values.get(i))
+ }
map
}
@@ -188,7 +194,7 @@ class RocksDbKeyValueStore(
val curr = iter.next()
if (curr.getValue == null) {
deletes += 1
- writeBatch.remove(curr.getKey)
+ writeBatch.delete(curr.getKey)
} else {
wrote += 1
val key = curr.getKey
diff --git
a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
index e6e1c6229..d0f37be1b 100644
---
a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
+++
b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
@@ -176,10 +176,10 @@ class TestRocksDbKeyValueStore
while (iter.isValid) {
iter.next()
}
- iter.dispose()
+ iter.close()
lock.synchronized {
- rocksDB.remove(key)
+ rocksDB.delete(key)
iter = rocksDB.newIterator()
iter.seek(key)
}
@@ -187,7 +187,7 @@ class TestRocksDbKeyValueStore
while (iter.isValid) {
iter.next()
}
- iter.dispose()
+ iter.close()
val dbDir = new File(System.getProperty("java.io.tmpdir")).toString
val rocksDBReadOnly = RocksDB.openReadOnly(options, dbDir)