This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new bba1c6f  DbLedgerStorage -- Main implementation
bba1c6f is described below

commit bba1c6f5d0d92d647467438876cfa32b699780f6
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Dec 15 09:29:10 2017 -0800

    DbLedgerStorage -- Main implementation
    
    `LedgerStorage` implementation that uses the EntryLogger mechanism but 
stores the indexes in a RocksDB database.
    
    In addition, there are also a WriteCache and a ReadCache that are used to 
completely decouple the read & write paths.
    
    Author: Matteo Merli <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo 
<[email protected]>
    
    This closes #855 from merlimat/db-ledger-storage-impl
---
 bookkeeper-proto/pom.xml                           |   1 +
 .../main/proto/DbLedgerStorageDataFormats.proto    |  30 +
 .../bookie/storage/ldb/DbLedgerStorage.java        | 794 +++++++++++++++++++++
 .../bookie/storage/ldb/EntryLocationIndex.java     | 232 ++++++
 .../bookie/storage/ldb/LedgerMetadataIndex.java    | 262 +++++++
 .../bookie/storage/ldb/LongPairWrapper.java        |  69 ++
 .../bookkeeper/bookie/storage/ldb/LongWrapper.java |  67 ++
 .../storage/ldb/DbLedgerStorageBookieTest.java     |  52 ++
 .../bookie/storage/ldb/DbLedgerStorageTest.java    | 423 +++++++++++
 .../storage/ldb/DbLedgerStorageWriteCacheTest.java | 137 ++++
 .../bookie/storage/ldb/EntryLocationIndexTest.java | 111 +++
 .../main/resources/bookkeeper/findbugsExclude.xml  |   4 +
 12 files changed, 2182 insertions(+)

diff --git a/bookkeeper-proto/pom.xml b/bookkeeper-proto/pom.xml
index cc9169d..388bdcb 100644
--- a/bookkeeper-proto/pom.xml
+++ b/bookkeeper-proto/pom.xml
@@ -45,6 +45,7 @@
             <!-- exclude generated file //-->
             <exclude>**/DataFormats.java</exclude>
             <exclude>**/BookkeeperProtocol.java</exclude>
+            <exclude>**/DbLedgerStorageDataFormats.java</exclude>
           </excludes>
         </configuration>
       </plugin>
diff --git a/bookkeeper-proto/src/main/proto/DbLedgerStorageDataFormats.proto 
b/bookkeeper-proto/src/main/proto/DbLedgerStorageDataFormats.proto
new file mode 100644
index 0000000..e68b2d0
--- /dev/null
+++ b/bookkeeper-proto/src/main/proto/DbLedgerStorageDataFormats.proto
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+syntax = "proto2";
+
+option java_package = "org.apache.bookkeeper.bookie.storage.ldb";
+option optimize_for = SPEED;
+
/**
 * Ledger metadata stored in the bookie for each ledger.
 */
message LedgerData {
    // True once the ledger has been created on this bookie.
    required bool exists = 1;

    // True if the ledger has been fenced (presumably no more writes are
    // accepted after fencing — confirm against Bookie fencing semantics).
    required bool fenced = 2;

    // Master key associated with the ledger, as stored via setMasterKey()
    // and returned by readMasterKey().
    required bytes masterKey = 3;
}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
new file mode 100644
index 0000000..fa80016
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -0,0 +1,794 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.io.IOException;
+import java.util.Observable;
+import java.util.Observer;
+import java.util.SortedMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
+import org.apache.bookkeeper.bookie.EntryLocation;
+import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.bookie.GarbageCollectorThread;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import 
org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of LedgerStorage that uses RocksDB to keep the indexes for
+ * entries stored in EntryLogs.
+ */
+public class DbLedgerStorage implements CompactableLedgerStorage {
+
+    private EntryLogger entryLogger;
+
+    private LedgerMetadataIndex ledgerIndex;
+    private EntryLocationIndex entryLocationIndex;
+
+    private GarbageCollectorThread gcThread;
+
+    // Write cache where all new entries are inserted into
+    protected WriteCache writeCache;
+
+    // Write cache that is used to swap with writeCache during flushes
+    protected WriteCache writeCacheBeingFlushed;
+
+    // Cache where we insert entries for speculative reading
+    private ReadCache readCache;
+
+    private final ReentrantReadWriteLock writeCacheMutex = new 
ReentrantReadWriteLock();
+    private final Condition flushWriteCacheCondition = 
writeCacheMutex.writeLock().newCondition();
+
+    protected final ReentrantLock flushMutex = new ReentrantLock();
+
+    protected final AtomicBoolean hasFlushBeenTriggered = new 
AtomicBoolean(false);
+    private final AtomicBoolean isFlushOngoing = new AtomicBoolean(false);
+
+    private final ExecutorService executor = 
Executors.newSingleThreadExecutor(new DefaultThreadFactory("db-storage"));
+
+    // Executor used to for db index cleanup
+    private final ExecutorService cleanupExecutor = Executors
+            .newSingleThreadExecutor(new 
DefaultThreadFactory("db-storage-cleanup"));
+
+    static final String WRITE_CACHE_MAX_SIZE_MB = 
"dbStorage_writeCacheMaxSizeMb";
+    static final String READ_AHEAD_CACHE_BATCH_SIZE = 
"dbStorage_readAheadCacheBatchSize";
+    static final String READ_AHEAD_CACHE_MAX_SIZE_MB = 
"dbStorage_readAheadCacheMaxSizeMb";
+
+    private static final long DEFAULT_WRITE_CACHE_MAX_SIZE_MB = 16;
+    private static final long DEFAULT_READ_CACHE_MAX_SIZE_MB = 16;
+    private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
+
+    private static final int MB = 1024 * 1024;
+
+    private final CopyOnWriteArrayList<LedgerDeletionListener> 
ledgerDeletionListeners = Lists
+            .newCopyOnWriteArrayList();
+
+    private long writeCacheMaxSize;
+
+    private CheckpointSource checkpointSource = null;
+    private Checkpoint lastCheckpoint = Checkpoint.MIN;
+
+    private long readCacheMaxSize;
+    private int readAheadCacheBatchSize;
+
+    private StatsLogger stats;
+
+    private OpStatsLogger addEntryStats;
+    private OpStatsLogger readEntryStats;
+    private OpStatsLogger readCacheHitStats;
+    private OpStatsLogger readCacheMissStats;
+    private OpStatsLogger readAheadBatchCountStats;
+    private OpStatsLogger readAheadBatchSizeStats;
+    private OpStatsLogger flushStats;
+    private OpStatsLogger flushSizeStats;
+
+    @Override
+    public void initialize(ServerConfiguration conf, LedgerManager 
ledgerManager, LedgerDirsManager ledgerDirsManager,
+            LedgerDirsManager indexDirsManager, CheckpointSource 
checkpointSource, Checkpointer checkpointer,
+            StatsLogger statsLogger) throws IOException {
+        checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
+                "Db implementation only allows for one storage dir");
+
+        String baseDir = 
ledgerDirsManager.getAllLedgerDirs().get(0).toString();
+
+        writeCacheMaxSize = conf.getLong(WRITE_CACHE_MAX_SIZE_MB, 
DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB;
+
+        writeCache = new WriteCache(writeCacheMaxSize / 2);
+        writeCacheBeingFlushed = new WriteCache(writeCacheMaxSize / 2);
+
+        this.checkpointSource = checkpointSource;
+
+        readCacheMaxSize = conf.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, 
DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
+        readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, 
DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE);
+
+        readCache = new ReadCache(readCacheMaxSize);
+
+        this.stats = statsLogger;
+
+        log.info("Started Db Ledger Storage");
+        log.info(" - Write cache size: {} MB", writeCacheMaxSize / MB);
+        log.info(" - Read Cache: {} MB", readCacheMaxSize / MB);
+        log.info(" - Read Ahead Batch size: : {}", readAheadCacheBatchSize);
+
+        ledgerIndex = new LedgerMetadataIndex(conf, 
KeyValueStorageRocksDB.factory, baseDir, stats);
+        entryLocationIndex = new EntryLocationIndex(conf, 
KeyValueStorageRocksDB.factory, baseDir, stats);
+
+        entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        gcThread = new GarbageCollectorThread(conf, ledgerManager, this);
+
+        registerStats();
+    }
+
+    public void registerStats() {
+        stats.registerGauge("write-cache-size", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return writeCache.size() + writeCacheBeingFlushed.size();
+            }
+        });
+        stats.registerGauge("write-cache-count", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return writeCache.count() + writeCacheBeingFlushed.count();
+            }
+        });
+        stats.registerGauge("read-cache-size", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return readCache.size();
+            }
+        });
+        stats.registerGauge("read-cache-count", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return readCache.count();
+            }
+        });
+
+        addEntryStats = stats.getOpStatsLogger("add-entry");
+        readEntryStats = stats.getOpStatsLogger("read-entry");
+        readCacheHitStats = stats.getOpStatsLogger("read-cache-hits");
+        readCacheMissStats = stats.getOpStatsLogger("read-cache-misses");
+        readAheadBatchCountStats = 
stats.getOpStatsLogger("readahead-batch-count");
+        readAheadBatchSizeStats = 
stats.getOpStatsLogger("readahead-batch-size");
+        flushStats = stats.getOpStatsLogger("flush");
+        flushSizeStats = stats.getOpStatsLogger("flush-size");
+    }
+
+    @Override
+    public void start() {
+        gcThread.start();
+    }
+
+    @Override
+    public void shutdown() throws InterruptedException {
+        try {
+            flush();
+
+            gcThread.shutdown();
+            entryLogger.shutdown();
+
+            cleanupExecutor.shutdown();
+            cleanupExecutor.awaitTermination(1, TimeUnit.SECONDS);
+
+            ledgerIndex.close();
+            entryLocationIndex.close();
+
+            writeCache.close();
+            writeCacheBeingFlushed.close();
+            readCache.close();
+            executor.shutdown();
+
+        } catch (IOException e) {
+            log.error("Error closing db storage", e);
+        }
+    }
+
+    @Override
+    public boolean ledgerExists(long ledgerId) throws IOException {
+        try {
+            LedgerData ledgerData = ledgerIndex.get(ledgerId);
+            if (log.isDebugEnabled()) {
+                log.debug("Ledger exists. ledger: {} : {}", ledgerId, 
ledgerData.getExists());
+            }
+            return ledgerData.getExists();
+        } catch (Bookie.NoLedgerException nle) {
+            // ledger does not exist
+            return false;
+        }
+    }
+
+    @Override
+    public boolean isFenced(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("isFenced. ledger: {}", ledgerId);
+        }
+        return ledgerIndex.get(ledgerId).getFenced();
+    }
+
+    @Override
+    public boolean setFenced(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("Set fenced. ledger: {}", ledgerId);
+        }
+        return ledgerIndex.setFenced(ledgerId);
+    }
+
+    @Override
+    public void setMasterKey(long ledgerId, byte[] masterKey) throws 
IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("Set master key. ledger: {}", ledgerId);
+        }
+        ledgerIndex.setMasterKey(ledgerId, masterKey);
+    }
+
+    @Override
+    public byte[] readMasterKey(long ledgerId) throws IOException, 
BookieException {
+        if (log.isDebugEnabled()) {
+            log.debug("Read master key. ledger: {}", ledgerId);
+        }
+        return ledgerIndex.get(ledgerId).getMasterKey().toByteArray();
+    }
+
+    @Override
+    public long addEntry(ByteBuf entry) throws IOException {
+        long startTime = MathUtils.nowInNano();
+
+        long ledgerId = entry.readLong();
+        long entryId = entry.readLong();
+        entry.resetReaderIndex();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Add entry. {}@{}", ledgerId, entryId);
+        }
+
+        // Waits if the write cache is being switched for a flush
+        writeCacheMutex.readLock().lock();
+        boolean inserted;
+        try {
+            inserted = writeCache.put(ledgerId, entryId, entry);
+        } finally {
+            writeCacheMutex.readLock().unlock();
+        }
+
+        if (!inserted) {
+            triggerFlushAndAddEntry(ledgerId, entryId, entry);
+        }
+
+        recordSuccessfulEvent(addEntryStats, startTime);
+        return entryId;
+    }
+
+    private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf 
entry) throws IOException {
+        // Write cache is full, we need to trigger a flush so that it gets 
rotated
+        writeCacheMutex.writeLock().lock();
+
+        try {
+            // If the flush has already been triggered or flush has already 
switched the
+            // cache, we don't need to
+            // trigger another flush
+            if (!isFlushOngoing.get() && 
hasFlushBeenTriggered.compareAndSet(false, true)) {
+                // Trigger an early flush in background
+                log.info("Write cache is full, triggering flush");
+                executor.execute(() -> {
+                    try {
+                        flush();
+                    } catch (IOException e) {
+                        log.error("Error during flush", e);
+                    }
+                });
+            }
+
+            long timeoutNs = TimeUnit.MILLISECONDS.toNanos(100);
+            while (hasFlushBeenTriggered.get()) {
+                if (timeoutNs <= 0L) {
+                    throw new IOException("Write cache was not trigger within 
the timeout, cannot add entry " + ledgerId
+                            + "@" + entryId);
+                }
+                timeoutNs = flushWriteCacheCondition.awaitNanos(timeoutNs);
+            }
+
+            if (!writeCache.put(ledgerId, entryId, entry)) {
+                // Still wasn't able to cache entry
+                throw new IOException("Error while inserting entry in write 
cache" + ledgerId + "@" + entryId);
+            }
+
+        } catch (InterruptedException e) {
+            throw new IOException("Interrupted when adding entry " + ledgerId 
+ "@" + entryId);
+        } finally {
+            writeCacheMutex.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
+        long startTime = MathUtils.nowInNano();
+        if (log.isDebugEnabled()) {
+            log.debug("Get Entry: {}@{}", ledgerId, entryId);
+        }
+
+        if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
+            return getLastEntry(ledgerId);
+        }
+
+        writeCacheMutex.readLock().lock();
+        try {
+            // First try to read from the write cache of recent entries
+            ByteBuf entry = writeCache.get(ledgerId, entryId);
+            if (entry != null) {
+                recordSuccessfulEvent(readCacheHitStats, startTime);
+                recordSuccessfulEvent(readEntryStats, startTime);
+                return entry;
+            }
+
+            // If there's a flush going on, the entry might be in the flush 
buffer
+            entry = writeCacheBeingFlushed.get(ledgerId, entryId);
+            if (entry != null) {
+                recordSuccessfulEvent(readCacheHitStats, startTime);
+                recordSuccessfulEvent(readEntryStats, startTime);
+                return entry;
+            }
+        } finally {
+            writeCacheMutex.readLock().unlock();
+        }
+
+        // Try reading from read-ahead cache
+        ByteBuf entry = readCache.get(ledgerId, entryId);
+        if (entry != null) {
+            recordSuccessfulEvent(readCacheHitStats, startTime);
+            recordSuccessfulEvent(readEntryStats, startTime);
+            return entry;
+        }
+
+        // Read from main storage
+        long entryLocation;
+        try {
+            entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
+            if (entryLocation == 0) {
+                throw new NoEntryException(ledgerId, entryId);
+            }
+            entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
+        } catch (NoEntryException e) {
+            recordFailedEvent(readEntryStats, startTime);
+            throw e;
+        }
+
+        readCache.put(ledgerId, entryId, entry);
+
+        // Try to read more entries
+        long nextEntryLocation = entryLocation + 4 /* size header */ + 
entry.readableBytes();
+        fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);
+
+        recordSuccessfulEvent(readCacheMissStats, startTime);
+        recordSuccessfulEvent(readEntryStats, startTime);
+        return entry;
+    }
+
+    private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, 
long firstEntryLocation) {
+        try {
+            long firstEntryLogId = (firstEntryLocation >> 32);
+            long currentEntryLogId = firstEntryLogId;
+            long currentEntryLocation = firstEntryLocation;
+            int count = 0;
+            long size = 0;
+
+            while (count < readAheadCacheBatchSize && currentEntryLogId == 
firstEntryLogId) {
+                ByteBuf entry = entryLogger.internalReadEntry(orginalLedgerId, 
-1, currentEntryLocation);
+
+                try {
+                    long currentEntryLedgerId = entry.getLong(0);
+                    long currentEntryId = entry.getLong(8);
+
+                    if (currentEntryLedgerId != orginalLedgerId) {
+                        // Found an entry belonging to a different ledger, 
stopping read-ahead
+                        entry.release();
+                        return;
+                    }
+
+                    // Insert entry in read cache
+                    readCache.put(orginalLedgerId, currentEntryId, entry);
+
+                    count++;
+                    size += entry.readableBytes();
+
+                    currentEntryLocation += 4 + entry.readableBytes();
+                    currentEntryLogId = currentEntryLocation >> 32;
+                } finally {
+                    entry.release();
+                }
+            }
+
+            readAheadBatchCountStats.registerSuccessfulValue(count);
+            readAheadBatchSizeStats.registerSuccessfulValue(size);
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug("Exception during read ahead for ledger: {}: e", 
orginalLedgerId, e);
+            }
+        }
+    }
+
+    public ByteBuf getLastEntry(long ledgerId) throws IOException {
+        long startTime = MathUtils.nowInNano();
+
+        writeCacheMutex.readLock().lock();
+        try {
+            // First try to read from the write cache of recent entries
+            ByteBuf entry = writeCache.getLastEntry(ledgerId);
+            if (entry != null) {
+                if (log.isDebugEnabled()) {
+                    long foundLedgerId = entry.readLong(); // ledgedId
+                    long entryId = entry.readLong();
+                    entry.resetReaderIndex();
+                    if (log.isDebugEnabled()) {
+                        log.debug("Found last entry for ledger {} in write 
cache: {}@{}", ledgerId, foundLedgerId,
+                                entryId);
+                    }
+                }
+
+                recordSuccessfulEvent(readCacheHitStats, startTime);
+                recordSuccessfulEvent(readEntryStats, startTime);
+                return entry;
+            }
+
+            // If there's a flush going on, the entry might be in the flush 
buffer
+            entry = writeCacheBeingFlushed.getLastEntry(ledgerId);
+            if (entry != null) {
+                if (log.isDebugEnabled()) {
+                    entry.readLong(); // ledgedId
+                    long entryId = entry.readLong();
+                    entry.resetReaderIndex();
+                    if (log.isDebugEnabled()) {
+                        log.debug("Found last entry for ledger {} in write 
cache being flushed: {}", ledgerId, entryId);
+                    }
+                }
+
+                recordSuccessfulEvent(readCacheHitStats, startTime);
+                recordSuccessfulEvent(readEntryStats, startTime);
+                return entry;
+            }
+        } finally {
+            writeCacheMutex.readLock().unlock();
+        }
+
+        // Search the last entry in storage
+        long lastEntryId = entryLocationIndex.getLastEntryInLedger(ledgerId);
+        if (log.isDebugEnabled()) {
+            log.debug("Found last entry for ledger {} in db: {}", ledgerId, 
lastEntryId);
+        }
+
+        long entryLocation = entryLocationIndex.getLocation(ledgerId, 
lastEntryId);
+        ByteBuf content = entryLogger.readEntry(ledgerId, lastEntryId, 
entryLocation);
+
+        recordSuccessfulEvent(readCacheMissStats, startTime);
+        recordSuccessfulEvent(readEntryStats, startTime);
+        return content;
+    }
+
+    @VisibleForTesting
+    boolean isFlushRequired() {
+        writeCacheMutex.readLock().lock();
+        try {
+            return !writeCache.isEmpty();
+        } finally {
+            writeCacheMutex.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void checkpoint(Checkpoint checkpoint) throws IOException {
+        Checkpoint thisCheckpoint = checkpointSource.newCheckpoint();
+        if (lastCheckpoint.compareTo(checkpoint) > 0) {
+            return;
+        }
+
+        long startTime = MathUtils.nowInNano();
+
+        // Only a single flush operation can happen at a time
+        flushMutex.lock();
+
+        try {
+            // Swap the write cache so that writes can continue to happen 
while the flush is
+            // ongoing
+            swapWriteCache();
+
+            long sizeToFlush = writeCacheBeingFlushed.size();
+            if (log.isDebugEnabled()) {
+                log.debug("Flushing entries. count: {} -- size {} Mb", 
writeCacheBeingFlushed.count(),
+                        sizeToFlush / 1024.0 / 1024);
+            }
+
+            // Write all the pending entries into the entry logger and collect 
the offset
+            // position for each entry
+
+            Batch batch = entryLocationIndex.newBatch();
+            writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> {
+                try {
+                    long location = entryLogger.addEntry(ledgerId, entry, 
true);
+                    entryLocationIndex.addLocation(batch, ledgerId, entryId, 
location);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            entryLogger.flush();
+
+            long batchFlushStarTime = System.nanoTime();
+            batch.flush();
+            batch.close();
+            if (log.isDebugEnabled()) {
+                log.debug("DB batch flushed time : {} s",
+                        MathUtils.elapsedNanos(batchFlushStarTime) / (double) 
TimeUnit.SECONDS.toNanos(1));
+            }
+
+            ledgerIndex.flush();
+
+            cleanupExecutor.execute(() -> {
+                // There can only be one single cleanup task running because 
the cleanupExecutor
+                // is single-threaded
+                try {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Removing deleted ledgers from db indexes");
+                    }
+
+                    entryLocationIndex.removeOffsetFromDeletedLedgers();
+                    ledgerIndex.removeDeletedLedgers();
+                } catch (Throwable t) {
+                    log.warn("Failed to cleanup db indexes", t);
+                }
+            });
+
+            lastCheckpoint = thisCheckpoint;
+
+            // Discard all the entry from the write cache, since they're now 
persisted
+            writeCacheBeingFlushed.clear();
+
+            double flushTimeSeconds = MathUtils.elapsedNanos(startTime) / 
(double) TimeUnit.SECONDS.toNanos(1);
+            double flushThroughput = sizeToFlush / 1024.0 / 1024.0 / 
flushTimeSeconds;
+
+            if (log.isDebugEnabled()) {
+                log.debug("Flushing done time {} s -- Written {} MB/s", 
flushTimeSeconds, flushThroughput);
+            }
+
+            recordSuccessfulEvent(flushStats, startTime);
+            flushSizeStats.registerSuccessfulValue(sizeToFlush);
+        } catch (IOException e) {
+            // Leave IOExecption as it is
+            throw e;
+        } catch (RuntimeException e) {
+            // Wrap unchecked exceptions
+            throw new IOException(e);
+        } finally {
+            try {
+                isFlushOngoing.set(false);
+            } finally {
+                flushMutex.unlock();
+            }
+        }
+    }
+
+    /**
+     * Swap the current write cache with the replacement cache.
+     */
+    private void swapWriteCache() {
+        writeCacheMutex.writeLock().lock();
+        try {
+            // First, swap the current write-cache map with an empty one so 
that writes will
+            // go on unaffected. Only a single flush is happening at the same 
time
+            WriteCache tmp = writeCacheBeingFlushed;
+            writeCacheBeingFlushed = writeCache;
+            writeCache = tmp;
+
+            // since the cache is switched, we can allow flush to be triggered
+            hasFlushBeenTriggered.set(false);
+            flushWriteCacheCondition.signalAll();
+        } finally {
+            try {
+                isFlushOngoing.set(true);
+            } finally {
+                writeCacheMutex.writeLock().unlock();
+            }
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        checkpoint(Checkpoint.MAX);
+    }
+
+    @Override
+    public void deleteLedger(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("Deleting ledger {}", ledgerId);
+        }
+
+        // Delete entries from this ledger that are still in the write cache
+        writeCacheMutex.readLock().lock();
+        try {
+            writeCache.deleteLedger(ledgerId);
+        } finally {
+            writeCacheMutex.readLock().unlock();
+        }
+
+        entryLocationIndex.delete(ledgerId);
+        ledgerIndex.delete(ledgerId);
+
+        for (int i = 0, size = ledgerDeletionListeners.size(); i < size; i++) {
+            LedgerDeletionListener listener = ledgerDeletionListeners.get(i);
+            listener.ledgerDeleted(ledgerId);
+        }
+    }
+
+    @Override
+    public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long 
lastLedgerId) throws IOException {
+        return ledgerIndex.getActiveLedgersInRange(firstLedgerId, 
lastLedgerId);
+    }
+
+    @Override
+    public void updateEntriesLocations(Iterable<EntryLocation> locations) 
throws IOException {
+        // Trigger a flush to have all the entries being compacted in the db 
storage
+        flush();
+
+        entryLocationIndex.updateLocations(locations);
+    }
+
+    @Override
+    public EntryLogger getEntryLogger() {
+        return entryLogger;
+    }
+
+    @Override
+    public long getLastAddConfirmed(long ledgerId) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Observable waitForLastAddConfirmedUpdate(long ledgerId, long 
previoisLAC, Observer observer)
+            throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ByteBuf getExplicitLac(long ledgerId) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void flushEntriesLocationsIndex() throws IOException {
+        // No-op. Location index is already flushed in 
updateEntriesLocations() call
+    }
+
+    /**
+     * Add an already existing ledger to the index.
+     *
+     * <p>This method is only used as a tool to help the migration from
+     * InterleaveLedgerStorage to DbLedgerStorage
+     *
+     * @param ledgerId
+     *            the ledger id
+     * @param entries
+     *            a map of entryId -> location
+     * @return the number of
+     */
+    public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] 
masterKey,
+            Iterable<SortedMap<Long, Long>> entries) throws Exception {
+        LedgerData ledgerData = 
LedgerData.newBuilder().setExists(true).setFenced(isFenced)
+                .setMasterKey(ByteString.copyFrom(masterKey)).build();
+        ledgerIndex.set(ledgerId, ledgerData);
+        AtomicLong numberOfEntries = new AtomicLong();
+
+        // Iterate over all the entries pages
+        Batch batch = entryLocationIndex.newBatch();
+        entries.forEach(map -> {
+            map.forEach((entryId, location) -> {
+                try {
+                    entryLocationIndex.addLocation(batch, ledgerId, entryId, 
location);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+
+                numberOfEntries.incrementAndGet();
+            });
+        });
+
+        batch.flush();
+        batch.close();
+
+        return numberOfEntries.get();
+    }
+
+    @Override
+    public void registerLedgerDeletionListener(LedgerDeletionListener 
listener) {
+        ledgerDeletionListeners.add(listener);
+    }
+
+    public EntryLocationIndex getEntryLocationIndex() {
+        return entryLocationIndex;
+    }
+
+    private void recordSuccessfulEvent(OpStatsLogger logger, long 
startTimeNanos) {
+        logger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), 
TimeUnit.NANOSECONDS);
+    }
+
+    private void recordFailedEvent(OpStatsLogger logger, long startTimeNanos) {
+        logger.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), 
TimeUnit.NANOSECONDS);
+    }
+
+    private static final Logger log = 
LoggerFactory.getLogger(DbLedgerStorage.class);
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
new file mode 100644
index 0000000..53e37a2
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
@@ -0,0 +1,232 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.collect.Iterables;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.EntryLocation;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
+import 
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains an index of the entry locations in the EntryLogger.
+ *
+ * <p>For each ledger multiple entries are stored in the same "record", 
represented
+ * by the {@link LedgerIndexPage} class.
+ */
+public class EntryLocationIndex implements Closeable {
+
+    private final KeyValueStorage locationsDb;
+    private final ConcurrentLongHashSet deletedLedgers = new 
ConcurrentLongHashSet();
+
+    private StatsLogger stats;
+
+    public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory 
storageFactory, String basePath,
+            StatsLogger stats) throws IOException {
+        String locationsDbPath = FileSystems.getDefault().getPath(basePath, 
"locations").toFile().toString();
+        locationsDb = storageFactory.newKeyValueStorage(locationsDbPath, 
DbConfigType.Huge, conf);
+
+        this.stats = stats;
+        registerStats();
+    }
+
+    public void registerStats() {
+        stats.registerGauge("entries-count", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                try {
+                    return locationsDb.count();
+                } catch (IOException e) {
+                    return -1L;
+                }
+            }
+        });
+    }
+
+    @Override
+    public void close() throws IOException {
+        locationsDb.close();
+    }
+
+    public long getLocation(long ledgerId, long entryId) throws IOException {
+        LongPairWrapper key = LongPairWrapper.get(ledgerId, entryId);
+        LongWrapper value = LongWrapper.get();
+
+        try {
+            if (locationsDb.get(key.array, value.array) < 0) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Entry not found {}@{} in db index", ledgerId, 
entryId);
+                }
+                return 0;
+            }
+
+            return value.getValue();
+        } finally {
+            key.recycle();
+            value.recycle();
+        }
+    }
+
+    public long getLastEntryInLedger(long ledgerId) throws IOException {
+        if (deletedLedgers.contains(ledgerId)) {
+            // Ledger already deleted
+            return -1;
+        }
+
+        return getLastEntryInLedgerInternal(ledgerId);
+    }
+
+    private long getLastEntryInLedgerInternal(long ledgerId) throws 
IOException {
+        LongPairWrapper maxEntryId = LongPairWrapper.get(ledgerId, 
Long.MAX_VALUE);
+
+        // Search the last entry in storage
+        Entry<byte[], byte[]> entry = locationsDb.getFloor(maxEntryId.array);
+        maxEntryId.recycle();
+
+        if (entry == null) {
+            throw new Bookie.NoEntryException(ledgerId, -1);
+        } else {
+            long foundLedgerId = ArrayUtil.getLong(entry.getKey(), 0);
+            long lastEntryId = ArrayUtil.getLong(entry.getKey(), 8);
+
+            if (foundLedgerId == ledgerId) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Found last page in storage db for ledger {} - 
last entry: {}", ledgerId, lastEntryId);
+                }
+                return lastEntryId;
+            } else {
+                throw new Bookie.NoEntryException(ledgerId, -1);
+            }
+        }
+    }
+
+    public void addLocation(long ledgerId, long entryId, long location) throws 
IOException {
+        Batch batch = locationsDb.newBatch();
+        addLocation(batch, ledgerId, entryId, location);
+        batch.flush();
+        batch.close();
+    }
+
+    public Batch newBatch() {
+        return locationsDb.newBatch();
+    }
+
+    public void addLocation(Batch batch, long ledgerId, long entryId, long 
location) throws IOException {
+        LongPairWrapper key = LongPairWrapper.get(ledgerId, entryId);
+        LongWrapper value = LongWrapper.get(location);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Add location - ledger: {} -- entry: {} -- location: 
{}", ledgerId, entryId, location);
+        }
+
+        try {
+            batch.put(key.array, value.array);
+        } finally {
+            key.recycle();
+            value.recycle();
+        }
+    }
+
+    public void updateLocations(Iterable<EntryLocation> newLocations) throws 
IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("Update locations -- {}", Iterables.size(newLocations));
+        }
+
+        Batch batch = newBatch();
+        // Update all the ledger index pages with the new locations
+        for (EntryLocation e : newLocations) {
+            if (log.isDebugEnabled()) {
+                log.debug("Update location - ledger: {} -- entry: {}", 
e.ledger, e.entry);
+            }
+
+            addLocation(batch, e.ledger, e.entry, e.location);
+        }
+
+        batch.flush();
+        batch.close();
+    }
+
+    public void delete(long ledgerId) throws IOException {
+        // We need to find all the LedgerIndexPage records belonging to one 
specific
+        // ledgers
+        deletedLedgers.add(ledgerId);
+    }
+
+    public void removeOffsetFromDeletedLedgers() throws IOException {
+        LongPairWrapper firstKeyWrapper = LongPairWrapper.get(-1, -1);
+        LongPairWrapper lastKeyWrapper = LongPairWrapper.get(-1, -1);
+        LongPairWrapper keyToDelete = LongPairWrapper.get(-1, -1);
+
+        Set<Long> ledgersToDelete = deletedLedgers.items();
+
+        if (ledgersToDelete.isEmpty()) {
+            return;
+        }
+
+        log.info("Deleting indexes for ledgers: {}", ledgersToDelete);
+        Batch batch = locationsDb.newBatch();
+
+        try {
+            for (long ledgerId : ledgersToDelete) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Deleting indexes from ledger {}", ledgerId);
+                }
+
+                firstKeyWrapper.set(ledgerId, 0);
+                lastKeyWrapper.set(ledgerId, Long.MAX_VALUE);
+
+                batch.deleteRange(firstKeyWrapper.array, lastKeyWrapper.array);
+            }
+
+            batch.flush();
+
+            // Removed from pending set
+            for (long ledgerId : ledgersToDelete) {
+                deletedLedgers.remove(ledgerId);
+            }
+        } finally {
+            firstKeyWrapper.recycle();
+            lastKeyWrapper.recycle();
+            keyToDelete.recycle();
+            batch.close();
+        }
+    }
+
+    private static final Logger log = 
LoggerFactory.getLogger(EntryLocationIndex.class);
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
new file mode 100644
index 0000000..04bf32d
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
@@ -0,0 +1,262 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.protobuf.ByteString;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Arrays;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import 
org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
+import 
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.CloseableIterator;
+import 
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains an index for the ledgers metadata.
+ *
+ * <p>The key is the ledgerId and the value is the {@link LedgerData} content.
+ */
+public class LedgerMetadataIndex implements Closeable {
+    // Contains all ledgers stored in the bookie
+    private final ConcurrentLongHashMap<LedgerData> ledgers;
+    private final AtomicInteger ledgersCount;
+
+    private final KeyValueStorage ledgersDb;
+    private StatsLogger stats;
+
+    // Holds ledger modifications applied in memory map, and pending to be 
flushed on db
+    private final ConcurrentLinkedQueue<Entry<Long, LedgerData>> 
pendingLedgersUpdates;
+
+    // Holds ledger ids that were delete from memory map, and pending to be 
flushed on db
+    private final ConcurrentLinkedQueue<Long> pendingDeletedLedgers;
+
+    public LedgerMetadataIndex(ServerConfiguration conf, 
KeyValueStorageFactory storageFactory, String basePath,
+            StatsLogger stats) throws IOException {
+        String ledgersPath = FileSystems.getDefault().getPath(basePath, 
"ledgers").toFile().toString();
+        ledgersDb = storageFactory.newKeyValueStorage(ledgersPath, 
DbConfigType.Small, conf);
+
+        ledgers = new ConcurrentLongHashMap<>();
+        ledgersCount = new AtomicInteger();
+
+        // Read all ledgers from db
+        CloseableIterator<Entry<byte[], byte[]>> iterator = 
ledgersDb.iterator();
+        try {
+            while (iterator.hasNext()) {
+                Entry<byte[], byte[]> entry = iterator.next();
+                long ledgerId = ArrayUtil.getLong(entry.getKey(), 0);
+                LedgerData ledgerData = LedgerData.parseFrom(entry.getValue());
+                ledgers.put(ledgerId, ledgerData);
+                ledgersCount.incrementAndGet();
+            }
+        } finally {
+            iterator.close();
+        }
+
+        this.pendingLedgersUpdates = new ConcurrentLinkedQueue<Entry<Long, 
LedgerData>>();
+        this.pendingDeletedLedgers = new ConcurrentLinkedQueue<Long>();
+
+        this.stats = stats;
+        registerStats();
+    }
+
+    public void registerStats() {
+        stats.registerGauge("ledgers-count", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return (long) ledgersCount.get();
+            }
+        });
+    }
+
+    @Override
+    public void close() throws IOException {
+        ledgersDb.close();
+    }
+
+    public LedgerData get(long ledgerId) throws IOException {
+        LedgerData ledgerData = ledgers.get(ledgerId);
+        if (ledgerData == null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Ledger not found {}", ledgerId);
+            }
+            throw new Bookie.NoLedgerException(ledgerId);
+        }
+
+        return ledgerData;
+    }
+
+    public void set(long ledgerId, LedgerData ledgerData) throws IOException {
+        ledgerData = LedgerData.newBuilder(ledgerData).setExists(true).build();
+
+        if (ledgers.put(ledgerId, ledgerData) == null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Added new ledger {}", ledgerId);
+            }
+            ledgersCount.incrementAndGet();
+        }
+
+        pendingLedgersUpdates.add(new SimpleEntry<Long, LedgerData>(ledgerId, 
ledgerData));
+        pendingDeletedLedgers.remove(ledgerId);
+    }
+
+    public void delete(long ledgerId) throws IOException {
+        if (ledgers.remove(ledgerId) != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Removed ledger {}", ledgerId);
+            }
+            ledgersCount.decrementAndGet();
+        }
+
+        pendingDeletedLedgers.add(ledgerId);
+        pendingLedgersUpdates.removeIf(e -> e.getKey() == ledgerId);
+    }
+
+    public Iterable<Long> getActiveLedgersInRange(final long firstLedgerId, 
final long lastLedgerId)
+            throws IOException {
+        return Iterables.filter(ledgers.keys(), new Predicate<Long>() {
+            @Override
+            public boolean apply(Long ledgerId) {
+                return ledgerId >= firstLedgerId && ledgerId < lastLedgerId;
+            }
+        });
+    }
+
+    public boolean setFenced(long ledgerId) throws IOException {
+        LedgerData ledgerData = get(ledgerId);
+        if (ledgerData.getFenced()) {
+            return false;
+        }
+
+        LedgerData newLedgerData = 
LedgerData.newBuilder(ledgerData).setFenced(true).build();
+
+        if (ledgers.put(ledgerId, newLedgerData) == null) {
+            // Ledger had been deleted
+            if (log.isDebugEnabled()) {
+                log.debug("Re-inserted fenced ledger {}", ledgerId);
+            }
+            ledgersCount.incrementAndGet();
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Set fenced ledger {}", ledgerId);
+            }
+        }
+
+        pendingLedgersUpdates.add(new SimpleEntry<Long, LedgerData>(ledgerId, 
newLedgerData));
+        pendingDeletedLedgers.remove(ledgerId);
+        return true;
+    }
+
+    public void setMasterKey(long ledgerId, byte[] masterKey) throws 
IOException {
+        LedgerData ledgerData = ledgers.get(ledgerId);
+        if (ledgerData == null) {
+            // New ledger inserted
+            ledgerData = 
LedgerData.newBuilder().setExists(true).setFenced(false)
+                    .setMasterKey(ByteString.copyFrom(masterKey)).build();
+            if (log.isDebugEnabled()) {
+                log.debug("Inserting new ledger {}", ledgerId);
+            }
+        } else {
+            byte[] storedMasterKey = ledgerData.getMasterKey().toByteArray();
+            if (ArrayUtil.isArrayAllZeros(storedMasterKey)) {
+                // update master key of the ledger
+                ledgerData = 
LedgerData.newBuilder(ledgerData).setMasterKey(ByteString.copyFrom(masterKey)).build();
+                if (log.isDebugEnabled()) {
+                    log.debug("Replace old master key {} with new master key 
{}", storedMasterKey, masterKey);
+                }
+            } else if (!Arrays.equals(storedMasterKey, masterKey) && 
!ArrayUtil.isArrayAllZeros(masterKey)) {
+                log.warn("Ledger {} masterKey in db can only be set once.", 
ledgerId);
+                throw new 
IOException(BookieException.create(BookieException.Code.IllegalOpException));
+            }
+        }
+
+        if (ledgers.put(ledgerId, ledgerData) == null) {
+            ledgersCount.incrementAndGet();
+        }
+
+        pendingLedgersUpdates.add(new SimpleEntry<Long, LedgerData>(ledgerId, 
ledgerData));
+        pendingDeletedLedgers.remove(ledgerId);
+    }
+
+    /**
+     * Flushes all pending changes.
+     */
+    public void flush() throws IOException {
+        LongWrapper key = LongWrapper.get();
+
+        int updatedLedgers = 0;
+        while (!pendingLedgersUpdates.isEmpty()) {
+            Entry<Long, LedgerData> entry = pendingLedgersUpdates.poll();
+            key.set(entry.getKey());
+            byte[] value = entry.getValue().toByteArray();
+            ledgersDb.put(key.array, value);
+            ++updatedLedgers;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Persisting updates to {} ledgers", updatedLedgers);
+        }
+
+        ledgersDb.sync();
+        key.recycle();
+    }
+
+    public void removeDeletedLedgers() throws IOException {
+        LongWrapper key = LongWrapper.get();
+
+        int deletedLedgers = 0;
+        while (!pendingDeletedLedgers.isEmpty()) {
+            long ledgerId = pendingDeletedLedgers.poll();
+            key.set(ledgerId);
+            ledgersDb.delete(key.array);
+            deletedLedgers++;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Persisting deletes of ledgers {}", deletedLedgers);
+        }
+
+        ledgersDb.sync();
+        key.recycle();
+    }
+
+    private static final Logger log = 
LoggerFactory.getLogger(LedgerMetadataIndex.class);
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongPairWrapper.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongPairWrapper.java
new file mode 100644
index 0000000..993297c
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongPairWrapper.java
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
+/**
+ * Recyclable wrapper that holds a pair of longs.
+ */
+class LongPairWrapper {
+
+    final byte[] array = new byte[16];
+
+    public void set(long first, long second) {
+        ArrayUtil.setLong(array, 0, first);
+        ArrayUtil.setLong(array, 8, second);
+    }
+
+    public long getFirst() {
+        return ArrayUtil.getLong(array, 0);
+    }
+
+    public long getSecond() {
+        return ArrayUtil.getLong(array, 8);
+    }
+
+    public static LongPairWrapper get(long first, long second) {
+        LongPairWrapper lp = RECYCLER.get();
+        ArrayUtil.setLong(lp.array, 0, first);
+        ArrayUtil.setLong(lp.array, 8, second);
+        return lp;
+    }
+
+    public void recycle() {
+        handle.recycle(this);
+    }
+
+    private static final Recycler<LongPairWrapper> RECYCLER = new 
Recycler<LongPairWrapper>() {
+        @Override
+        protected LongPairWrapper newObject(Handle<LongPairWrapper> handle) {
+            return new LongPairWrapper(handle);
+        }
+    };
+
+    private final Handle<LongPairWrapper> handle;
+
+    private LongPairWrapper(Handle<LongPairWrapper> handle) {
+        this.handle = handle;
+    }
+}
\ No newline at end of file
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongWrapper.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongWrapper.java
new file mode 100644
index 0000000..144b9ee
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongWrapper.java
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
+/**
+ * Wrapper for a long serialized into a byte array.
+ */
+class LongWrapper {
+
+    final byte[] array = new byte[8];
+
+    public void set(long value) {
+        ArrayUtil.setLong(array, 0, value);
+    }
+
+    public long getValue() {
+        return ArrayUtil.getLong(array, 0);
+    }
+
+    public static LongWrapper get() {
+        return RECYCLER.get();
+    }
+
+    public static LongWrapper get(long value) {
+        LongWrapper lp = RECYCLER.get();
+        ArrayUtil.setLong(lp.array, 0, value);
+        return lp;
+    }
+
+    public void recycle() {
+        handle.recycle(this);
+    }
+
+    private static final Recycler<LongWrapper> RECYCLER = new 
Recycler<LongWrapper>() {
+        @Override
+        protected LongWrapper newObject(Handle<LongWrapper> handle) {
+            return new LongWrapper(handle);
+        }
+    };
+
+    private final Handle<LongWrapper> handle;
+
+    private LongWrapper(Handle<LongWrapper> handle) {
+        this.handle = handle;
+    }
+}
\ No newline at end of file
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java
new file mode 100644
index 0000000..da7f32a
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java
@@ -0,0 +1,52 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link DbLedgerStorageBookieTest}.
+ */
+public class DbLedgerStorageBookieTest extends BookKeeperClusterTestCase {
+
+    public DbLedgerStorageBookieTest() {
+        super(1);
+        baseConf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        baseConf.setFlushInterval(60000);
+        baseConf.setGcWaitTime(60000);
+    }
+
+    @Test
+    public void testRecoveryEmptyLedger() throws Exception {
+        LedgerHandle lh1 = bkc.createLedger(1, 1, DigestType.MAC, new byte[0]);
+
+        // Force ledger close & recovery
+        LedgerHandle lh2 = bkc.openLedger(lh1.getId(), DigestType.MAC, new 
byte[0]);
+
+        assertEquals(0, lh2.getLength());
+        assertEquals(-1, lh2.getLastAddConfirmed());
+    }
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
new file mode 100644
index 0000000..1e91c2c
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
@@ -0,0 +1,423 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.EntryLocation;
+import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link DbLedgerStorage}.
+ */
+public class DbLedgerStorageTest {
+
+    private DbLedgerStorage storage;
+    private File tmpDir;
+
+    @Before
+    public void setup() throws Exception {
+        tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setAllowLoopback(true);
+        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+        Bookie bookie = new Bookie(conf);
+
+        storage = (DbLedgerStorage) bookie.getLedgerStorage();
+    }
+
+    @After
+    public void teardown() throws Exception {
+        storage.shutdown();
+        tmpDir.delete();
+    }
+
+    @Test
+    public void simple() throws Exception {
+        assertEquals(false, storage.ledgerExists(3));
+        try {
+            storage.isFenced(3);
+            fail("should have failed");
+        } catch (Bookie.NoLedgerException nle) {
+            // OK
+        }
+        assertEquals(false, storage.ledgerExists(3));
+        try {
+            storage.setFenced(3);
+            fail("should have failed");
+        } catch (Bookie.NoLedgerException nle) {
+            // OK
+        }
+        storage.setMasterKey(3, "key".getBytes());
+        try {
+            storage.setMasterKey(3, "other-key".getBytes());
+            fail("should have failed");
+        } catch (IOException ioe) {
+            assertTrue(ioe.getCause() instanceof 
BookieException.BookieIllegalOpException);
+        }
+        // setting the same key is NOOP
+        storage.setMasterKey(3, "key".getBytes());
+        assertEquals(true, storage.ledgerExists(3));
+        assertEquals(true, storage.setFenced(3));
+        assertEquals(true, storage.isFenced(3));
+        assertEquals(false, storage.setFenced(3));
+
+        storage.setMasterKey(4, "key".getBytes());
+        assertEquals(false, storage.isFenced(4));
+        assertEquals(true, storage.ledgerExists(4));
+
+        assertEquals("key", new String(storage.readMasterKey(4)));
+
+        assertEquals(Lists.newArrayList(4L, 3L), 
Lists.newArrayList(storage.getActiveLedgersInRange(0, 100)));
+        assertEquals(Lists.newArrayList(4L, 3L), 
Lists.newArrayList(storage.getActiveLedgersInRange(3, 100)));
+        assertEquals(Lists.newArrayList(3L), 
Lists.newArrayList(storage.getActiveLedgersInRange(0, 4)));
+
+        // Add / read entries
+        ByteBuf entry = Unpooled.buffer(1024);
+        entry.writeLong(4); // ledger id
+        entry.writeLong(1); // entry id
+        entry.writeBytes("entry-1".getBytes());
+
+        assertEquals(false, ((DbLedgerStorage) storage).isFlushRequired());
+
+        assertEquals(1, storage.addEntry(entry));
+
+        assertEquals(true, ((DbLedgerStorage) storage).isFlushRequired());
+
+        // Read from write cache
+        ByteBuf res = storage.getEntry(4, 1);
+        assertEquals(entry, res);
+
+        storage.flush();
+
+        assertEquals(false, ((DbLedgerStorage) storage).isFlushRequired());
+
+        // Read from db
+        res = storage.getEntry(4, 1);
+        assertEquals(entry, res);
+
+        try {
+            storage.getEntry(4, 2);
+            fail("Should have thrown exception");
+        } catch (NoEntryException e) {
+            // ok
+        }
+
+        ByteBuf entry2 = Unpooled.buffer(1024);
+        entry2.writeLong(4); // ledger id
+        entry2.writeLong(2); // entry id
+        entry2.writeBytes("entry-2".getBytes());
+
+        storage.addEntry(entry2);
+
+        // Read last entry in ledger
+        res = storage.getEntry(4, BookieProtocol.LAST_ADD_CONFIRMED);
+        assertEquals(entry2, res);
+
+        ByteBuf entry3 = Unpooled.buffer(1024);
+        entry3.writeLong(4); // ledger id
+        entry3.writeLong(3); // entry id
+        entry3.writeBytes("entry-3".getBytes());
+        storage.addEntry(entry3);
+
+        ByteBuf entry4 = Unpooled.buffer(1024);
+        entry4.writeLong(4); // ledger id
+        entry4.writeLong(4); // entry id
+        entry4.writeBytes("entry-4".getBytes());
+        storage.addEntry(entry4);
+
+        res = storage.getEntry(4, 4);
+        assertEquals(entry4, res);
+
+        // Delete
+        assertEquals(true, storage.ledgerExists(4));
+        storage.deleteLedger(4);
+        assertEquals(false, storage.ledgerExists(4));
+
+        // Should not throw exception even if the ledger was deleted
+        storage.getEntry(4, 4);
+
+        storage.addEntry(Unpooled.wrappedBuffer(entry2));
+        res = storage.getEntry(4, BookieProtocol.LAST_ADD_CONFIRMED);
+        assertEquals(entry4, res);
+
+        // Get last entry from storage
+        storage.flush();
+
+        try {
+            storage.getEntry(4, 4);
+            fail("Should have thrown exception since the ledger was deleted");
+        } catch (NoEntryException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void testBookieCompaction() throws Exception {
+        storage.setMasterKey(4, "key".getBytes());
+
+        ByteBuf entry3 = Unpooled.buffer(1024);
+        entry3.writeLong(4); // ledger id
+        entry3.writeLong(3); // entry id
+        entry3.writeBytes("entry-3".getBytes());
+        storage.addEntry(entry3);
+
+        // Simulate bookie compaction
+        EntryLogger entryLogger = ((DbLedgerStorage) storage).getEntryLogger();
+        // Rewrite entry-3
+        ByteBuf newEntry3 = Unpooled.buffer(1024);
+        newEntry3.writeLong(4); // ledger id
+        newEntry3.writeLong(3); // entry id
+        newEntry3.writeBytes("new-entry-3".getBytes());
+        long location = entryLogger.addEntry(4, newEntry3, false);
+
+        List<EntryLocation> locations = Lists.newArrayList(new 
EntryLocation(4, 3, location));
+        storage.updateEntriesLocations(locations);
+
+        ByteBuf res = storage.getEntry(4, 3);
+        System.out.println("res:       " + ByteBufUtil.hexDump(res));
+        System.out.println("newEntry3: " + ByteBufUtil.hexDump(newEntry3));
+        assertEquals(newEntry3, res);
+    }
+
+    @Test
+    public void doubleDirectoryError() throws Exception {
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setAllowLoopback(true);
+        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        conf.setLedgerDirNames(new String[] { "dir1", "dir2" });
+
+        try {
+            new Bookie(conf);
+            fail("Should have failed because of the 2 directories");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+    }
+
+    @Test
+    public void testRewritingEntries() throws Exception {
+        storage.setMasterKey(1, "key".getBytes());
+
+        try {
+            storage.getEntry(1, -1);
+            fail("Should throw exception");
+        } catch (Bookie.NoEntryException e) {
+            // ok
+        }
+
+        ByteBuf entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(1); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+
+        storage.addEntry(entry1);
+        storage.flush();
+
+        ByteBuf newEntry1 = Unpooled.buffer(1024);
+        newEntry1.writeLong(1); // ledger id
+        newEntry1.writeLong(1); // entry id
+        newEntry1.writeBytes("new-entry-1".getBytes());
+
+        storage.addEntry(newEntry1);
+        storage.flush();
+
+        ByteBuf response = storage.getEntry(1, 1);
+        assertEquals(newEntry1, response);
+    }
+
+    @Test
+    public void testEntriesOutOfOrder() throws Exception {
+        storage.setMasterKey(1, "key".getBytes());
+
+        ByteBuf entry2 = Unpooled.buffer(1024);
+        entry2.writeLong(1); // ledger id
+        entry2.writeLong(2); // entry id
+        entry2.writeBytes("entry-2".getBytes());
+
+        storage.addEntry(entry2);
+
+        try {
+            storage.getEntry(1, 1);
+            fail("Entry doesn't exist");
+        } catch (NoEntryException e) {
+            // Ok, entry doesn't exist
+        }
+
+        ByteBuf res = storage.getEntry(1, 2);
+        assertEquals(entry2, res);
+
+        ByteBuf entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(1); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+
+        storage.addEntry(entry1);
+
+        res = storage.getEntry(1, 1);
+        assertEquals(entry1, res);
+
+        res = storage.getEntry(1, 2);
+        assertEquals(entry2, res);
+
+        storage.flush();
+
+        res = storage.getEntry(1, 1);
+        assertEquals(entry1, res);
+
+        res = storage.getEntry(1, 2);
+        assertEquals(entry2, res);
+    }
+
+    @Test
+    public void testEntriesOutOfOrderWithFlush() throws Exception {
+        storage.setMasterKey(1, "key".getBytes());
+
+        ByteBuf entry2 = Unpooled.buffer(1024);
+        entry2.writeLong(1); // ledger id
+        entry2.writeLong(2); // entry id
+        entry2.writeBytes("entry-2".getBytes());
+
+        storage.addEntry(entry2);
+
+        try {
+            storage.getEntry(1, 1);
+            fail("Entry doesn't exist");
+        } catch (NoEntryException e) {
+            // Ok, entry doesn't exist
+        }
+
+        ByteBuf res = storage.getEntry(1, 2);
+        assertEquals(entry2, res);
+        res.release();
+
+        storage.flush();
+
+        try {
+            storage.getEntry(1, 1);
+            fail("Entry doesn't exist");
+        } catch (NoEntryException e) {
+            // Ok, entry doesn't exist
+        }
+
+        res = storage.getEntry(1, 2);
+        assertEquals(entry2, res);
+        res.release();
+
+        ByteBuf entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(1); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+
+        storage.addEntry(entry1);
+
+        res = storage.getEntry(1, 1);
+        assertEquals(entry1, res);
+        res.release();
+
+        res = storage.getEntry(1, 2);
+        assertEquals(entry2, res);
+        res.release();
+
+        storage.flush();
+
+        res = storage.getEntry(1, 1);
+        assertEquals(entry1, res);
+        res.release();
+
+        res = storage.getEntry(1, 2);
+        assertEquals(entry2, res);
+        res.release();
+    }
+
+    @Test
+    public void testAddEntriesAfterDelete() throws Exception {
+        storage.setMasterKey(1, "key".getBytes());
+
+        ByteBuf entry0 = Unpooled.buffer(1024);
+        entry0.writeLong(1); // ledger id
+        entry0.writeLong(0); // entry id
+        entry0.writeBytes("entry-0".getBytes());
+
+        ByteBuf entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(1); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+
+        storage.addEntry(entry0);
+        storage.addEntry(entry1);
+
+        storage.flush();
+
+        storage.deleteLedger(1);
+
+        storage.setMasterKey(1, "key".getBytes());
+
+        entry0 = Unpooled.buffer(1024);
+        entry0.writeLong(1); // ledger id
+        entry0.writeLong(0); // entry id
+        entry0.writeBytes("entry-0".getBytes());
+
+        entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(1); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+
+        storage.addEntry(entry0);
+        storage.addEntry(entry1);
+
+        assertEquals(entry0, storage.getEntry(1, 0));
+        assertEquals(entry1, storage.getEntry(1, 1));
+
+        storage.flush();
+    }
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
new file mode 100644
index 0000000..0d3f5bb
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
@@ -0,0 +1,137 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link DbLedgerStorage}.
+ */
+public class DbLedgerStorageWriteCacheTest {
+
+    private DbLedgerStorage storage;
+    private File tmpDir;
+
+    private static class MockedDbLedgerStorage extends DbLedgerStorage {
+
+        @Override
+        public void flush() throws IOException {
+            flushMutex.lock();
+            try {
+                // Swap the write caches and block the flush thread to 
simulate a slow disk
+                WriteCache tmp = writeCacheBeingFlushed;
+                writeCacheBeingFlushed = writeCache;
+                writeCache = tmp;
+
+                // since the cache is switched, we can allow flush to be 
triggered
+                hasFlushBeenTriggered.set(false);
+
+                // Block the flushing thread
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    return;
+                }
+            } finally {
+                flushMutex.unlock();
+            }
+        }
+
+    }
+
+    @Before
+    public void setup() throws Exception {
+        tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setAllowLoopback(true);
+        conf.setLedgerStorageClass(MockedDbLedgerStorage.class.getName());
+        conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 1);
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+        Bookie bookie = new Bookie(conf);
+
+        storage = (DbLedgerStorage) bookie.getLedgerStorage();
+    }
+
+    @After
+    public void teardown() throws Exception {
+        storage.shutdown();
+        tmpDir.delete();
+    }
+
+    @Test
+    public void writeCacheFull() throws Exception {
+        storage.setMasterKey(4, "key".getBytes());
+        assertEquals(false, storage.isFenced(4));
+        assertEquals(true, storage.ledgerExists(4));
+
+        assertEquals("key", new String(storage.readMasterKey(4)));
+
+        // Add enough entries to fill the 1st write cache
+        for (int i = 0; i < 5; i++) {
+            ByteBuf entry = Unpooled.buffer(100 * 1024 + 2 * 8);
+            entry.writeLong(4); // ledger id
+            entry.writeLong(i); // entry id
+            entry.writeZero(100 * 1024);
+            storage.addEntry(entry);
+        }
+
+        for (int i = 0; i < 5; i++) {
+            ByteBuf entry = Unpooled.buffer(100 * 1024 + 2 * 8);
+            entry.writeLong(4); // ledger id
+            entry.writeLong(5 + i); // entry id
+            entry.writeZero(100 * 1024);
+            storage.addEntry(entry);
+        }
+
+        // Next add should fail for cache full
+        ByteBuf entry = Unpooled.buffer(100 * 1024 + 2 * 8);
+        entry.writeLong(4); // ledger id
+        entry.writeLong(22); // entry id
+        entry.writeZero(100 * 1024);
+
+        try {
+            storage.addEntry(entry);
+            fail("Should have thrown exception");
+        } catch (IOException e) {
+            // Expected
+        }
+    }
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java
new file mode 100644
index 0000000..6e83ffd
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java
@@ -0,0 +1,111 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link EntryLocationIndex}.
+ */
+public class EntryLocationIndexTest {
+
+    private final ServerConfiguration serverConfiguration = new 
ServerConfiguration();
+
+    @Test
+    public void deleteLedgerTest() throws Exception {
+        File tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        tmpDir.deleteOnExit();
+
+        EntryLocationIndex idx = new EntryLocationIndex(serverConfiguration, 
KeyValueStorageRocksDB.factory,
+                tmpDir.getAbsolutePath(), NullStatsLogger.INSTANCE);
+
+        // Add some dummy indexes
+        idx.addLocation(40312, 0, 1);
+        idx.addLocation(40313, 10, 2);
+        idx.addLocation(40320, 0, 3);
+
+        // Add more indexes in a different batch
+        idx.addLocation(40313, 11, 5);
+        idx.addLocation(40313, 12, 6);
+        idx.addLocation(40320, 1, 7);
+        idx.addLocation(40312, 3, 4);
+
+        idx.delete(40313);
+
+        assertEquals(1, idx.getLocation(40312, 0));
+        assertEquals(4, idx.getLocation(40312, 3));
+        assertEquals(3, idx.getLocation(40320, 0));
+        assertEquals(7, idx.getLocation(40320, 1));
+
+        assertEquals(2, idx.getLocation(40313, 10));
+        assertEquals(5, idx.getLocation(40313, 11));
+        assertEquals(6, idx.getLocation(40313, 12));
+
+        idx.removeOffsetFromDeletedLedgers();
+
+        // After the deleted-ledger offsets are removed, the keys are gone
+        assertEquals(0, idx.getLocation(40313, 10));
+        assertEquals(0, idx.getLocation(40313, 11));
+        assertEquals(0, idx.getLocation(40313, 12));
+
+        idx.close();
+    }
+
+    // Tests adding locations for a ledger after it has been deleted
+    @Test
+    public void addLedgerAfterDeleteTest() throws Exception {
+        File tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        tmpDir.deleteOnExit();
+
+        EntryLocationIndex idx = new EntryLocationIndex(serverConfiguration, 
KeyValueStorageRocksDB.factory,
+                tmpDir.getAbsolutePath(), NullStatsLogger.INSTANCE);
+
+        // Add some dummy indexes
+        idx.addLocation(40312, 0, 1);
+        idx.addLocation(40313, 10, 2);
+        idx.addLocation(40320, 0, 3);
+
+        idx.delete(40313);
+
+        // Add more indexes in a different batch
+        idx.addLocation(40313, 11, 5);
+        idx.addLocation(40313, 12, 6);
+        idx.addLocation(40320, 1, 7);
+        idx.addLocation(40312, 3, 4);
+
+        idx.removeOffsetFromDeletedLedgers();
+
+        assertEquals(0, idx.getLocation(40313, 11));
+        assertEquals(0, idx.getLocation(40313, 12));
+
+        idx.close();
+    }
+}
diff --git a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml 
b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
index 2cc5ce7..736c88b 100644
--- a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
+++ b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
@@ -26,6 +26,10 @@
   </Match>
   <Match>
     <!-- generated code, we can't be held responsible for findbugs in it //-->
+    <Class 
name="~org\.apache\.bookkeeper\.bookie\.storage\.ldb\.DbLedgerStorageDataFormats.*"
 />
+  </Match>
+  <Match>
+    <!-- generated code, we can't be held responsible for findbugs in it //-->
     <Class name="~org\.apache\.bookkeeper\.tests\.generated.*" />
   </Match>
   <Match>

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to