Repository: samza Updated Branches: refs/heads/master 6a75503d7 -> 84d144c51
SAMZA-2018: State restore improvements using RocksDB writebatch API This PR enables the RocksDbKeyValueStore to use the writeBatch API. Author: Ray Matharu <[email protected]> Reviewers: Jacob Maes <[email protected]>, Prateek Maheshwari <[email protected]> Closes #864 from rmatharu/writebatch Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/84d144c5 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/84d144c5 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/84d144c5 Branch: refs/heads/master Commit: 84d144c5120bf8bae8dc02ba7a1de1c68bed6418 Parents: 6a75503 Author: Ray Matharu <[email protected]> Authored: Tue Dec 18 13:06:04 2018 -0800 Committer: Prateek Maheshwari <[email protected]> Committed: Tue Dec 18 13:06:04 2018 -0800 ---------------------------------------------------------------------- .../org/apache/samza/storage/kv/RocksDbKeyValueStore.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/84d144c5/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index b7baede..c5a89d9 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -172,25 +172,27 @@ class RocksDbKeyValueStore( } } - // Write batch from RocksDB API is not used currently because of: https://github.com/facebook/rocksdb/issues/262 def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]): Unit = ifOpen { metrics.putAlls.inc() val iter = entries.iterator var wrote = 0 var deletes = 0 + val writeBatch = new WriteBatch() while (iter.hasNext) { val curr = iter.next() if (curr.getValue == null) { deletes += 1 - db.delete(writeOptions, curr.getKey) + writeBatch.remove(curr.getKey) } else { wrote += 1 val key = curr.getKey val value = curr.getValue metrics.bytesWritten.inc(key.length + value.length) - db.put(writeOptions, key, value) + writeBatch.put(key, value) } } + db.write(writeOptions, writeBatch) + writeBatch.close() metrics.puts.inc(wrote) metrics.deletes.inc(deletes) }
