This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e6c1a2  DbLedgerStorage -- KeyValue storage interface with RocksDB 
implementation
8e6c1a2 is described below

commit 8e6c1a2e76a77b0dcf503e25a90a12d2d7b784ab
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Dec 13 16:56:34 2017 +0800

    DbLedgerStorage -- KeyValue storage interface with RocksDB implementation
    
    Add an interface for a generic key-value storage local library, with the 
default implementation based on RocksDB.
    
    Author: Matteo Merli <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>, Sijie 
Guo <[email protected]>
    
    This closes #829 from merlimat/key-value-storage
---
 bookkeeper-server/pom.xml                          |   5 +
 .../bookkeeper/bookie/storage/ldb/ArrayUtil.java   |  72 ++++
 .../bookie/storage/ldb/KeyValueStorage.java        | 162 ++++++++
 .../bookie/storage/ldb/KeyValueStorageFactory.java |  42 ++
 .../bookie/storage/ldb/KeyValueStorageRocksDB.java | 447 +++++++++++++++++++++
 .../bookie/storage/ldb/KeyValueStorageTest.java    | 175 ++++++++
 pom.xml                                            |   1 +
 7 files changed, 904 insertions(+)

diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index b7b33eb..8c057e6 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -40,6 +40,11 @@
       <version>${project.parent.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.rocksdb</groupId>
+      <artifactId>rocksdbjni</artifactId>
+      <version>${rocksdb.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
       <version>${slf4j.version}</version>
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayUtil.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayUtil.java
new file mode 100644
index 0000000..67e98f2
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayUtil.java
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+// CHECKSTYLE.OFF: IllegalImport
+import io.netty.util.internal.PlatformDependent;
+// CHECKSTYLE.ON: IllegalImport
+
+import java.nio.ByteOrder;
+
+/**
+ * Utility to serialize/deserialize longs into byte arrays.
+ */
+class ArrayUtil {
+
+    private static final boolean UNALIGNED = PlatformDependent.isUnaligned();
+    private static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
+    private static final boolean BIG_ENDIAN_NATIVE_ORDER = 
ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;
+
+    public static long getLong(byte[] array, int index) {
+        if (HAS_UNSAFE && UNALIGNED) {
+            long v = PlatformDependent.getLong(array, index);
+            return BIG_ENDIAN_NATIVE_ORDER ? v : Long.reverseBytes(v);
+        }
+
+        return ((long) array[index] & 0xff) << 56 | //
+                ((long) array[index + 1] & 0xff) << 48 | //
+                ((long) array[index + 2] & 0xff) << 40 | //
+                ((long) array[index + 3] & 0xff) << 32 | //
+                ((long) array[index + 4] & 0xff) << 24 | //
+                ((long) array[index + 5] & 0xff) << 16 | //
+                ((long) array[index + 6] & 0xff) << 8 | //
+                (long) array[index + 7] & 0xff;
+    }
+
+    public static void setLong(byte[] array, int index, long value) {
+        if (HAS_UNSAFE && UNALIGNED) {
+            PlatformDependent.putLong(array, index, BIG_ENDIAN_NATIVE_ORDER ? 
value : Long.reverseBytes(value));
+        } else {
+            array[index] = (byte) (value >>> 56);
+            array[index + 1] = (byte) (value >>> 48);
+            array[index + 2] = (byte) (value >>> 40);
+            array[index + 3] = (byte) (value >>> 32);
+            array[index + 4] = (byte) (value >>> 24);
+            array[index + 5] = (byte) (value >>> 16);
+            array[index + 6] = (byte) (value >>> 8);
+            array[index + 7] = (byte) value;
+        }
+    }
+
+    public static final boolean isArrayAllZeros(final byte[] array) {
+        return PlatformDependent.isZero(array, 0, array.length);
+    }
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
new file mode 100644
index 0000000..b9bbb2a
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
@@ -0,0 +1,162 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+/**
+ * Abstraction of a generic key-value local database.
+ */
+public interface KeyValueStorage extends Closeable {
+
+    void put(byte[] key, byte[] value) throws IOException;
+
+    /**
+     * Get the value associated with the given key.
+     *
+     * @param key
+     *            the key to lookup
+     * @return the value or null if the key was not found
+     */
+    byte[] get(byte[] key) throws IOException;
+
+    /**
+     * Get the value associated with the given key.
+     *
+     * <p>This method will use the provided array store the value
+     *
+     * @param key
+     *            the key to lookup
+     * @param value
+     *            an array where to store the result
+     * @return -1 if the entry was not found or the length of the value
+     * @throws IOException
+     *             if the value array could not hold the result
+     */
+    int get(byte[] key, byte[] value) throws IOException;
+
+    /**
+     * Get the entry whose key is the biggest and it's lesser than the 
supplied key.
+     *
+     * <p>For example if the db contains :
+     *
+     * <pre>
+     * {
+     *      1 : 'a',
+     *      2 : 'b',
+     *      3 : 'c'
+     * }
+     * </pre>
+     *
+     * <p>Then:
+     *
+     * <pre>
+     * getFloor(3) --> (2, 'b')
+     * </pre>
+     *
+     * @param key
+     *            the non-inclusive upper limit key
+     * @return the entry before or null if there's no entry before key
+     */
+    Entry<byte[], byte[]> getFloor(byte[] key) throws IOException;
+
+    /**
+     * Get the entry whose key is bigger or equal the supplied key.
+     *
+     * @param key
+     * @return
+     * @throws IOException
+     */
+    Entry<byte[], byte[]> getCeil(byte[] key) throws IOException;
+
+    /**
+     *
+     * @param key
+     * @throws IOException
+     */
+    void delete(byte[] key) throws IOException;
+
+    /**
+     * Get an iterator over to scan sequentially through all the keys in the
+     * database.
+     *
+     * @return
+     */
+    CloseableIterator<byte[]> keys();
+
+    /**
+     * Get an iterator over to scan sequentially through all the keys within a
+     * specified range.
+     *
+     * @param firstKey
+     *            the first key in the range (included)
+     * @param lastKey
+     *            the lastKey in the range (not included)
+     *
+     */
+    CloseableIterator<byte[]> keys(byte[] firstKey, byte[] lastKey);
+
+    /**
+     * Return an iterator object that can be used to sequentially scan through 
all
+     * the entries in the database.
+     */
+    CloseableIterator<Entry<byte[], byte[]>> iterator();
+
+    /**
+     * Commit all pending write to durable storage.
+     */
+    void sync() throws IOException;
+
+    /**
+     * @return the number of keys.
+     */
+    long count() throws IOException;
+
+    /**
+     * Iterator interface.
+     *
+     * @param <T>
+     */
+    interface CloseableIterator<T> extends Closeable {
+        boolean hasNext() throws IOException;
+
+        T next() throws IOException;
+    }
+
+    Batch newBatch();
+
+    /**
+     * Interface for a batch to be written in the storage.
+     */
+    public interface Batch extends Closeable {
+        void put(byte[] key, byte[] value);
+
+        void remove(byte[] key);
+
+        void deleteRange(byte[] beginKey, byte[] endKey);
+
+        void clear();
+
+        void flush() throws IOException;
+    }
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageFactory.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageFactory.java
new file mode 100644
index 0000000..c35628d
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageFactory.java
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import java.io.IOException;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+/**
+ * Factory class to create instances of the key-value storage implementation.
+ */
+public interface KeyValueStorageFactory {
+
+    /**
+     * Enum used to specify different config profiles in the underlying 
storage.
+     */
+    enum DbConfigType {
+        Small, // Used for ledgers db, doesn't need particular configuration
+        Huge // Used for location index, lots of writes and much bigger dataset
+    }
+
+    KeyValueStorage newKeyValueStorage(String path, DbConfigType dbConfigType, 
ServerConfiguration conf)
+            throws IOException;
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
new file mode 100644
index 0000000..0978fc0
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
@@ -0,0 +1,447 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.primitives.UnsignedBytes;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import 
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.ChecksumType;
+import org.rocksdb.CompressionType;
+import org.rocksdb.InfoLogLevel;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RocksDB based implementation of the KeyValueStorage.
+ */
+public class KeyValueStorageRocksDB implements KeyValueStorage {
+
+    static KeyValueStorageFactory factory = (path, dbConfigType, conf) -> new 
KeyValueStorageRocksDB(path, dbConfigType,
+            conf);
+
+    private final RocksDB db;
+
+    private final WriteOptions optionSync = new WriteOptions();
+    private final WriteOptions optionDontSync = new WriteOptions();
+
+    private final ReadOptions optionCache = new ReadOptions();
+    private final ReadOptions optionDontCache = new ReadOptions();
+
+    private final WriteBatch emptyBatch = new WriteBatch();
+
+    private static final String ROCKSDB_LOG_LEVEL = 
"dbStorage_rocksDB_logLevel";
+    private static final String ROCKSDB_WRITE_BUFFER_SIZE_MB = 
"dbStorage_rocksDB_writeBufferSizeMB";
+    private static final String ROCKSDB_SST_SIZE_MB = 
"dbStorage_rocksDB_sstSizeInMB";
+    private static final String ROCKSDB_BLOCK_SIZE = 
"dbStorage_rocksDB_blockSize";
+    private static final String ROCKSDB_BLOOM_FILTERS_BITS_PER_KEY = 
"dbStorage_rocksDB_bloomFilterBitsPerKey";
+    private static final String ROCKSDB_BLOCK_CACHE_SIZE = 
"dbStorage_rocksDB_blockCacheSize";
+    private static final String ROCKSDB_NUM_LEVELS = 
"dbStorage_rocksDB_numLevels";
+    private static final String ROCKSDB_NUM_FILES_IN_LEVEL0 = 
"dbStorage_rocksDB_numFilesInLevel0";
+    private static final String ROCKSDB_MAX_SIZE_IN_LEVEL1_MB = 
"dbStorage_rocksDB_maxSizeInLevel1MB";
+
+    public KeyValueStorageRocksDB(String path, DbConfigType dbConfigType, 
ServerConfiguration conf) throws IOException {
+        this(path, dbConfigType, conf, false);
+    }
+
+    public KeyValueStorageRocksDB(String path, DbConfigType dbConfigType, 
ServerConfiguration conf, boolean readOnly)
+            throws IOException {
+        try {
+            RocksDB.loadLibrary();
+        } catch (Throwable t) {
+            throw new IOException("Failed to load RocksDB JNI library", t);
+        }
+
+        try (Options options = new Options()) {
+            options.setCreateIfMissing(true);
+
+            if (dbConfigType == DbConfigType.Huge) {
+                long writeBufferSizeMB = 
conf.getInt(ROCKSDB_WRITE_BUFFER_SIZE_MB, 64);
+                long sstSizeMB = conf.getInt(ROCKSDB_SST_SIZE_MB, 64);
+                int numLevels = conf.getInt(ROCKSDB_NUM_LEVELS, -1);
+                int numFilesInLevel0 = 
conf.getInt(ROCKSDB_NUM_FILES_IN_LEVEL0, 4);
+                long maxSizeInLevel1MB = 
conf.getLong(ROCKSDB_MAX_SIZE_IN_LEVEL1_MB, 256);
+                int blockSize = conf.getInt(ROCKSDB_BLOCK_SIZE, 64 * 1024);
+                long blockCacheSize = conf.getLong(ROCKSDB_BLOCK_CACHE_SIZE, 
256 * 1024 * 1024);
+                int bloomFilterBitsPerKey = 
conf.getInt(ROCKSDB_BLOOM_FILTERS_BITS_PER_KEY, 10);
+
+                options.setCompressionType(CompressionType.LZ4_COMPRESSION);
+                options.setWriteBufferSize(writeBufferSizeMB * 1024 * 1024);
+                options.setMaxWriteBufferNumber(4);
+                if (numLevels > 0) {
+                    options.setNumLevels(numLevels);
+                }
+                options.setLevelZeroFileNumCompactionTrigger(numFilesInLevel0);
+                options.setMaxBytesForLevelBase(maxSizeInLevel1MB * 1024 * 
1024);
+                options.setMaxBackgroundCompactions(16);
+                options.setMaxBackgroundFlushes(16);
+                options.setIncreaseParallelism(32);
+                options.setMaxTotalWalSize(512 * 1024 * 1024);
+                options.setMaxOpenFiles(-1);
+                options.setTargetFileSizeBase(sstSizeMB * 1024 * 1024);
+                
options.setDeleteObsoleteFilesPeriodMicros(TimeUnit.HOURS.toMicros(1));
+
+                BlockBasedTableConfig tableOptions = new 
BlockBasedTableConfig();
+                tableOptions.setBlockSize(blockSize);
+                tableOptions.setBlockCacheSize(blockCacheSize);
+                tableOptions.setFormatVersion(2);
+                tableOptions.setChecksumType(ChecksumType.kxxHash);
+                if (bloomFilterBitsPerKey > 0) {
+                    tableOptions.setFilter(new 
BloomFilter(bloomFilterBitsPerKey, false));
+                }
+
+                // Options best suited for HDDs
+                tableOptions.setCacheIndexAndFilterBlocks(true);
+                options.setLevelCompactionDynamicLevelBytes(true);
+
+                options.setTableFormatConfig(tableOptions);
+            }
+
+            // Configure log level
+            String logLevel = conf.getString(ROCKSDB_LOG_LEVEL, "info");
+            switch (logLevel) {
+            case "debug":
+                options.setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL);
+                break;
+            case "info":
+                options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
+                break;
+            case "warn":
+                options.setInfoLogLevel(InfoLogLevel.WARN_LEVEL);
+                break;
+            case "error":
+                options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
+                break;
+            default:
+                log.warn("Unrecognized RockDB log level: {}", logLevel);
+            }
+
+            // Keep log files for 1month
+            options.setKeepLogFileNum(30);
+            options.setLogFileTimeToRoll(TimeUnit.DAYS.toSeconds(1));
+
+            try {
+                if (readOnly) {
+                    db = RocksDB.openReadOnly(options, path);
+                } else {
+                    db = RocksDB.open(options, path);
+                }
+            } catch (RocksDBException e) {
+                throw new IOException("Error open RocksDB database", e);
+            }
+        }
+
+        optionSync.setSync(true);
+        optionDontSync.setSync(false);
+
+        optionCache.setFillCache(true);
+        optionDontCache.setFillCache(false);
+    }
+
+    @Override
+    public void close() throws IOException {
+        db.close();
+        optionSync.close();
+        optionDontSync.close();
+        optionCache.close();
+        optionDontCache.close();
+        emptyBatch.close();
+    }
+
+    @Override
+    public void put(byte[] key, byte[] value) throws IOException {
+        try {
+            db.put(optionDontSync, key, value);
+        } catch (RocksDBException e) {
+            throw new IOException("Error in RocksDB put", e);
+        }
+    }
+
+    @Override
+    public byte[] get(byte[] key) throws IOException {
+        try {
+            return db.get(key);
+        } catch (RocksDBException e) {
+            throw new IOException("Error in RocksDB get", e);
+        }
+    }
+
+    @Override
+    public int get(byte[] key, byte[] value) throws IOException {
+        try {
+            int res = db.get(key, value);
+            if (res == RocksDB.NOT_FOUND) {
+                return -1;
+            } else if (res > value.length) {
+                throw new IOException("Value array is too small to fit the 
result");
+            } else {
+                return res;
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Error in RocksDB get", e);
+        }
+    }
+
+    @Override
+    public Entry<byte[], byte[]> getFloor(byte[] key) throws IOException {
+        try (RocksIterator iterator = db.newIterator(optionCache)) {
+            // Position the iterator on the record whose key is >= to the 
supplied key
+            iterator.seek(key);
+
+            if (!iterator.isValid()) {
+                // There are no entries >= key
+                iterator.seekToLast();
+                if (iterator.isValid()) {
+                    return new EntryWrapper(iterator.key(), iterator.value());
+                } else {
+                    // Db is empty
+                    return null;
+                }
+            }
+
+            iterator.prev();
+
+            if (!iterator.isValid()) {
+                // Iterator is on the 1st entry of the db and this entry key 
is >= to the target
+                // key
+                return null;
+            } else {
+                return new EntryWrapper(iterator.key(), iterator.value());
+            }
+        }
+    }
+
+    @Override
+    public Entry<byte[], byte[]> getCeil(byte[] key) throws IOException {
+        try (RocksIterator iterator = db.newIterator(optionCache)) {
+            // Position the iterator on the record whose key is >= to the 
supplied key
+            iterator.seek(key);
+
+            if (iterator.isValid()) {
+                return new EntryWrapper(iterator.key(), iterator.value());
+            } else {
+                return null;
+            }
+        }
+    }
+
+    @Override
+    public void delete(byte[] key) throws IOException {
+        try {
+            db.delete(optionDontSync, key);
+        } catch (RocksDBException e) {
+            throw new IOException("Error in RocksDB delete", e);
+        }
+    }
+
+    @Override
+    public void sync() throws IOException {
+        try {
+            db.write(optionSync, emptyBatch);
+        } catch (RocksDBException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public CloseableIterator<byte[]> keys() {
+        final RocksIterator iterator = db.newIterator(optionCache);
+        iterator.seekToFirst();
+
+        return new CloseableIterator<byte[]>() {
+            @Override
+            public boolean hasNext() {
+                return iterator.isValid();
+            }
+
+            @Override
+            public byte[] next() {
+                checkState(iterator.isValid());
+                byte[] key = iterator.key();
+                iterator.next();
+                return key;
+            }
+
+            @Override
+            public void close() {
+                iterator.close();
+            }
+        };
+    }
+
+    @Override
+    public CloseableIterator<byte[]> keys(byte[] firstKey, byte[] lastKey) {
+        final RocksIterator iterator = db.newIterator(optionCache);
+        iterator.seek(firstKey);
+
+        return new CloseableIterator<byte[]>() {
+            @Override
+            public boolean hasNext() {
+                return iterator.isValid() && 
ByteComparator.compare(iterator.key(), lastKey) < 0;
+            }
+
+            @Override
+            public byte[] next() {
+                checkState(iterator.isValid());
+                byte[] key = iterator.key();
+                iterator.next();
+                return key;
+            }
+
+            @Override
+            public void close() {
+                iterator.close();
+            }
+        };
+    }
+
+    @Override
+    public CloseableIterator<Entry<byte[], byte[]>> iterator() {
+        final RocksIterator iterator = db.newIterator(optionDontCache);
+        iterator.seekToFirst();
+        final EntryWrapper entryWrapper = new EntryWrapper();
+
+        return new CloseableIterator<Entry<byte[], byte[]>>() {
+            @Override
+            public boolean hasNext() {
+                return iterator.isValid();
+            }
+
+            @Override
+            public Entry<byte[], byte[]> next() {
+                checkState(iterator.isValid());
+                entryWrapper.key = iterator.key();
+                entryWrapper.value = iterator.value();
+                iterator.next();
+                return entryWrapper;
+            }
+
+            @Override
+            public void close() {
+                iterator.close();
+            }
+        };
+    }
+
+    @Override
+    public long count() throws IOException {
+        try {
+            return db.getLongProperty("rocksdb.estimate-num-keys");
+        } catch (RocksDBException e) {
+            throw new IOException("Error in getting records count", e);
+        }
+    }
+
+    @Override
+    public Batch newBatch() {
+        return new RocksDBBatch();
+    }
+
+    private class RocksDBBatch implements Batch {
+        private final WriteBatch writeBatch = new WriteBatch();
+
+        @Override
+        public void close() {
+            writeBatch.close();
+        }
+
+        @Override
+        public void put(byte[] key, byte[] value) {
+            writeBatch.put(key, value);
+        }
+
+        @Override
+        public void remove(byte[] key) {
+            writeBatch.remove(key);
+        }
+
+        @Override
+        public void clear() {
+            writeBatch.clear();
+        }
+
+        @Override
+        public void deleteRange(byte[] beginKey, byte[] endKey) {
+            writeBatch.deleteRange(beginKey, endKey);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            try {
+                db.write(optionSync, writeBatch);
+            } catch (RocksDBException e) {
+                throw new IOException("Failed to flush RocksDB batch", e);
+            }
+        }
+    }
+
+    private static final class EntryWrapper implements Entry<byte[], byte[]> {
+        // This is not final since the iterator will reuse the same 
EntryWrapper
+        // instance at each step
+        private byte[] key;
+        private byte[] value;
+
+        public EntryWrapper() {
+            this.key = null;
+            this.value = null;
+        }
+
+        public EntryWrapper(byte[] key, byte[] value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        @Override
+        public byte[] setValue(byte[] value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public byte[] getValue() {
+            return value;
+        }
+
+        @Override
+        public byte[] getKey() {
+            return key;
+        }
+    }
+
+    private static final Comparator<byte[]> ByteComparator = 
UnsignedBytes.lexicographicalComparator();
+
+    private static final Logger log = 
LoggerFactory.getLogger(KeyValueStorageRocksDB.class);
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
new file mode 100644
index 0000000..d1c366f
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
@@ -0,0 +1,175 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
+import 
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.CloseableIterator;
+import 
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Unit test for {@link KeyValueStorage}.
+ */
+@RunWith(Parameterized.class)
+public class KeyValueStorageTest {
+
+    private final KeyValueStorageFactory storageFactory;
+    private final ServerConfiguration configuration;
+
+    @Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] { { KeyValueStorageRocksDB.factory 
} });
+    }
+
+    public KeyValueStorageTest(KeyValueStorageFactory storageFactory) {
+        this.storageFactory = storageFactory;
+        this.configuration = new ServerConfiguration();
+    }
+
+    private static long fromArray(byte[] array) {
+        return ArrayUtil.getLong(array, 0);
+    }
+
+    private static byte[] toArray(long n) {
+        byte[] b = new byte[8];
+        ArrayUtil.setLong(b, 0, n);
+        return b;
+    }
+
+    @Test
+    public void simple() throws Exception {
+        File tmpDir = File.createTempFile("bookie", "test");
+        tmpDir.delete();
+
+        KeyValueStorage db = 
storageFactory.newKeyValueStorage(tmpDir.getAbsolutePath(), DbConfigType.Small,
+                configuration);
+
+        assertEquals(null, db.getFloor(toArray(3)));
+        assertEquals(0, db.count());
+
+        db.put(toArray(5), toArray(5));
+
+        assertEquals(null, db.getFloor(toArray(3)));
+        assertEquals(1, db.count());
+
+        assertEquals(null, db.getFloor(toArray(5)));
+        assertEquals(5, fromArray(db.getFloor(toArray(6)).getKey()));
+
+        db.put(toArray(3), toArray(3));
+
+        assertEquals(null, db.getFloor(toArray(3)));
+        assertEquals(2, db.count());
+
+        // //
+
+        db.put(toArray(5), toArray(5));
+        // Count can be imprecise
+        assertTrue(db.count() > 0);
+
+        assertEquals(null, db.getFloor(toArray(1)));
+        assertEquals(null, db.getFloor(toArray(3)));
+        assertEquals(3, fromArray(db.getFloor(toArray(5)).getKey()));
+        assertEquals(5, fromArray(db.getFloor(toArray(6)).getKey()));
+        assertEquals(5, fromArray(db.getFloor(toArray(10)).getKey()));
+
+        // Iterate
+        List<Long> foundKeys = Lists.newArrayList();
+        CloseableIterator<Entry<byte[], byte[]>> iter = db.iterator();
+        try {
+            while (iter.hasNext()) {
+                foundKeys.add(fromArray(iter.next().getKey()));
+            }
+        } finally {
+            iter.close();
+        }
+
+        assertEquals(Lists.newArrayList(3L, 5L), foundKeys);
+
+        // Iterate over keys
+        foundKeys = Lists.newArrayList();
+        CloseableIterator<byte[]> iter2 = db.keys();
+        try {
+            while (iter2.hasNext()) {
+                foundKeys.add(fromArray(iter2.next()));
+            }
+        } finally {
+            iter2.close();
+        }
+
+        assertEquals(Lists.newArrayList(3L, 5L), foundKeys);
+
+        // Scan with limits
+        foundKeys = Lists.newArrayList();
+        iter2 = db.keys(toArray(1), toArray(4));
+        try {
+            while (iter2.hasNext()) {
+                foundKeys.add(fromArray(iter2.next()));
+            }
+        } finally {
+            iter2.close();
+        }
+
+        assertEquals(Lists.newArrayList(3L), foundKeys);
+
+        // Test deletion
+        db.put(toArray(10), toArray(10));
+        db.put(toArray(11), toArray(11));
+        db.put(toArray(12), toArray(12));
+        db.put(toArray(14), toArray(14));
+
+        // Count can be imprecise
+        assertTrue(db.count() > 0);
+
+        assertEquals(10L, fromArray(db.get(toArray(10))));
+        db.delete(toArray(10));
+        assertEquals(null, db.get(toArray(10)));
+        assertTrue(db.count() > 0);
+
+        Batch batch = db.newBatch();
+        batch.remove(toArray(11));
+        batch.remove(toArray(12));
+        batch.remove(toArray(13));
+        batch.flush();
+        assertEquals(null, db.get(toArray(11)));
+        assertEquals(null, db.get(toArray(12)));
+        assertEquals(null, db.get(toArray(13)));
+        assertEquals(14L, fromArray(db.get(toArray(14))));
+        batch.close();
+
+        db.close();
+        tmpDir.delete();
+    }
+}
diff --git a/pom.xml b/pom.xml
index ccb5a92..9c7a002 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,6 +114,7 @@
     <netty-boringssl.version>2.0.3.Final</netty-boringssl.version>
     <slf4j.version>1.7.25</slf4j.version>
     <zookeeper.version>3.5.3-beta</zookeeper.version>
+    <rocksdb.version>5.8.6</rocksdb.version>
     <!-- plugin dependencies -->
     <findbugs-maven-plugin.version>3.0.5</findbugs-maven-plugin.version>
     <puppycrawl.checkstyle.version>6.19</puppycrawl.checkstyle.version>

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to