This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 8e6c1a2 DbLedgerStorage -- KeyValue storage interface with RocksDB
implementation
8e6c1a2 is described below
commit 8e6c1a2e76a77b0dcf503e25a90a12d2d7b784ab
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Dec 13 16:56:34 2017 +0800
DbLedgerStorage -- KeyValue storage interface with RocksDB implementation
Add an interface for a generic key-value storage local library, with the
default implementation based on RocksDB.
Author: Matteo Merli <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>, Sijie
Guo <[email protected]>
This closes #829 from merlimat/key-value-storage
---
bookkeeper-server/pom.xml | 5 +
.../bookkeeper/bookie/storage/ldb/ArrayUtil.java | 72 ++++
.../bookie/storage/ldb/KeyValueStorage.java | 162 ++++++++
.../bookie/storage/ldb/KeyValueStorageFactory.java | 42 ++
.../bookie/storage/ldb/KeyValueStorageRocksDB.java | 447 +++++++++++++++++++++
.../bookie/storage/ldb/KeyValueStorageTest.java | 175 ++++++++
pom.xml | 1 +
7 files changed, 904 insertions(+)
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index b7b33eb..8c057e6 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -40,6 +40,11 @@
<version>${project.parent.version}</version>
</dependency>
<dependency>
+ <groupId>org.rocksdb</groupId>
+ <artifactId>rocksdbjni</artifactId>
+ <version>${rocksdb.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayUtil.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayUtil.java
new file mode 100644
index 0000000..67e98f2
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayUtil.java
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+// CHECKSTYLE.OFF: IllegalImport
+import io.netty.util.internal.PlatformDependent;
+// CHECKSTYLE.ON: IllegalImport
+
+import java.nio.ByteOrder;
+
+/**
+ * Utility to serialize/deserialize longs into byte arrays.
+ */
+class ArrayUtil {
+
+ private static final boolean UNALIGNED = PlatformDependent.isUnaligned();
+ private static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
+ private static final boolean BIG_ENDIAN_NATIVE_ORDER =
ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;
+
+ public static long getLong(byte[] array, int index) {
+ if (HAS_UNSAFE && UNALIGNED) {
+ long v = PlatformDependent.getLong(array, index);
+ return BIG_ENDIAN_NATIVE_ORDER ? v : Long.reverseBytes(v);
+ }
+
+ return ((long) array[index] & 0xff) << 56 | //
+ ((long) array[index + 1] & 0xff) << 48 | //
+ ((long) array[index + 2] & 0xff) << 40 | //
+ ((long) array[index + 3] & 0xff) << 32 | //
+ ((long) array[index + 4] & 0xff) << 24 | //
+ ((long) array[index + 5] & 0xff) << 16 | //
+ ((long) array[index + 6] & 0xff) << 8 | //
+ (long) array[index + 7] & 0xff;
+ }
+
+ public static void setLong(byte[] array, int index, long value) {
+ if (HAS_UNSAFE && UNALIGNED) {
+ PlatformDependent.putLong(array, index, BIG_ENDIAN_NATIVE_ORDER ?
value : Long.reverseBytes(value));
+ } else {
+ array[index] = (byte) (value >>> 56);
+ array[index + 1] = (byte) (value >>> 48);
+ array[index + 2] = (byte) (value >>> 40);
+ array[index + 3] = (byte) (value >>> 32);
+ array[index + 4] = (byte) (value >>> 24);
+ array[index + 5] = (byte) (value >>> 16);
+ array[index + 6] = (byte) (value >>> 8);
+ array[index + 7] = (byte) value;
+ }
+ }
+
+ public static final boolean isArrayAllZeros(final byte[] array) {
+ return PlatformDependent.isZero(array, 0, array.length);
+ }
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
new file mode 100644
index 0000000..b9bbb2a
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
@@ -0,0 +1,162 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map.Entry;
+
/**
 * Abstraction of a generic key-value local database.
 *
 * <p>Keys and values are opaque byte arrays. Instances must be closed when no longer needed.
 */
public interface KeyValueStorage extends Closeable {

    /**
     * Store the value for the given key, replacing any previous value.
     *
     * @param key
     *            the key
     * @param value
     *            the value to associate with {@code key}
     * @throws IOException
     *             if the write fails
     */
    void put(byte[] key, byte[] value) throws IOException;

    /**
     * Get the value associated with the given key.
     *
     * @param key
     *            the key to lookup
     * @return the value or null if the key was not found
     * @throws IOException
     *             if the lookup fails
     */
    byte[] get(byte[] key) throws IOException;

    /**
     * Get the value associated with the given key.
     *
     * <p>This method will use the provided array to store the value.
     *
     * @param key
     *            the key to lookup
     * @param value
     *            an array where to store the result
     * @return -1 if the entry was not found or the length of the value
     * @throws IOException
     *             if the value array could not hold the result, or if the lookup fails
     */
    int get(byte[] key, byte[] value) throws IOException;

    /**
     * Get the entry with the biggest key that is strictly lower than the supplied key.
     *
     * <p>For example if the db contains :
     *
     * <pre>
     * {
     *  1 : 'a',
     *  2 : 'b',
     *  3 : 'c'
     * }
     * </pre>
     *
     * <p>Then:
     *
     * <pre>
     * getFloor(3) --> (2, 'b')
     * </pre>
     *
     * @param key
     *            the non-inclusive upper limit key
     * @return the entry before {@code key}, or null if there's no entry before {@code key}
     * @throws IOException
     *             if the lookup fails
     */
    Entry<byte[], byte[]> getFloor(byte[] key) throws IOException;

    /**
     * Get the entry with the smallest key that is bigger than or equal to the supplied key.
     *
     * @param key
     *            the inclusive lower limit key
     * @return the first entry whose key is {@code >= key}, or null if there is no such entry
     * @throws IOException
     *             if the lookup fails
     */
    Entry<byte[], byte[]> getCeil(byte[] key) throws IOException;

    /**
     * Remove the entry associated with the given key, if any.
     *
     * @param key
     *            the key to remove
     * @throws IOException
     *             if the deletion fails
     */
    void delete(byte[] key) throws IOException;

    /**
     * Get an iterator to scan sequentially through all the keys in the database.
     *
     * @return an iterator over all the keys; the caller must close it
     */
    CloseableIterator<byte[]> keys();

    /**
     * Get an iterator to scan sequentially through all the keys within a specified range.
     *
     * @param firstKey
     *            the first key in the range (included)
     * @param lastKey
     *            the last key in the range (not included)
     * @return an iterator over the keys in {@code [firstKey, lastKey)}; the caller must close it
     */
    CloseableIterator<byte[]> keys(byte[] firstKey, byte[] lastKey);

    /**
     * Return an iterator object that can be used to sequentially scan through all the entries in
     * the database.
     *
     * <p>Implementations may reuse the same {@link Entry} instance across calls to
     * {@link CloseableIterator#next()}: callers must not hold on to a returned entry after
     * advancing the iterator.
     *
     * @return an iterator over all the entries; the caller must close it
     */
    CloseableIterator<Entry<byte[], byte[]>> iterator();

    /**
     * Commit all pending writes to durable storage.
     *
     * @throws IOException
     *             if the sync fails
     */
    void sync() throws IOException;

    /**
     * Get the number of keys in the database.
     *
     * @return the number of keys; implementations may return an estimate rather than an exact
     *         count
     * @throws IOException
     *             if the count cannot be retrieved
     */
    long count() throws IOException;

    /**
     * Iterator that holds resources and must be closed after use.
     *
     * @param <T>
     *            the type of the elements returned by the iterator
     */
    interface CloseableIterator<T> extends Closeable {
        /**
         * @return true if there are more elements to read
         */
        boolean hasNext() throws IOException;

        /**
         * @return the next element; only valid to call when {@link #hasNext()} returned true
         */
        T next() throws IOException;
    }

    /**
     * Create a new, empty batch of write operations for this storage.
     *
     * @return the new batch; the caller must close it
     */
    Batch newBatch();

    /**
     * Interface for a batch to be written in the storage.
     */
    interface Batch extends Closeable {
        /**
         * Add a put of {@code value} for {@code key} to the batch.
         */
        void put(byte[] key, byte[] value);

        /**
         * Add the removal of {@code key} to the batch.
         */
        void remove(byte[] key);

        /**
         * Add the removal of all the keys from {@code beginKey} (included) to {@code endKey}
         * (excluded) to the batch.
         */
        void deleteRange(byte[] beginKey, byte[] endKey);

        /**
         * Discard all the operations accumulated in the batch.
         */
        void clear();

        /**
         * Write the operations accumulated in the batch to the storage.
         *
         * @throws IOException
         *             if the write fails
         */
        void flush() throws IOException;
    }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageFactory.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageFactory.java
new file mode 100644
index 0000000..c35628d
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageFactory.java
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import java.io.IOException;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+/**
+ * Factory class to create instances of the key-value storage implementation.
+ */
+public interface KeyValueStorageFactory {
+
+ /**
+ * Enum used to specify different config profiles in the underlying
storage.
+ */
+ enum DbConfigType {
+ Small, // Used for ledgers db, doesn't need particular configuration
+ Huge // Used for location index, lots of writes and much bigger dataset
+ }
+
+ KeyValueStorage newKeyValueStorage(String path, DbConfigType dbConfigType,
ServerConfiguration conf)
+ throws IOException;
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
new file mode 100644
index 0000000..0978fc0
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
@@ -0,0 +1,447 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.primitives.UnsignedBytes;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.ChecksumType;
+import org.rocksdb.CompressionType;
+import org.rocksdb.InfoLogLevel;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RocksDB based implementation of the KeyValueStorage.
+ */
+public class KeyValueStorageRocksDB implements KeyValueStorage {
+
+ static KeyValueStorageFactory factory = (path, dbConfigType, conf) -> new
KeyValueStorageRocksDB(path, dbConfigType,
+ conf);
+
+ private final RocksDB db;
+
+ private final WriteOptions optionSync = new WriteOptions();
+ private final WriteOptions optionDontSync = new WriteOptions();
+
+ private final ReadOptions optionCache = new ReadOptions();
+ private final ReadOptions optionDontCache = new ReadOptions();
+
+ private final WriteBatch emptyBatch = new WriteBatch();
+
+ private static final String ROCKSDB_LOG_LEVEL =
"dbStorage_rocksDB_logLevel";
+ private static final String ROCKSDB_WRITE_BUFFER_SIZE_MB =
"dbStorage_rocksDB_writeBufferSizeMB";
+ private static final String ROCKSDB_SST_SIZE_MB =
"dbStorage_rocksDB_sstSizeInMB";
+ private static final String ROCKSDB_BLOCK_SIZE =
"dbStorage_rocksDB_blockSize";
+ private static final String ROCKSDB_BLOOM_FILTERS_BITS_PER_KEY =
"dbStorage_rocksDB_bloomFilterBitsPerKey";
+ private static final String ROCKSDB_BLOCK_CACHE_SIZE =
"dbStorage_rocksDB_blockCacheSize";
+ private static final String ROCKSDB_NUM_LEVELS =
"dbStorage_rocksDB_numLevels";
+ private static final String ROCKSDB_NUM_FILES_IN_LEVEL0 =
"dbStorage_rocksDB_numFilesInLevel0";
+ private static final String ROCKSDB_MAX_SIZE_IN_LEVEL1_MB =
"dbStorage_rocksDB_maxSizeInLevel1MB";
+
+ public KeyValueStorageRocksDB(String path, DbConfigType dbConfigType,
ServerConfiguration conf) throws IOException {
+ this(path, dbConfigType, conf, false);
+ }
+
+ public KeyValueStorageRocksDB(String path, DbConfigType dbConfigType,
ServerConfiguration conf, boolean readOnly)
+ throws IOException {
+ try {
+ RocksDB.loadLibrary();
+ } catch (Throwable t) {
+ throw new IOException("Failed to load RocksDB JNI library", t);
+ }
+
+ try (Options options = new Options()) {
+ options.setCreateIfMissing(true);
+
+ if (dbConfigType == DbConfigType.Huge) {
+ long writeBufferSizeMB =
conf.getInt(ROCKSDB_WRITE_BUFFER_SIZE_MB, 64);
+ long sstSizeMB = conf.getInt(ROCKSDB_SST_SIZE_MB, 64);
+ int numLevels = conf.getInt(ROCKSDB_NUM_LEVELS, -1);
+ int numFilesInLevel0 =
conf.getInt(ROCKSDB_NUM_FILES_IN_LEVEL0, 4);
+ long maxSizeInLevel1MB =
conf.getLong(ROCKSDB_MAX_SIZE_IN_LEVEL1_MB, 256);
+ int blockSize = conf.getInt(ROCKSDB_BLOCK_SIZE, 64 * 1024);
+ long blockCacheSize = conf.getLong(ROCKSDB_BLOCK_CACHE_SIZE,
256 * 1024 * 1024);
+ int bloomFilterBitsPerKey =
conf.getInt(ROCKSDB_BLOOM_FILTERS_BITS_PER_KEY, 10);
+
+ options.setCompressionType(CompressionType.LZ4_COMPRESSION);
+ options.setWriteBufferSize(writeBufferSizeMB * 1024 * 1024);
+ options.setMaxWriteBufferNumber(4);
+ if (numLevels > 0) {
+ options.setNumLevels(numLevels);
+ }
+ options.setLevelZeroFileNumCompactionTrigger(numFilesInLevel0);
+ options.setMaxBytesForLevelBase(maxSizeInLevel1MB * 1024 *
1024);
+ options.setMaxBackgroundCompactions(16);
+ options.setMaxBackgroundFlushes(16);
+ options.setIncreaseParallelism(32);
+ options.setMaxTotalWalSize(512 * 1024 * 1024);
+ options.setMaxOpenFiles(-1);
+ options.setTargetFileSizeBase(sstSizeMB * 1024 * 1024);
+
options.setDeleteObsoleteFilesPeriodMicros(TimeUnit.HOURS.toMicros(1));
+
+ BlockBasedTableConfig tableOptions = new
BlockBasedTableConfig();
+ tableOptions.setBlockSize(blockSize);
+ tableOptions.setBlockCacheSize(blockCacheSize);
+ tableOptions.setFormatVersion(2);
+ tableOptions.setChecksumType(ChecksumType.kxxHash);
+ if (bloomFilterBitsPerKey > 0) {
+ tableOptions.setFilter(new
BloomFilter(bloomFilterBitsPerKey, false));
+ }
+
+ // Options best suited for HDDs
+ tableOptions.setCacheIndexAndFilterBlocks(true);
+ options.setLevelCompactionDynamicLevelBytes(true);
+
+ options.setTableFormatConfig(tableOptions);
+ }
+
+ // Configure log level
+ String logLevel = conf.getString(ROCKSDB_LOG_LEVEL, "info");
+ switch (logLevel) {
+ case "debug":
+ options.setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL);
+ break;
+ case "info":
+ options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
+ break;
+ case "warn":
+ options.setInfoLogLevel(InfoLogLevel.WARN_LEVEL);
+ break;
+ case "error":
+ options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
+ break;
+ default:
+ log.warn("Unrecognized RockDB log level: {}", logLevel);
+ }
+
+ // Keep log files for 1month
+ options.setKeepLogFileNum(30);
+ options.setLogFileTimeToRoll(TimeUnit.DAYS.toSeconds(1));
+
+ try {
+ if (readOnly) {
+ db = RocksDB.openReadOnly(options, path);
+ } else {
+ db = RocksDB.open(options, path);
+ }
+ } catch (RocksDBException e) {
+ throw new IOException("Error open RocksDB database", e);
+ }
+ }
+
+ optionSync.setSync(true);
+ optionDontSync.setSync(false);
+
+ optionCache.setFillCache(true);
+ optionDontCache.setFillCache(false);
+ }
+
+ @Override
+ public void close() throws IOException {
+ db.close();
+ optionSync.close();
+ optionDontSync.close();
+ optionCache.close();
+ optionDontCache.close();
+ emptyBatch.close();
+ }
+
+ @Override
+ public void put(byte[] key, byte[] value) throws IOException {
+ try {
+ db.put(optionDontSync, key, value);
+ } catch (RocksDBException e) {
+ throw new IOException("Error in RocksDB put", e);
+ }
+ }
+
+ @Override
+ public byte[] get(byte[] key) throws IOException {
+ try {
+ return db.get(key);
+ } catch (RocksDBException e) {
+ throw new IOException("Error in RocksDB get", e);
+ }
+ }
+
+ @Override
+ public int get(byte[] key, byte[] value) throws IOException {
+ try {
+ int res = db.get(key, value);
+ if (res == RocksDB.NOT_FOUND) {
+ return -1;
+ } else if (res > value.length) {
+ throw new IOException("Value array is too small to fit the
result");
+ } else {
+ return res;
+ }
+ } catch (RocksDBException e) {
+ throw new IOException("Error in RocksDB get", e);
+ }
+ }
+
+ @Override
+ public Entry<byte[], byte[]> getFloor(byte[] key) throws IOException {
+ try (RocksIterator iterator = db.newIterator(optionCache)) {
+ // Position the iterator on the record whose key is >= to the
supplied key
+ iterator.seek(key);
+
+ if (!iterator.isValid()) {
+ // There are no entries >= key
+ iterator.seekToLast();
+ if (iterator.isValid()) {
+ return new EntryWrapper(iterator.key(), iterator.value());
+ } else {
+ // Db is empty
+ return null;
+ }
+ }
+
+ iterator.prev();
+
+ if (!iterator.isValid()) {
+ // Iterator is on the 1st entry of the db and this entry key
is >= to the target
+ // key
+ return null;
+ } else {
+ return new EntryWrapper(iterator.key(), iterator.value());
+ }
+ }
+ }
+
+ @Override
+ public Entry<byte[], byte[]> getCeil(byte[] key) throws IOException {
+ try (RocksIterator iterator = db.newIterator(optionCache)) {
+ // Position the iterator on the record whose key is >= to the
supplied key
+ iterator.seek(key);
+
+ if (iterator.isValid()) {
+ return new EntryWrapper(iterator.key(), iterator.value());
+ } else {
+ return null;
+ }
+ }
+ }
+
+ @Override
+ public void delete(byte[] key) throws IOException {
+ try {
+ db.delete(optionDontSync, key);
+ } catch (RocksDBException e) {
+ throw new IOException("Error in RocksDB delete", e);
+ }
+ }
+
+ @Override
+ public void sync() throws IOException {
+ try {
+ db.write(optionSync, emptyBatch);
+ } catch (RocksDBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public CloseableIterator<byte[]> keys() {
+ final RocksIterator iterator = db.newIterator(optionCache);
+ iterator.seekToFirst();
+
+ return new CloseableIterator<byte[]>() {
+ @Override
+ public boolean hasNext() {
+ return iterator.isValid();
+ }
+
+ @Override
+ public byte[] next() {
+ checkState(iterator.isValid());
+ byte[] key = iterator.key();
+ iterator.next();
+ return key;
+ }
+
+ @Override
+ public void close() {
+ iterator.close();
+ }
+ };
+ }
+
+ @Override
+ public CloseableIterator<byte[]> keys(byte[] firstKey, byte[] lastKey) {
+ final RocksIterator iterator = db.newIterator(optionCache);
+ iterator.seek(firstKey);
+
+ return new CloseableIterator<byte[]>() {
+ @Override
+ public boolean hasNext() {
+ return iterator.isValid() &&
ByteComparator.compare(iterator.key(), lastKey) < 0;
+ }
+
+ @Override
+ public byte[] next() {
+ checkState(iterator.isValid());
+ byte[] key = iterator.key();
+ iterator.next();
+ return key;
+ }
+
+ @Override
+ public void close() {
+ iterator.close();
+ }
+ };
+ }
+
+ @Override
+ public CloseableIterator<Entry<byte[], byte[]>> iterator() {
+ final RocksIterator iterator = db.newIterator(optionDontCache);
+ iterator.seekToFirst();
+ final EntryWrapper entryWrapper = new EntryWrapper();
+
+ return new CloseableIterator<Entry<byte[], byte[]>>() {
+ @Override
+ public boolean hasNext() {
+ return iterator.isValid();
+ }
+
+ @Override
+ public Entry<byte[], byte[]> next() {
+ checkState(iterator.isValid());
+ entryWrapper.key = iterator.key();
+ entryWrapper.value = iterator.value();
+ iterator.next();
+ return entryWrapper;
+ }
+
+ @Override
+ public void close() {
+ iterator.close();
+ }
+ };
+ }
+
+ @Override
+ public long count() throws IOException {
+ try {
+ return db.getLongProperty("rocksdb.estimate-num-keys");
+ } catch (RocksDBException e) {
+ throw new IOException("Error in getting records count", e);
+ }
+ }
+
+ @Override
+ public Batch newBatch() {
+ return new RocksDBBatch();
+ }
+
+ private class RocksDBBatch implements Batch {
+ private final WriteBatch writeBatch = new WriteBatch();
+
+ @Override
+ public void close() {
+ writeBatch.close();
+ }
+
+ @Override
+ public void put(byte[] key, byte[] value) {
+ writeBatch.put(key, value);
+ }
+
+ @Override
+ public void remove(byte[] key) {
+ writeBatch.remove(key);
+ }
+
+ @Override
+ public void clear() {
+ writeBatch.clear();
+ }
+
+ @Override
+ public void deleteRange(byte[] beginKey, byte[] endKey) {
+ writeBatch.deleteRange(beginKey, endKey);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ try {
+ db.write(optionSync, writeBatch);
+ } catch (RocksDBException e) {
+ throw new IOException("Failed to flush RocksDB batch", e);
+ }
+ }
+ }
+
+ private static final class EntryWrapper implements Entry<byte[], byte[]> {
+ // This is not final since the iterator will reuse the same
EntryWrapper
+ // instance at each step
+ private byte[] key;
+ private byte[] value;
+
+ public EntryWrapper() {
+ this.key = null;
+ this.value = null;
+ }
+
+ public EntryWrapper(byte[] key, byte[] value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public byte[] setValue(byte[] value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public byte[] getValue() {
+ return value;
+ }
+
+ @Override
+ public byte[] getKey() {
+ return key;
+ }
+ }
+
+ private static final Comparator<byte[]> ByteComparator =
UnsignedBytes.lexicographicalComparator();
+
+ private static final Logger log =
LoggerFactory.getLogger(KeyValueStorageRocksDB.class);
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
new file mode 100644
index 0000000..d1c366f
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
@@ -0,0 +1,175 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
+import
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.CloseableIterator;
+import
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Unit test for {@link KeyValueStorage}.
+ */
+@RunWith(Parameterized.class)
+public class KeyValueStorageTest {
+
+ private final KeyValueStorageFactory storageFactory;
+ private final ServerConfiguration configuration;
+
+ @Parameters
+ public static Collection<Object[]> configs() {
+ return Arrays.asList(new Object[][] { { KeyValueStorageRocksDB.factory
} });
+ }
+
+ public KeyValueStorageTest(KeyValueStorageFactory storageFactory) {
+ this.storageFactory = storageFactory;
+ this.configuration = new ServerConfiguration();
+ }
+
+ private static long fromArray(byte[] array) {
+ return ArrayUtil.getLong(array, 0);
+ }
+
+ private static byte[] toArray(long n) {
+ byte[] b = new byte[8];
+ ArrayUtil.setLong(b, 0, n);
+ return b;
+ }
+
+ @Test
+ public void simple() throws Exception {
+ File tmpDir = File.createTempFile("bookie", "test");
+ tmpDir.delete();
+
+ KeyValueStorage db =
storageFactory.newKeyValueStorage(tmpDir.getAbsolutePath(), DbConfigType.Small,
+ configuration);
+
+ assertEquals(null, db.getFloor(toArray(3)));
+ assertEquals(0, db.count());
+
+ db.put(toArray(5), toArray(5));
+
+ assertEquals(null, db.getFloor(toArray(3)));
+ assertEquals(1, db.count());
+
+ assertEquals(null, db.getFloor(toArray(5)));
+ assertEquals(5, fromArray(db.getFloor(toArray(6)).getKey()));
+
+ db.put(toArray(3), toArray(3));
+
+ assertEquals(null, db.getFloor(toArray(3)));
+ assertEquals(2, db.count());
+
+ // //
+
+ db.put(toArray(5), toArray(5));
+ // Count can be imprecise
+ assertTrue(db.count() > 0);
+
+ assertEquals(null, db.getFloor(toArray(1)));
+ assertEquals(null, db.getFloor(toArray(3)));
+ assertEquals(3, fromArray(db.getFloor(toArray(5)).getKey()));
+ assertEquals(5, fromArray(db.getFloor(toArray(6)).getKey()));
+ assertEquals(5, fromArray(db.getFloor(toArray(10)).getKey()));
+
+ // Iterate
+ List<Long> foundKeys = Lists.newArrayList();
+ CloseableIterator<Entry<byte[], byte[]>> iter = db.iterator();
+ try {
+ while (iter.hasNext()) {
+ foundKeys.add(fromArray(iter.next().getKey()));
+ }
+ } finally {
+ iter.close();
+ }
+
+ assertEquals(Lists.newArrayList(3L, 5L), foundKeys);
+
+ // Iterate over keys
+ foundKeys = Lists.newArrayList();
+ CloseableIterator<byte[]> iter2 = db.keys();
+ try {
+ while (iter2.hasNext()) {
+ foundKeys.add(fromArray(iter2.next()));
+ }
+ } finally {
+ iter2.close();
+ }
+
+ assertEquals(Lists.newArrayList(3L, 5L), foundKeys);
+
+ // Scan with limits
+ foundKeys = Lists.newArrayList();
+ iter2 = db.keys(toArray(1), toArray(4));
+ try {
+ while (iter2.hasNext()) {
+ foundKeys.add(fromArray(iter2.next()));
+ }
+ } finally {
+ iter2.close();
+ }
+
+ assertEquals(Lists.newArrayList(3L), foundKeys);
+
+ // Test deletion
+ db.put(toArray(10), toArray(10));
+ db.put(toArray(11), toArray(11));
+ db.put(toArray(12), toArray(12));
+ db.put(toArray(14), toArray(14));
+
+ // Count can be imprecise
+ assertTrue(db.count() > 0);
+
+ assertEquals(10L, fromArray(db.get(toArray(10))));
+ db.delete(toArray(10));
+ assertEquals(null, db.get(toArray(10)));
+ assertTrue(db.count() > 0);
+
+ Batch batch = db.newBatch();
+ batch.remove(toArray(11));
+ batch.remove(toArray(12));
+ batch.remove(toArray(13));
+ batch.flush();
+ assertEquals(null, db.get(toArray(11)));
+ assertEquals(null, db.get(toArray(12)));
+ assertEquals(null, db.get(toArray(13)));
+ assertEquals(14L, fromArray(db.get(toArray(14))));
+ batch.close();
+
+ db.close();
+ tmpDir.delete();
+ }
+}
diff --git a/pom.xml b/pom.xml
index ccb5a92..9c7a002 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,6 +114,7 @@
<netty-boringssl.version>2.0.3.Final</netty-boringssl.version>
<slf4j.version>1.7.25</slf4j.version>
<zookeeper.version>3.5.3-beta</zookeeper.version>
+ <rocksdb.version>5.8.6</rocksdb.version>
<!-- plugin dependencies -->
<findbugs-maven-plugin.version>3.0.5</findbugs-maven-plugin.version>
<puppycrawl.checkstyle.version>6.19</puppycrawl.checkstyle.version>
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].