sijie closed pull request #1289: Allow multiple directories in DbLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1289
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

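At a high level, the change splits DbLedgerStorage into one SingleDirectoryDbLedgerStorage instance per configured ledger directory and routes each ledger to exactly one of those instances by its id (via MathUtils.signSafeMod in the diff). Below is a minimal, self-contained sketch of that routing idea; DirectoryRouter and storageFor are illustrative names, not part of the BookKeeper API:

    import java.util.List;

    // Illustrative sketch only: pick one per-directory storage instance for a
    // ledger id, the way the diff does with signSafeMod(ledgerId, numberOfDirs).
    class DirectoryRouter<T> {
        private final List<T> perDirectoryStorages;

        DirectoryRouter(List<T> perDirectoryStorages) {
            this.perDirectoryStorages = perDirectoryStorages;
        }

        T storageFor(long ledgerId) {
            int n = perDirectoryStorages.size();
            // Sign-safe modulo: a negative ledger id still maps to a valid index.
            int index = (int) (((ledgerId % n) + n) % n);
            return perDirectoryStorages.get(index);
        }
    }

Because the mapping depends only on the ledger id and the number of directories, all reads and writes for a given ledger consistently land in the same per-directory RocksDB index and entry log set.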

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index d5f6c8dcd..978eb83e4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -76,7 +76,6 @@
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
 import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
-import org.apache.bookkeeper.bookie.storage.ldb.EntryLocationIndex;
 import org.apache.bookkeeper.bookie.storage.ldb.LocationsIndexRebuildOp;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BKException.MetaStoreException;
@@ -2632,8 +2631,6 @@ public void start() {
                     null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
             LedgerCache interleavedLedgerCache = interleavedStorage.ledgerCache;
 
-            EntryLocationIndex dbEntryLocationIndex = dbStorage.getEntryLocationIndex();
-
             int convertedLedgers = 0;
             for (long ledgerId : dbStorage.getActiveLedgersInRange(0, Long.MAX_VALUE)) {
                 if (LOG.isDebugEnabled()) {
@@ -2645,10 +2642,10 @@ public void start() {
                     interleavedStorage.setFenced(ledgerId);
                 }
 
-                long lastEntryInLedger = dbEntryLocationIndex.getLastEntryInLedger(ledgerId);
+                long lastEntryInLedger = dbStorage.getLastEntryInLedger(ledgerId);
                 for (long entryId = 0; entryId <= lastEntryInLedger; entryId++) {
                     try {
-                        long location = dbEntryLocationIndex.getLocation(ledgerId, entryId);
+                        long location = dbStorage.getLocation(ledgerId, entryId);
                         if (location != 0L) {
                             interleavedLedgerCache.putEntryOffset(ledgerId, entryId, location);
                         }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 79515ea4b..4b1c83444 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -125,6 +125,18 @@
 
     final ServerConfiguration conf;
 
+    /**
+     * Create a garbage collector thread.
+     *
+     * @param conf
+     *          Server Configuration Object.
+     * @throws IOException
+     */
+    public GarbageCollectorThread(ServerConfiguration conf, LedgerManager ledgerManager,
+            final CompactableLedgerStorage ledgerStorage, StatsLogger statsLogger) throws IOException {
+        this(conf, ledgerManager, ledgerStorage, statsLogger,
+                Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("GarbageCollectorThread")));
+    }
 
     /**
      * Create a garbage collector thread.
@@ -136,9 +148,10 @@
     public GarbageCollectorThread(ServerConfiguration conf,
                                   LedgerManager ledgerManager,
                                   final CompactableLedgerStorage ledgerStorage,
-                                  StatsLogger statsLogger)
+                                  StatsLogger statsLogger,
+                                    ScheduledExecutorService gcExecutor)
         throws IOException {
-        gcExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("GarbageCollectorThread"));
+        this.gcExecutor = gcExecutor;
         this.conf = conf;
 
         this.entryLogger = ledgerStorage.getEntryLogger();
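The new constructor overload above lets several ledger storage directories share one ScheduledExecutorService, so their compaction tasks are serialized instead of running concurrently. A small JDK-only sketch of that pattern, with placeholder task bodies rather than BookKeeper code:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class SharedGcExecutorSketch {
        public static void main(String[] args) {
            // One single-threaded executor shared by all "directories":
            // whatever any of them schedules runs one task at a time.
            ScheduledExecutorService gcExecutor = Executors.newSingleThreadScheduledExecutor();
            for (int dir = 0; dir < 3; dir++) {
                final int dirIndex = dir;
                gcExecutor.scheduleAtFixedRate(
                        () -> System.out.println("compacting directory " + dirIndex),
                        0, 10, TimeUnit.MINUTES);
            }
            // A real service would eventually call gcExecutor.shutdown().
        }
    }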
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
index 4bd28b2fb..12139bf0e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
@@ -62,8 +62,7 @@ public LedgerDirsManager(ServerConfiguration conf, File[] dirs, DiskChecker disk
         this(conf, dirs, diskChecker, NullStatsLogger.INSTANCE);
     }
 
-    @VisibleForTesting
-    LedgerDirsManager(ServerConfiguration conf, File[] dirs, DiskChecker diskChecker, StatsLogger statsLogger) {
+    public LedgerDirsManager(ServerConfiguration conf, File[] dirs, DiskChecker diskChecker, StatsLogger statsLogger) {
         this.ledgerDirectories = Arrays.asList(Bookie
                 .getCurrentDirectories(dirs));
         this.writableLedgerDirectories = new ArrayList<File>(ledgerDirectories);
@@ -352,6 +351,10 @@ public void addLedgerDirsListener(LedgerDirsListener listener) {
         }
     }
 
+    public DiskChecker getDiskChecker() {
+        return diskChecker;
+    }
+
     /**
      * Indicates All configured ledger directories are full.
      */
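Making the four-argument constructor public and exposing getDiskChecker() is what allows DbLedgerStorage to build one LedgerDirsManager per ledger directory while reusing the parent manager's disk checker. A hedged sketch of that usage, assuming the bookkeeper-server classes shown in this diff are on the classpath (PerDirectoryManagers is an illustrative name):

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.bookkeeper.bookie.LedgerDirsManager;
    import org.apache.bookkeeper.conf.ServerConfiguration;
    import org.apache.bookkeeper.stats.NullStatsLogger;

    final class PerDirectoryManagers {
        // Mirrors what DbLedgerStorage#initialize does further down in this diff.
        static List<LedgerDirsManager> split(ServerConfiguration conf, LedgerDirsManager parent) {
            List<LedgerDirsManager> managers = new ArrayList<>();
            for (File dir : parent.getAllLedgerDirs()) {
                // getAllLedgerDirs() returns the ".../current" subdirectories, so step back
                // to the parent directory before handing it to a new manager.
                File[] dirs = new File[] { dir.getParentFile() };
                managers.add(new LedgerDirsManager(conf, dirs, parent.getDiskChecker(),
                        NullStatsLogger.INSTANCE));
            }
            return managers;
        }
    }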
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index 55b094587..875336369 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -20,323 +20,114 @@
  */
 package org.apache.bookkeeper.bookie.storage.ldb;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import static 
org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification.WATCHER_RECYCLER;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
+import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.SortedMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.StampedLock;
 
-import org.apache.bookkeeper.bookie.Bookie;
-import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.bookkeeper.bookie.BookieException;
-import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException;
 import org.apache.bookkeeper.bookie.CheckpointSource;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.Checkpointer;
-import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
-import org.apache.bookkeeper.bookie.EntryLocation;
-import org.apache.bookkeeper.bookie.EntryLogger;
-import org.apache.bookkeeper.bookie.GarbageCollectorThread;
 import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.LedgerStorage;
 import org.apache.bookkeeper.bookie.StateManager;
-import 
org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
-import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
 import 
org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
-import org.apache.bookkeeper.common.util.Watchable;
+import 
org.apache.bookkeeper.bookie.storage.ldb.SingleDirectoryDbLedgerStorage.LedgerLoggerProcessor;
+import org.apache.bookkeeper.common.util.MathUtils;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
-import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.DiskChecker;
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+
 
 /**
  * Implementation of LedgerStorage that uses RocksDB to keep the indexes for 
entries stored in EntryLogs.
  */
-public class DbLedgerStorage implements CompactableLedgerStorage {
-
-    private static final long NOT_ASSIGNED_LAC = Long.MIN_VALUE;
-
-    /**
-     * This class borrows the logic from FileInfo.
-     *
-     * <p>This class is used for holding all the transient states for a given 
ledger.
-     */
-    private static class TransientLedgerInfo extends 
Watchable<LastAddConfirmedUpdateNotification>
-            implements AutoCloseable {
-
-        // lac
-        private volatile long lac = NOT_ASSIGNED_LAC;
-        // request from explicit lac requests
-        private ByteBuffer explicitLac = null;
-        // is the ledger info closed?
-        private boolean isClosed;
-
-        private final long ledgerId;
-        // reference to LedgerMetadataIndex
-        private final LedgerMetadataIndex ledgerIndex;
-
-        private long lastAccessed;
-
-        /**
-         * Construct an Watchable with zero watchers.
-         */
-        public TransientLedgerInfo(long ledgerId, LedgerMetadataIndex 
ledgerIndex) {
-            super(WATCHER_RECYCLER);
-            this.ledgerId = ledgerId;
-            this.ledgerIndex = ledgerIndex;
-            this.lastAccessed = System.currentTimeMillis();
-        }
-
-        long getLastAddConfirmed() {
-            return lac;
-        }
-
-        long setLastAddConfirmed(long lac) {
-            long lacToReturn;
-            boolean changed = false;
-            synchronized (this) {
-                if (this.lac == NOT_ASSIGNED_LAC || this.lac < lac) {
-                    this.lac = lac;
-                    changed = true;
-                    lastAccessed = System.currentTimeMillis();
-                }
-                lacToReturn = this.lac;
-            }
-            if (changed) {
-                notifyWatchers(lacToReturn);
-            }
-            return lacToReturn;
-        }
-
-        synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC,
-                Watcher<LastAddConfirmedUpdateNotification> watcher) throws 
IOException {
-            lastAccessed = System.currentTimeMillis();
-            if ((lac != NOT_ASSIGNED_LAC && lac > previousLAC) || isClosed || 
ledgerIndex.get(ledgerId).getFenced()) {
-                return false;
-            }
-
-            addWatcher(watcher);
-            return true;
-        }
-
-        public ByteBuf getExplicitLac() {
-            ByteBuf retLac = null;
-            synchronized (this) {
-                if (explicitLac != null) {
-                    retLac = Unpooled.buffer(explicitLac.capacity());
-                    explicitLac.rewind(); // copy from the beginning
-                    retLac.writeBytes(explicitLac);
-                    explicitLac.rewind();
-                    return retLac;
-                }
-            }
-            return retLac;
-        }
-
-        public void setExplicitLac(ByteBuf lac) {
-            long explicitLacValue;
-            synchronized (this) {
-                if (explicitLac == null) {
-                    explicitLac = ByteBuffer.allocate(lac.capacity());
-                }
-                lac.readBytes(explicitLac);
-                explicitLac.rewind();
-
-                // skip the ledger id
-                explicitLac.getLong();
-                explicitLacValue = explicitLac.getLong();
-                explicitLac.rewind();
-
-                lastAccessed = System.currentTimeMillis();
-            }
-            setLastAddConfirmed(explicitLacValue);
-        }
-
-        boolean isStale() {
-            return (lastAccessed + 
TimeUnit.MINUTES.toMillis(LEDGER_INFO_CACHING_TIME_MINUTES)) < System
-                    .currentTimeMillis();
-        }
-
-        void notifyWatchers(long lastAddConfirmed) {
-            notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, 
lastAddConfirmed);
-        }
-
-        @Override
-        public void close() {
-            synchronized (this) {
-                if (isClosed) {
-                    return;
-                }
-                isClosed = true;
-            }
-            // notify watchers
-            notifyWatchers(Long.MAX_VALUE);
-        }
-
-    }
-
-    private EntryLogger entryLogger;
-
-    private LedgerMetadataIndex ledgerIndex;
-    private EntryLocationIndex entryLocationIndex;
-
-    private static final long LEDGER_INFO_CACHING_TIME_MINUTES = 10;
-    private ConcurrentLongHashMap<TransientLedgerInfo> 
transientLedgerInfoCache;
-
-    private GarbageCollectorThread gcThread;
-
-    // Write cache where all new entries are inserted into
-    protected volatile WriteCache writeCache;
-
-    // Write cache that is used to swap with writeCache during flushes
-    protected volatile WriteCache writeCacheBeingFlushed;
-
-    // Cache where we insert entries for speculative reading
-    private ReadCache readCache;
-
-    private final StampedLock writeCacheRotationLock = new StampedLock();
-
-    protected final ReentrantLock flushMutex = new ReentrantLock();
-
-    protected final AtomicBoolean hasFlushBeenTriggered = new 
AtomicBoolean(false);
-    private final AtomicBoolean isFlushOngoing = new AtomicBoolean(false);
-
-    private final ExecutorService executor = 
Executors.newSingleThreadExecutor(new DefaultThreadFactory("db-storage"));
-
-    // Executor used to for db index cleanup
-    private final ScheduledExecutorService cleanupExecutor = Executors
-            .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("db-storage-cleanup"));
+@Slf4j
+public class DbLedgerStorage implements LedgerStorage {
 
     static final String WRITE_CACHE_MAX_SIZE_MB = 
"dbStorage_writeCacheMaxSizeMb";
-    static final String READ_AHEAD_CACHE_BATCH_SIZE = 
"dbStorage_readAheadCacheBatchSize";
+
     static final String READ_AHEAD_CACHE_MAX_SIZE_MB = 
"dbStorage_readAheadCacheMaxSizeMb";
 
     static final String MAX_THROTTLE_TIME_MILLIS = 
"dbStorage_maxThrottleTimeMs";
 
     private static final long DEFAULT_WRITE_CACHE_MAX_SIZE_MB = 16;
     private static final long DEFAULT_READ_CACHE_MAX_SIZE_MB = 16;
-    private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
-
-    private static final long DEFAUL_MAX_THROTTLE_TIME_MILLIS = 
TimeUnit.SECONDS.toMillis(10);
 
     private static final int MB = 1024 * 1024;
 
-    private final CopyOnWriteArrayList<LedgerDeletionListener> 
ledgerDeletionListeners = Lists
-            .newCopyOnWriteArrayList();
-
-    private long writeCacheMaxSize;
-
-    private CheckpointSource checkpointSource = null;
-    private Checkpoint lastCheckpoint = Checkpoint.MIN;
-
-    private long readCacheMaxSize;
-    private int readAheadCacheBatchSize;
-
-    private long maxThrottleTimeNanos;
-
-    private StatsLogger stats;
+    private int numberOfDirs;
+    private List<SingleDirectoryDbLedgerStorage> ledgerStorageList;
 
-    private OpStatsLogger addEntryStats;
-    private OpStatsLogger readEntryStats;
-    private OpStatsLogger readCacheHitStats;
-    private OpStatsLogger readCacheMissStats;
-    private OpStatsLogger readAheadBatchCountStats;
-    private OpStatsLogger readAheadBatchSizeStats;
-    private OpStatsLogger flushStats;
-    private OpStatsLogger flushSizeStats;
-
-    private Counter throttledWriteRequests;
-    private Counter rejectedWriteRequests;
+    // Keep a single Bookie GC thread so the compactions from multiple individual directories are serialized
+    private ScheduledExecutorService gcExecutor;
 
     @Override
     public void initialize(ServerConfiguration conf, LedgerManager 
ledgerManager, LedgerDirsManager ledgerDirsManager,
             LedgerDirsManager indexDirsManager, StateManager stateManager, 
CheckpointSource checkpointSource,
             Checkpointer checkpointer, StatsLogger statsLogger) throws 
IOException {
-        checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
-                "Db implementation only allows for one storage dir");
-
-        String baseDir = 
ledgerDirsManager.getAllLedgerDirs().get(0).toString();
-
-        writeCacheMaxSize = conf.getLong(WRITE_CACHE_MAX_SIZE_MB, 
DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB;
-
-        writeCache = new WriteCache(writeCacheMaxSize / 2);
-        writeCacheBeingFlushed = new WriteCache(writeCacheMaxSize / 2);
-
-        this.checkpointSource = checkpointSource;
-
-        readCacheMaxSize = conf.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, 
DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
-        readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, 
DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE);
-
-        long maxThrottleTimeMillis = conf.getLong(MAX_THROTTLE_TIME_MILLIS, 
DEFAUL_MAX_THROTTLE_TIME_MILLIS);
-        maxThrottleTimeNanos = 
TimeUnit.MILLISECONDS.toNanos(maxThrottleTimeMillis);
-
-        readCache = new ReadCache(readCacheMaxSize);
+        long writeCacheMaxSize = conf.getLong(WRITE_CACHE_MAX_SIZE_MB, 
DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB;
+        long readCacheMaxSize = conf.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, 
DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
 
-        this.stats = statsLogger;
+        this.numberOfDirs = ledgerDirsManager.getAllLedgerDirs().size();
 
         log.info("Started Db Ledger Storage");
+        log.info(" - Number of directories: {}", numberOfDirs);
         log.info(" - Write cache size: {} MB", writeCacheMaxSize / MB);
         log.info(" - Read Cache: {} MB", readCacheMaxSize / MB);
-        log.info(" - Read Ahead Batch size: : {}", readAheadCacheBatchSize);
 
-        ledgerIndex = new LedgerMetadataIndex(conf, 
KeyValueStorageRocksDB.factory, baseDir, stats);
-        entryLocationIndex = new EntryLocationIndex(conf, 
KeyValueStorageRocksDB.factory, baseDir, stats);
+        long perDirectoryWriteCacheSize = writeCacheMaxSize / numberOfDirs;
+        long perDirectoryReadCacheSize = readCacheMaxSize / numberOfDirs;
 
-        transientLedgerInfoCache = new ConcurrentLongHashMap<>(16 * 1024,
-                Runtime.getRuntime().availableProcessors() * 2);
-        
cleanupExecutor.scheduleAtFixedRate(this::cleanupStaleTransientLedgerInfo, 
LEDGER_INFO_CACHING_TIME_MINUTES,
-                LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES);
+        gcExecutor = Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("GarbageCollector"));
 
-        entryLogger = new EntryLogger(conf, ledgerDirsManager);
-        gcThread = new GarbageCollectorThread(conf, ledgerManager, this, 
statsLogger);
+        ledgerStorageList = Lists.newArrayList();
+        for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+            // Create a ledger dirs manager for the single directory
+            File[] dirs = new File[1];
+            // Remove the `/current` suffix which will be appended again by 
LedgersDirManager
+            dirs[0] = ledgerDir.getParentFile();
+            LedgerDirsManager ldm = new LedgerDirsManager(conf, dirs, 
ledgerDirsManager.getDiskChecker(), statsLogger);
+            ledgerStorageList.add(newSingleDirectoryDbLedgerStorage(conf, 
ledgerManager, ldm, indexDirsManager,
+                    stateManager, checkpointSource, checkpointer, statsLogger, 
gcExecutor, perDirectoryWriteCacheSize,
+                    perDirectoryReadCacheSize));
+        }
 
-        registerStats();
+        registerStats(statsLogger);
     }
 
-    /**
-     * Evict all the ledger info object that were not used recently.
-     */
-    private void cleanupStaleTransientLedgerInfo() {
-        transientLedgerInfoCache.removeIf((ledgerId, ledgerInfo) -> {
-            boolean isStale = ledgerInfo.isStale();
-            if (isStale) {
-                ledgerInfo.close();
-            }
-
-            return isStale;
-        });
+    @VisibleForTesting
+    protected SingleDirectoryDbLedgerStorage 
newSingleDirectoryDbLedgerStorage(ServerConfiguration conf,
+            LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, 
LedgerDirsManager indexDirsManager,
+            StateManager stateManager, CheckpointSource checkpointSource, 
Checkpointer checkpointer,
+            StatsLogger statsLogger, ScheduledExecutorService gcExecutor, long 
writeCacheSize, long readCacheSize)
+            throws IOException {
+        return new SingleDirectoryDbLedgerStorage(conf, ledgerManager, 
ledgerDirsManager, indexDirsManager,
+                stateManager, checkpointSource, checkpointer, statsLogger, 
gcExecutor, writeCacheSize, readCacheSize);
     }
 
-    public void registerStats() {
+    public void registerStats(StatsLogger stats) {
         stats.registerGauge("write-cache-size", new Gauge<Long>() {
             @Override
             public Long getDefaultValue() {
@@ -345,7 +136,7 @@ public Long getDefaultValue() {
 
             @Override
             public Long getSample() {
-                return writeCache.size() + writeCacheBeingFlushed.size();
+                return 
ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getWriteCacheSize).sum();
             }
         });
         stats.registerGauge("write-cache-count", new Gauge<Long>() {
@@ -356,7 +147,7 @@ public Long getDefaultValue() {
 
             @Override
             public Long getSample() {
-                return writeCache.count() + writeCacheBeingFlushed.count();
+                return 
ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getWriteCacheCount).sum();
             }
         });
         stats.registerGauge("read-cache-size", new Gauge<Long>() {
@@ -367,7 +158,7 @@ public Long getDefaultValue() {
 
             @Override
             public Long getSample() {
-                return readCache.size();
+                return 
ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getReadCacheSize).sum();
             }
         });
         stats.registerGauge("read-cache-count", new Gauge<Long>() {
@@ -378,666 +169,142 @@ public Long getDefaultValue() {
 
             @Override
             public Long getSample() {
-                return readCache.count();
+                return 
ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getReadCacheCount).sum();
             }
         });
-
-        addEntryStats = stats.getOpStatsLogger("add-entry");
-        readEntryStats = stats.getOpStatsLogger("read-entry");
-        readCacheHitStats = stats.getOpStatsLogger("read-cache-hits");
-        readCacheMissStats = stats.getOpStatsLogger("read-cache-misses");
-        readAheadBatchCountStats = 
stats.getOpStatsLogger("readahead-batch-count");
-        readAheadBatchSizeStats = 
stats.getOpStatsLogger("readahead-batch-size");
-        flushStats = stats.getOpStatsLogger("flush");
-        flushSizeStats = stats.getOpStatsLogger("flush-size");
-
-        throttledWriteRequests = stats.getCounter("throttled-write-requests");
-        rejectedWriteRequests = stats.getCounter("rejected-write-requests");
     }
 
     @Override
     public void start() {
-        gcThread.start();
+        ledgerStorageList.forEach(LedgerStorage::start);
     }
 
     @Override
     public void shutdown() throws InterruptedException {
-        try {
-            flush();
-
-            gcThread.shutdown();
-            entryLogger.shutdown();
-
-            cleanupExecutor.shutdown();
-            cleanupExecutor.awaitTermination(1, TimeUnit.SECONDS);
-
-            ledgerIndex.close();
-            entryLocationIndex.close();
-
-            writeCache.close();
-            writeCacheBeingFlushed.close();
-            readCache.close();
-            executor.shutdown();
-
-        } catch (IOException e) {
-            log.error("Error closing db storage", e);
+        for (LedgerStorage ls : ledgerStorageList) {
+            ls.shutdown();
         }
     }
 
     @Override
     public boolean ledgerExists(long ledgerId) throws IOException {
-        try {
-            LedgerData ledgerData = ledgerIndex.get(ledgerId);
-            if (log.isDebugEnabled()) {
-                log.debug("Ledger exists. ledger: {} : {}", ledgerId, 
ledgerData.getExists());
-            }
-            return ledgerData.getExists();
-        } catch (Bookie.NoLedgerException nle) {
-            // ledger does not exist
-            return false;
-        }
+        return getLedgerSorage(ledgerId).ledgerExists(ledgerId);
     }
 
     @Override
-    public boolean isFenced(long ledgerId) throws IOException {
-        if (log.isDebugEnabled()) {
-            log.debug("isFenced. ledger: {}", ledgerId);
-        }
-        return ledgerIndex.get(ledgerId).getFenced();
+    public boolean setFenced(long ledgerId) throws IOException {
+        return getLedgerSorage(ledgerId).setFenced(ledgerId);
     }
 
     @Override
-    public boolean setFenced(long ledgerId) throws IOException {
-        if (log.isDebugEnabled()) {
-            log.debug("Set fenced. ledger: {}", ledgerId);
-        }
-        boolean changed = ledgerIndex.setFenced(ledgerId);
-        if (changed) {
-            // notify all the watchers if a ledger is fenced
-            TransientLedgerInfo ledgerInfo = 
transientLedgerInfoCache.get(ledgerId);
-            if (null != ledgerInfo) {
-                ledgerInfo.notifyWatchers(Long.MAX_VALUE);
-            }
-        }
-        return changed;
+    public boolean isFenced(long ledgerId) throws IOException {
+        return getLedgerSorage(ledgerId).isFenced(ledgerId);
     }
 
     @Override
     public void setMasterKey(long ledgerId, byte[] masterKey) throws 
IOException {
-        if (log.isDebugEnabled()) {
-            log.debug("Set master key. ledger: {}", ledgerId);
-        }
-        ledgerIndex.setMasterKey(ledgerId, masterKey);
+        getLedgerSorage(ledgerId).setMasterKey(ledgerId, masterKey);
     }
 
     @Override
     public byte[] readMasterKey(long ledgerId) throws IOException, 
BookieException {
-        if (log.isDebugEnabled()) {
-            log.debug("Read master key. ledger: {}", ledgerId);
-        }
-        return ledgerIndex.get(ledgerId).getMasterKey().toByteArray();
+        return getLedgerSorage(ledgerId).readMasterKey(ledgerId);
     }
 
     @Override
     public long addEntry(ByteBuf entry) throws IOException, BookieException {
-        long startTime = MathUtils.nowInNano();
-
         long ledgerId = entry.getLong(entry.readerIndex());
-        long entryId = entry.getLong(entry.readerIndex() + 8);
-        long lac = entry.getLong(entry.readerIndex() + 16);
-
-        if (log.isDebugEnabled()) {
-            log.debug("Add entry. {}@{}, lac = {}", ledgerId, entryId, lac);
-        }
-
-        // First we try to do an optimistic locking to get access to the 
current write cache.
-        // This is based on the fact that the write cache is only being 
rotated (swapped) every 1 minute. During the
-        // rest of the time, we can have multiple thread using the optimistic 
lock here without interfering.
-        long stamp = writeCacheRotationLock.tryOptimisticRead();
-        boolean inserted = false;
-
-        inserted = writeCache.put(ledgerId, entryId, entry);
-        if (!writeCacheRotationLock.validate(stamp)) {
-            // The write cache was rotated while we were inserting. We need to 
acquire the proper read lock and repeat
-            // the operation because we might have inserted in a write cache 
that was already being flushed and cleared,
-            // without being sure about this last entry being flushed or not.
-            stamp = writeCacheRotationLock.readLock();
-            try {
-                inserted = writeCache.put(ledgerId, entryId, entry);
-            } finally {
-                 writeCacheRotationLock.unlockRead(stamp);
-            }
-        }
-
-        if (!inserted) {
-            triggerFlushAndAddEntry(ledgerId, entryId, entry);
-        }
-
-        // after successfully insert the entry, update LAC and notify the 
watchers
-        updateCachedLacIfNeeded(ledgerId, lac);
-
-        recordSuccessfulEvent(addEntryStats, startTime);
-        return entryId;
-    }
-
-    private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf 
entry)
-            throws IOException, BookieException {
-        // Write cache is full, we need to trigger a flush so that it gets 
rotated
-        // If the flush has already been triggered or flush has already 
switched the
-        // cache, we don't need to trigger another flush
-        if (!isFlushOngoing.get() && 
hasFlushBeenTriggered.compareAndSet(false, true)) {
-            // Trigger an early flush in background
-            log.info("Write cache is full, triggering flush");
-            executor.execute(() -> {
-                try {
-                    flush();
-                } catch (IOException e) {
-                    log.error("Error during flush", e);
-                }
-            });
-        }
-
-        throttledWriteRequests.inc();
-        long absoluteTimeoutNanos = System.nanoTime() + maxThrottleTimeNanos;
-
-        while (System.nanoTime() < absoluteTimeoutNanos) {
-            long stamp = writeCacheRotationLock.readLock();
-            try {
-                if (writeCache.put(ledgerId, entryId, entry)) {
-                    // We succeeded in putting the entry in write cache in the
-                    return;
-                }
-            } finally {
-                 writeCacheRotationLock.unlockRead(stamp);
-            }
-
-            // Wait some time and try again
-            try {
-                Thread.sleep(1);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new IOException("Interrupted when adding entry " + 
ledgerId + "@" + entryId);
-            }
-        }
-
-        // Timeout expired and we weren't able to insert in write cache
-        rejectedWriteRequests.inc();
-        throw new OperationRejectedException();
+        return getLedgerSorage(ledgerId).addEntry(entry);
     }
 
     @Override
     public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
-        long startTime = MathUtils.nowInNano();
-        if (log.isDebugEnabled()) {
-            log.debug("Get Entry: {}@{}", ledgerId, entryId);
-        }
-
-        if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
-            return getLastEntry(ledgerId);
-        }
-
-        // We need to try to read from both write caches, since recent entries 
could be found in either of the two. The
-        // write caches are already thread safe on their own, here we just 
need to make sure we get references to both
-        // of them. Using an optimistic lock since the read lock is always 
free, unless we're swapping the caches.
-        long stamp = writeCacheRotationLock.tryOptimisticRead();
-        WriteCache localWriteCache = writeCache;
-        WriteCache localWriteCacheBeingFlushed = writeCacheBeingFlushed;
-        if (!writeCacheRotationLock.validate(stamp)) {
-            // Fallback to regular read lock approach
-            stamp = writeCacheRotationLock.readLock();
-            try {
-                localWriteCache = writeCache;
-                localWriteCacheBeingFlushed = writeCacheBeingFlushed;
-            } finally {
-                writeCacheRotationLock.unlockRead(stamp);
-            }
-        }
-
-        // First try to read from the write cache of recent entries
-        ByteBuf entry = localWriteCache.get(ledgerId, entryId);
-        if (entry != null) {
-            recordSuccessfulEvent(readCacheHitStats, startTime);
-            recordSuccessfulEvent(readEntryStats, startTime);
-            return entry;
-        }
-
-        // If there's a flush going on, the entry might be in the flush buffer
-        entry = localWriteCacheBeingFlushed.get(ledgerId, entryId);
-        if (entry != null) {
-            recordSuccessfulEvent(readCacheHitStats, startTime);
-            recordSuccessfulEvent(readEntryStats, startTime);
-            return entry;
-        }
-
-        // Try reading from read-ahead cache
-        entry = readCache.get(ledgerId, entryId);
-        if (entry != null) {
-            recordSuccessfulEvent(readCacheHitStats, startTime);
-            recordSuccessfulEvent(readEntryStats, startTime);
-            return entry;
-        }
-
-        // Read from main storage
-        long entryLocation;
-        try {
-            entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
-            if (entryLocation == 0) {
-                throw new NoEntryException(ledgerId, entryId);
-            }
-            entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
-        } catch (NoEntryException e) {
-            recordFailedEvent(readEntryStats, startTime);
-            throw e;
-        }
-
-        readCache.put(ledgerId, entryId, entry);
-
-        // Try to read more entries
-        long nextEntryLocation = entryLocation + 4 /* size header */ + 
entry.readableBytes();
-        fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);
-
-        recordSuccessfulEvent(readCacheMissStats, startTime);
-        recordSuccessfulEvent(readEntryStats, startTime);
-        return entry;
-    }
-
-    private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, 
long firstEntryLocation) {
-        try {
-            long firstEntryLogId = (firstEntryLocation >> 32);
-            long currentEntryLogId = firstEntryLogId;
-            long currentEntryLocation = firstEntryLocation;
-            int count = 0;
-            long size = 0;
-
-            while (count < readAheadCacheBatchSize && currentEntryLogId == 
firstEntryLogId) {
-                ByteBuf entry = entryLogger.internalReadEntry(orginalLedgerId, 
-1, currentEntryLocation);
-
-                try {
-                    long currentEntryLedgerId = entry.getLong(0);
-                    long currentEntryId = entry.getLong(8);
-
-                    if (currentEntryLedgerId != orginalLedgerId) {
-                        // Found an entry belonging to a different ledger, 
stopping read-ahead
-                        entry.release();
-                        return;
-                    }
-
-                    // Insert entry in read cache
-                    readCache.put(orginalLedgerId, currentEntryId, entry);
-
-                    count++;
-                    size += entry.readableBytes();
-
-                    currentEntryLocation += 4 + entry.readableBytes();
-                    currentEntryLogId = currentEntryLocation >> 32;
-                } finally {
-                    entry.release();
-                }
-            }
-
-            readAheadBatchCountStats.registerSuccessfulValue(count);
-            readAheadBatchSizeStats.registerSuccessfulValue(size);
-        } catch (Exception e) {
-            if (log.isDebugEnabled()) {
-                log.debug("Exception during read ahead for ledger: {}: e", 
orginalLedgerId, e);
-            }
-        }
-    }
-
-    public ByteBuf getLastEntry(long ledgerId) throws IOException {
-        long startTime = MathUtils.nowInNano();
-
-        long stamp = writeCacheRotationLock.readLock();
-        try {
-            // First try to read from the write cache of recent entries
-            ByteBuf entry = writeCache.getLastEntry(ledgerId);
-            if (entry != null) {
-                if (log.isDebugEnabled()) {
-                    long foundLedgerId = entry.readLong(); // ledgedId
-                    long entryId = entry.readLong();
-                    entry.resetReaderIndex();
-                    if (log.isDebugEnabled()) {
-                        log.debug("Found last entry for ledger {} in write 
cache: {}@{}", ledgerId, foundLedgerId,
-                                entryId);
-                    }
-                }
-
-                recordSuccessfulEvent(readCacheHitStats, startTime);
-                recordSuccessfulEvent(readEntryStats, startTime);
-                return entry;
-            }
-
-            // If there's a flush going on, the entry might be in the flush 
buffer
-            entry = writeCacheBeingFlushed.getLastEntry(ledgerId);
-            if (entry != null) {
-                if (log.isDebugEnabled()) {
-                    entry.readLong(); // ledgedId
-                    long entryId = entry.readLong();
-                    entry.resetReaderIndex();
-                    if (log.isDebugEnabled()) {
-                        log.debug("Found last entry for ledger {} in write 
cache being flushed: {}", ledgerId, entryId);
-                    }
-                }
-
-                recordSuccessfulEvent(readCacheHitStats, startTime);
-                recordSuccessfulEvent(readEntryStats, startTime);
-                return entry;
-            }
-        } finally {
-            writeCacheRotationLock.unlockRead(stamp);
-        }
-
-        // Search the last entry in storage
-        long lastEntryId = entryLocationIndex.getLastEntryInLedger(ledgerId);
-        if (log.isDebugEnabled()) {
-            log.debug("Found last entry for ledger {} in db: {}", ledgerId, 
lastEntryId);
-        }
-
-        long entryLocation = entryLocationIndex.getLocation(ledgerId, 
lastEntryId);
-        ByteBuf content = entryLogger.readEntry(ledgerId, lastEntryId, 
entryLocation);
-
-        recordSuccessfulEvent(readCacheMissStats, startTime);
-        recordSuccessfulEvent(readEntryStats, startTime);
-        return content;
-    }
-
-    @VisibleForTesting
-    boolean isFlushRequired() {
-        long stamp = writeCacheRotationLock.readLock();
-        try {
-            return !writeCache.isEmpty();
-        } finally {
-            writeCacheRotationLock.unlockRead(stamp);
-        }
+        return getLedgerSorage(ledgerId).getEntry(ledgerId, entryId);
     }
 
     @Override
-    public void checkpoint(Checkpoint checkpoint) throws IOException {
-        Checkpoint thisCheckpoint = checkpointSource.newCheckpoint();
-        if (lastCheckpoint.compareTo(checkpoint) > 0) {
-            return;
-        }
-
-        long startTime = MathUtils.nowInNano();
-
-        // Only a single flush operation can happen at a time
-        flushMutex.lock();
-
-        try {
-            // Swap the write cache so that writes can continue to happen 
while the flush is
-            // ongoing
-            swapWriteCache();
-
-            long sizeToFlush = writeCacheBeingFlushed.size();
-            if (log.isDebugEnabled()) {
-                log.debug("Flushing entries. count: {} -- size {} Mb", 
writeCacheBeingFlushed.count(),
-                        sizeToFlush / 1024.0 / 1024);
-            }
-
-            // Write all the pending entries into the entry logger and collect 
the offset
-            // position for each entry
-
-            Batch batch = entryLocationIndex.newBatch();
-            writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> {
-                try {
-                    long location = entryLogger.addEntry(ledgerId, entry, 
true);
-                    entryLocationIndex.addLocation(batch, ledgerId, entryId, 
location);
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            });
-
-            entryLogger.flush();
-
-            long batchFlushStarTime = System.nanoTime();
-            batch.flush();
-            batch.close();
-            if (log.isDebugEnabled()) {
-                log.debug("DB batch flushed time : {} s",
-                        MathUtils.elapsedNanos(batchFlushStarTime) / (double) 
TimeUnit.SECONDS.toNanos(1));
-            }
-
-            ledgerIndex.flush();
-
-            cleanupExecutor.execute(() -> {
-                // There can only be one single cleanup task running because 
the cleanupExecutor
-                // is single-threaded
-                try {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Removing deleted ledgers from db indexes");
-                    }
-
-                    entryLocationIndex.removeOffsetFromDeletedLedgers();
-                    ledgerIndex.removeDeletedLedgers();
-                } catch (Throwable t) {
-                    log.warn("Failed to cleanup db indexes", t);
-                }
-            });
-
-            lastCheckpoint = thisCheckpoint;
-
-            // Discard all the entry from the write cache, since they're now 
persisted
-            writeCacheBeingFlushed.clear();
-
-            double flushTimeSeconds = MathUtils.elapsedNanos(startTime) / 
(double) TimeUnit.SECONDS.toNanos(1);
-            double flushThroughput = sizeToFlush / 1024.0 / 1024.0 / 
flushTimeSeconds;
-
-            if (log.isDebugEnabled()) {
-                log.debug("Flushing done time {} s -- Written {} MB/s", 
flushTimeSeconds, flushThroughput);
-            }
-
-            recordSuccessfulEvent(flushStats, startTime);
-            flushSizeStats.registerSuccessfulValue(sizeToFlush);
-        } catch (IOException e) {
-            // Leave IOExecption as it is
-            throw e;
-        } catch (RuntimeException e) {
-            // Wrap unchecked exceptions
-            throw new IOException(e);
-        } finally {
-            try {
-                isFlushOngoing.set(false);
-            } finally {
-                flushMutex.unlock();
-            }
-        }
-    }
-
-    /**
-     * Swap the current write cache with the replacement cache.
-     */
-    private void swapWriteCache() {
-        long stamp = writeCacheRotationLock.writeLock();
-        try {
-            // First, swap the current write-cache map with an empty one so 
that writes will
-            // go on unaffected. Only a single flush is happening at the same 
time
-            WriteCache tmp = writeCacheBeingFlushed;
-            writeCacheBeingFlushed = writeCache;
-            writeCache = tmp;
-
-            // since the cache is switched, we can allow flush to be triggered
-            hasFlushBeenTriggered.set(false);
-        } finally {
-            try {
-                isFlushOngoing.set(true);
-            } finally {
-                writeCacheRotationLock.unlockWrite(stamp);
-            }
-        }
+    public long getLastAddConfirmed(long ledgerId) throws IOException {
+        return getLedgerSorage(ledgerId).getLastAddConfirmed(ledgerId);
     }
 
     @Override
-    public void flush() throws IOException {
-        Checkpoint cp = checkpointSource.newCheckpoint();
-        checkpoint(cp);
-        checkpointSource.checkpointComplete(cp, true);
+    public boolean waitForLastAddConfirmedUpdate(long ledgerId, long 
previousLAC,
+            Watcher<LastAddConfirmedUpdateNotification> watcher) throws 
IOException {
+        return 
getLedgerSorage(ledgerId).waitForLastAddConfirmedUpdate(ledgerId, previousLAC, 
watcher);
     }
 
     @Override
-    public void deleteLedger(long ledgerId) throws IOException {
-        if (log.isDebugEnabled()) {
-            log.debug("Deleting ledger {}", ledgerId);
-        }
-
-        // Delete entries from this ledger that are still in the write cache
-        long stamp = writeCacheRotationLock.readLock();
-        try {
-            writeCache.deleteLedger(ledgerId);
-        } finally {
-            writeCacheRotationLock.unlockRead(stamp);
-        }
-
-        entryLocationIndex.delete(ledgerId);
-        ledgerIndex.delete(ledgerId);
-
-        for (int i = 0, size = ledgerDeletionListeners.size(); i < size; i++) {
-            LedgerDeletionListener listener = ledgerDeletionListeners.get(i);
-            listener.ledgerDeleted(ledgerId);
-        }
-
-        TransientLedgerInfo tli = transientLedgerInfoCache.remove(ledgerId);
-        if (tli != null) {
-            tli.close();
+    public void flush() throws IOException {
+        for (LedgerStorage ls : ledgerStorageList) {
+            ls.flush();
         }
     }
 
     @Override
-    public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long 
lastLedgerId) throws IOException {
-        return ledgerIndex.getActiveLedgersInRange(firstLedgerId, 
lastLedgerId);
-    }
-
-    @Override
-    public void updateEntriesLocations(Iterable<EntryLocation> locations) 
throws IOException {
-        // Trigger a flush to have all the entries being compacted in the db 
storage
-        flush();
-
-        entryLocationIndex.updateLocations(locations);
-    }
-
-    @Override
-    public EntryLogger getEntryLogger() {
-        return entryLogger;
+    public void checkpoint(Checkpoint checkpoint) throws IOException {
+        for (LedgerStorage ls : ledgerStorageList) {
+            ls.checkpoint(checkpoint);
+        }
     }
 
     @Override
-    public long getLastAddConfirmed(long ledgerId) throws IOException {
-        TransientLedgerInfo ledgerInfo = 
transientLedgerInfoCache.get(ledgerId);
-        long lac = null != ledgerInfo ? ledgerInfo.getLastAddConfirmed() : 
NOT_ASSIGNED_LAC;
-        if (lac == NOT_ASSIGNED_LAC) {
-            ByteBuf bb = getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
-            try {
-                bb.skipBytes(2 * Long.BYTES); // skip ledger id and entry id
-                lac = bb.readLong();
-                lac = getOrAddLedgerInfo(ledgerId).setLastAddConfirmed(lac);
-            } finally {
-                bb.release();
-            }
-        }
-        return lac;
+    public void deleteLedger(long ledgerId) throws IOException {
+        getLedgerSorage(ledgerId).deleteLedger(ledgerId);
     }
 
     @Override
-    public boolean waitForLastAddConfirmedUpdate(long ledgerId, long 
previousLAC,
-            Watcher<LastAddConfirmedUpdateNotification> watcher) throws 
IOException {
-        return 
getOrAddLedgerInfo(ledgerId).waitForLastAddConfirmedUpdate(previousLAC, 
watcher);
+    public void registerLedgerDeletionListener(LedgerDeletionListener 
listener) {
+        ledgerStorageList.forEach(ls -> 
ls.registerLedgerDeletionListener(listener));
     }
 
     @Override
     public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
-        getOrAddLedgerInfo(ledgerId).setExplicitLac(lac);
+        getLedgerSorage(ledgerId).setExplicitlac(ledgerId, lac);
     }
 
     @Override
     public ByteBuf getExplicitLac(long ledgerId) {
-        TransientLedgerInfo ledgerInfo = 
transientLedgerInfoCache.get(ledgerId);
-        if (null == ledgerInfo) {
-            return null;
-        } else {
-            return ledgerInfo.getExplicitLac();
-        }
+        return getLedgerSorage(ledgerId).getExplicitLac(ledgerId);
     }
 
-    private TransientLedgerInfo getOrAddLedgerInfo(long ledgerId) {
-        TransientLedgerInfo tli = transientLedgerInfoCache.get(ledgerId);
-        if (tli != null) {
-            return tli;
-        } else {
-            TransientLedgerInfo newTli = new TransientLedgerInfo(ledgerId, 
ledgerIndex);
-            tli = transientLedgerInfoCache.putIfAbsent(ledgerId, newTli);
-            if (tli != null) {
-                newTli.close();
-                return tli;
-            } else {
-                return newTli;
-            }
-        }
+    public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] 
masterKey,
+            Iterable<SortedMap<Long, Long>> entries) throws Exception {
+        return getLedgerSorage(ledgerId).addLedgerToIndex(ledgerId, isFenced, 
masterKey, entries);
     }
 
-    private void updateCachedLacIfNeeded(long ledgerId, long lac) {
-        TransientLedgerInfo tli = transientLedgerInfoCache.get(ledgerId);
-        if (tli != null) {
-            tli.setLastAddConfirmed(lac);
-        }
+    public long getLastEntryInLedger(long ledgerId) throws IOException {
+        return 
getLedgerSorage(ledgerId).getEntryLocationIndex().getLastEntryInLedger(ledgerId);
     }
 
-    @Override
-    public void flushEntriesLocationsIndex() throws IOException {
-        // No-op. Location index is already flushed in 
updateEntriesLocations() call
+    public long getLocation(long ledgerId, long entryId) throws IOException {
+        return 
getLedgerSorage(ledgerId).getEntryLocationIndex().getLocation(ledgerId, 
entryId);
     }
 
-    /**
-     * Add an already existing ledger to the index.
-     *
-     * <p>This method is only used as a tool to help the migration from 
InterleaveLedgerStorage to DbLedgerStorage
-     *
-     * @param ledgerId
-     *            the ledger id
-     * @param entries
-     *            a map of entryId -> location
-     * @return the number of
-     */
-    public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] 
masterKey,
-            Iterable<SortedMap<Long, Long>> entries) throws Exception {
-        LedgerData ledgerData = 
LedgerData.newBuilder().setExists(true).setFenced(isFenced)
-                .setMasterKey(ByteString.copyFrom(masterKey)).build();
-        ledgerIndex.set(ledgerId, ledgerData);
-        AtomicLong numberOfEntries = new AtomicLong();
-
-        // Iterate over all the entries pages
-        Batch batch = entryLocationIndex.newBatch();
-        entries.forEach(map -> {
-            map.forEach((entryId, location) -> {
-                try {
-                    entryLocationIndex.addLocation(batch, ledgerId, entryId, 
location);
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-
-                numberOfEntries.incrementAndGet();
-            });
-        });
-
-        batch.flush();
-        batch.close();
-
-        return numberOfEntries.get();
+    private SingleDirectoryDbLedgerStorage getLedgerSorage(long ledgerId) {
+        return ledgerStorageList.get(MathUtils.signSafeMod(ledgerId, 
numberOfDirs));
     }
 
-    @Override
-    public void registerLedgerDeletionListener(LedgerDeletionListener 
listener) {
-        ledgerDeletionListeners.add(listener);
+    public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long 
lastLedgerId) throws IOException {
+        List<Iterable<Long>> listIt = new ArrayList<>(numberOfDirs);
+        for (SingleDirectoryDbLedgerStorage ls : ledgerStorageList) {
+            listIt.add(ls.getActiveLedgersInRange(firstLedgerId, 
lastLedgerId));
+        }
+
+        return Iterables.concat(listIt);
     }
 
-    public EntryLocationIndex getEntryLocationIndex() {
-        return entryLocationIndex;
+    public ByteBuf getLastEntry(long ledgerId) throws IOException {
+        return getLedgerSorage(ledgerId).getLastEntry(ledgerId);
     }
 
-    private void recordSuccessfulEvent(OpStatsLogger logger, long 
startTimeNanos) {
-        logger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), 
TimeUnit.NANOSECONDS);
+    @VisibleForTesting
+    boolean isFlushRequired() {
+        return 
ledgerStorageList.stream().allMatch(SingleDirectoryDbLedgerStorage::isFlushRequired);
     }
 
-    private void recordFailedEvent(OpStatsLogger logger, long startTimeNanos) {
-        logger.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), 
TimeUnit.NANOSECONDS);
+    @VisibleForTesting
+    List<SingleDirectoryDbLedgerStorage> getLedgerStorageList() {
+        return ledgerStorageList;
     }
 
     /**
@@ -1056,7 +323,10 @@ public static void readLedgerIndexEntries(long ledgerId, 
ServerConfiguration ser
 
         LedgerDirsManager ledgerDirsManager = new 
LedgerDirsManager(serverConf, serverConf.getLedgerDirs(),
                 new DiskChecker(serverConf.getDiskUsageThreshold(), 
serverConf.getDiskUsageWarnThreshold()));
-        String ledgerBasePath = 
ledgerDirsManager.getAllLedgerDirs().get(0).toString();
+        List<File> ledgerDirs = ledgerDirsManager.getAllLedgerDirs();
+
+        int dirIndex = MathUtils.signSafeMod(ledgerId, ledgerDirs.size());
+        String ledgerBasePath = ledgerDirs.get(dirIndex).toString();
 
         EntryLocationIndex entryLocationIndex = new 
EntryLocationIndex(serverConf,
                 (path, dbConfigType, conf1) -> new 
KeyValueStorageRocksDB(path, DbConfigType.Small, conf1, true),
@@ -1078,12 +348,4 @@ public static void readLedgerIndexEntries(long ledgerId, 
ServerConfiguration ser
         }
     }
 
-    /**
-     * Interface which process ledger logger.
-     */
-    public interface LedgerLoggerProcessor {
-        void process(long entryId, long entryLogId, long position);
-    }
-
-    private static final Logger log = 
LoggerFactory.getLogger(DbLedgerStorage.class);
 }
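Two smaller consequences of this fan-out are also visible above: the configured write and read cache budgets are split evenly across directories (writeCacheMaxSize / numberOfDirs), and the bookie-wide gauges are recomputed by summing the per-directory values. A stand-alone sketch of that arithmetic, with illustrative names:

    import java.util.List;

    final class CacheBudgetSketch {
        // Counterpart of perDirectoryWriteCacheSize = writeCacheMaxSize / numberOfDirs.
        static long perDirectorySize(long totalBytes, int numberOfDirs) {
            return totalBytes / numberOfDirs;   // e.g. 16 MB over 4 dirs -> 4 MB each
        }

        // Counterpart of ledgerStorageList.stream().mapToLong(...).sum() used by the gauges.
        static long aggregate(List<Long> perDirectorySizes) {
            return perDirectorySizes.stream().mapToLong(Long::longValue).sum();
        }
    }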
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
new file mode 100644
index 000000000..c8e784a3f
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -0,0 +1,930 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.io.IOException;
+import java.util.SortedMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.StampedLock;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
+import org.apache.bookkeeper.bookie.EntryLocation;
+import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.bookie.GarbageCollectorThread;
+import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.StateManager;
+import 
org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
+import org.apache.bookkeeper.common.util.Watcher;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Single directory implementation of LedgerStorage that uses RocksDB to keep 
the indexes for entries stored in
+ * EntryLogs.
+ *
+ * <p>This is meant only to be used from {@link DbLedgerStorage}.
+ */
+public class SingleDirectoryDbLedgerStorage implements 
CompactableLedgerStorage {
+    private final EntryLogger entryLogger;
+
+    private final LedgerMetadataIndex ledgerIndex;
+    private final EntryLocationIndex entryLocationIndex;
+
+    private final ConcurrentLongHashMap<TransientLedgerInfo> 
transientLedgerInfoCache;
+
+    private final GarbageCollectorThread gcThread;
+
+    // Write cache where all new entries are inserted into
+    protected volatile WriteCache writeCache;
+
+    // Write cache that is used to swap with writeCache during flushes
+    protected volatile WriteCache writeCacheBeingFlushed;
+
+    // Cache where we insert entries for speculative reading
+    private final ReadCache readCache;
+
+    private final StampedLock writeCacheRotationLock = new StampedLock();
+
+    protected final ReentrantLock flushMutex = new ReentrantLock();
+
+    protected final AtomicBoolean hasFlushBeenTriggered = new 
AtomicBoolean(false);
+    private final AtomicBoolean isFlushOngoing = new AtomicBoolean(false);
+
+    private final ExecutorService executor = 
Executors.newSingleThreadExecutor(new DefaultThreadFactory("db-storage"));
+
+    // Executor used for db index cleanup
+    private final ScheduledExecutorService cleanupExecutor = Executors
+            .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("db-storage-cleanup"));
+
+    private final CopyOnWriteArrayList<LedgerDeletionListener> 
ledgerDeletionListeners = Lists
+            .newCopyOnWriteArrayList();
+
+    private final CheckpointSource checkpointSource;
+    private Checkpoint lastCheckpoint = Checkpoint.MIN;
+
+    private final long writeCacheMaxSize;
+    private final long readCacheMaxSize;
+    private final int readAheadCacheBatchSize;
+
+    private final long maxThrottleTimeNanos;
+
+    private final StatsLogger stats;
+
+    private final OpStatsLogger addEntryStats;
+    private final OpStatsLogger readEntryStats;
+    private final OpStatsLogger readCacheHitStats;
+    private final OpStatsLogger readCacheMissStats;
+    private final OpStatsLogger readAheadBatchCountStats;
+    private final OpStatsLogger readAheadBatchSizeStats;
+    private final OpStatsLogger flushStats;
+    private final OpStatsLogger flushSizeStats;
+
+    private final Counter throttledWriteRequests;
+    private final Counter rejectedWriteRequests;
+
+    static final String READ_AHEAD_CACHE_BATCH_SIZE = 
"dbStorage_readAheadCacheBatchSize";
+    private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
+
+    private static final long DEFAULT_MAX_THROTTLE_TIME_MILLIS = 
TimeUnit.SECONDS.toMillis(10);
+
+    public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, 
LedgerManager ledgerManager,
+            LedgerDirsManager ledgerDirsManager, LedgerDirsManager 
indexDirsManager, StateManager stateManager,
+            CheckpointSource checkpointSource, Checkpointer checkpointer, 
StatsLogger statsLogger,
+            ScheduledExecutorService gcExecutor, long writeCacheSize, long 
readCacheSize) throws IOException {
+
+        checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
+                "Db implementation only allows for one storage dir");
+
+        String baseDir = 
ledgerDirsManager.getAllLedgerDirs().get(0).toString();
+        log.info("Creating single directory db ledger storage on {}", baseDir);
+
+        this.writeCacheMaxSize = writeCacheSize;
+        this.writeCache = new WriteCache(writeCacheMaxSize / 2);
+        this.writeCacheBeingFlushed = new WriteCache(writeCacheMaxSize / 2);
+
+        this.checkpointSource = checkpointSource;
+
+        readCacheMaxSize = readCacheSize;
+        readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, 
DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE);
+
+        long maxThrottleTimeMillis = 
conf.getLong(DbLedgerStorage.MAX_THROTTLE_TIME_MILLIS,
+                DEFAULT_MAX_THROTTLE_TIME_MILLIS);
+        maxThrottleTimeNanos = 
TimeUnit.MILLISECONDS.toNanos(maxThrottleTimeMillis);
+
+        readCache = new ReadCache(readCacheMaxSize);
+
+        this.stats = statsLogger;
+
+        ledgerIndex = new LedgerMetadataIndex(conf, 
KeyValueStorageRocksDB.factory, baseDir, stats);
+        entryLocationIndex = new EntryLocationIndex(conf, 
KeyValueStorageRocksDB.factory, baseDir, stats);
+
+        transientLedgerInfoCache = new ConcurrentLongHashMap<>(16 * 1024,
+                Runtime.getRuntime().availableProcessors() * 2);
+        
cleanupExecutor.scheduleAtFixedRate(this::cleanupStaleTransientLedgerInfo,
+                TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES,
+                TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES, 
TimeUnit.MINUTES);
+
+        entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        gcThread = new GarbageCollectorThread(conf, ledgerManager, this, 
statsLogger);
+
+        stats.registerGauge("write-cache-size", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return writeCache.size() + writeCacheBeingFlushed.size();
+            }
+        });
+        stats.registerGauge("write-cache-count", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return writeCache.count() + writeCacheBeingFlushed.count();
+            }
+        });
+        stats.registerGauge("read-cache-size", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return readCache.size();
+            }
+        });
+        stats.registerGauge("read-cache-count", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return readCache.count();
+            }
+        });
+
+        addEntryStats = stats.getOpStatsLogger("add-entry");
+        readEntryStats = stats.getOpStatsLogger("read-entry");
+        readCacheHitStats = stats.getOpStatsLogger("read-cache-hits");
+        readCacheMissStats = stats.getOpStatsLogger("read-cache-misses");
+        readAheadBatchCountStats = 
stats.getOpStatsLogger("readahead-batch-count");
+        readAheadBatchSizeStats = 
stats.getOpStatsLogger("readahead-batch-size");
+        flushStats = stats.getOpStatsLogger("flush");
+        flushSizeStats = stats.getOpStatsLogger("flush-size");
+
+        throttledWriteRequests = stats.getCounter("throttled-write-requests");
+        rejectedWriteRequests = stats.getCounter("rejected-write-requests");
+    }
+
+    @Override
+    public void initialize(ServerConfiguration conf, LedgerManager 
ledgerManager, LedgerDirsManager ledgerDirsManager,
+            LedgerDirsManager indexDirsManager, StateManager stateManager, 
CheckpointSource checkpointSource,
+            Checkpointer checkpointer, StatsLogger statsLogger) throws 
IOException {
+        /// Initialized in constructor
+    }
+
+    /**
+     * Evict all the ledger info objects that were not used recently.
+     */
+    private void cleanupStaleTransientLedgerInfo() {
+        transientLedgerInfoCache.removeIf((ledgerId, ledgerInfo) -> {
+            boolean isStale = ledgerInfo.isStale();
+            if (isStale) {
+                ledgerInfo.close();
+            }
+
+            return isStale;
+        });
+    }
+
+    @Override
+    public void start() {
+        gcThread.start();
+    }
+
+    @Override
+    public void shutdown() throws InterruptedException {
+        try {
+            flush();
+
+            gcThread.shutdown();
+            entryLogger.shutdown();
+
+            cleanupExecutor.shutdown();
+            cleanupExecutor.awaitTermination(1, TimeUnit.SECONDS);
+
+            ledgerIndex.close();
+            entryLocationIndex.close();
+
+            writeCache.close();
+            writeCacheBeingFlushed.close();
+            readCache.close();
+            executor.shutdown();
+
+        } catch (IOException e) {
+            log.error("Error closing db storage", e);
+        }
+    }
+
+    @Override
+    public boolean ledgerExists(long ledgerId) throws IOException {
+        try {
+            LedgerData ledgerData = ledgerIndex.get(ledgerId);
+            if (log.isDebugEnabled()) {
+                log.debug("Ledger exists. ledger: {} : {}", ledgerId, 
ledgerData.getExists());
+            }
+            return ledgerData.getExists();
+        } catch (Bookie.NoLedgerException nle) {
+            // ledger does not exist
+            return false;
+        }
+    }
+
+    @Override
+    public boolean isFenced(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("isFenced. ledger: {}", ledgerId);
+        }
+        return ledgerIndex.get(ledgerId).getFenced();
+    }
+
+    @Override
+    public boolean setFenced(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("Set fenced. ledger: {}", ledgerId);
+        }
+        boolean changed = ledgerIndex.setFenced(ledgerId);
+        if (changed) {
+            // notify all the watchers if a ledger is fenced
+            TransientLedgerInfo ledgerInfo = 
transientLedgerInfoCache.get(ledgerId);
+            if (null != ledgerInfo) {
+                ledgerInfo.notifyWatchers(Long.MAX_VALUE);
+            }
+        }
+        return changed;
+    }
+
+    @Override
+    public void setMasterKey(long ledgerId, byte[] masterKey) throws 
IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("Set master key. ledger: {}", ledgerId);
+        }
+        ledgerIndex.setMasterKey(ledgerId, masterKey);
+    }
+
+    @Override
+    public byte[] readMasterKey(long ledgerId) throws IOException, 
BookieException {
+        if (log.isDebugEnabled()) {
+            log.debug("Read master key. ledger: {}", ledgerId);
+        }
+        return ledgerIndex.get(ledgerId).getMasterKey().toByteArray();
+    }
+
+    @Override
+    public long addEntry(ByteBuf entry) throws IOException, BookieException {
+        long startTime = MathUtils.nowInNano();
+
+        long ledgerId = entry.getLong(entry.readerIndex());
+        long entryId = entry.getLong(entry.readerIndex() + 8);
+        long lac = entry.getLong(entry.readerIndex() + 16);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Add entry. {}@{}, lac = {}", ledgerId, entryId, lac);
+        }
+
+        // First we try optimistic locking to get access to the current write cache.
+        // This is based on the fact that the write cache is only rotated (swapped) every 1 minute. During the
+        // rest of the time, we can have multiple threads using the optimistic lock here without interfering.
+        long stamp = writeCacheRotationLock.tryOptimisticRead();
+        boolean inserted = false;
+
+        inserted = writeCache.put(ledgerId, entryId, entry);
+        if (!writeCacheRotationLock.validate(stamp)) {
+            // The write cache was rotated while we were inserting. We need to 
acquire the proper read lock and repeat
+            // the operation because we might have inserted in a write cache 
that was already being flushed and cleared,
+            // without being sure about this last entry being flushed or not.
+            stamp = writeCacheRotationLock.readLock();
+            try {
+                inserted = writeCache.put(ledgerId, entryId, entry);
+            } finally {
+                writeCacheRotationLock.unlockRead(stamp);
+            }
+        }
+
+        if (!inserted) {
+            triggerFlushAndAddEntry(ledgerId, entryId, entry);
+        }
+
+        // after successfully inserting the entry, update LAC and notify the watchers
+        updateCachedLacIfNeeded(ledgerId, lac);
+
+        recordSuccessfulEvent(addEntryStats, startTime);
+        return entryId;
+    }
+
+    private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf 
entry)
+            throws IOException, BookieException {
+        // Write cache is full, we need to trigger a flush so that it gets 
rotated
+        // If the flush has already been triggered or flush has already 
switched the
+        // cache, we don't need to trigger another flush
+        if (!isFlushOngoing.get() && 
hasFlushBeenTriggered.compareAndSet(false, true)) {
+            // Trigger an early flush in background
+            log.info("Write cache is full, triggering flush");
+            executor.execute(() -> {
+                try {
+                    flush();
+                } catch (IOException e) {
+                    log.error("Error during flush", e);
+                }
+            });
+        }
+
+        throttledWriteRequests.inc();
+        long absoluteTimeoutNanos = System.nanoTime() + maxThrottleTimeNanos;
+
+        while (System.nanoTime() < absoluteTimeoutNanos) {
+            long stamp = writeCacheRotationLock.readLock();
+            try {
+                if (writeCache.put(ledgerId, entryId, entry)) {
+                    // We succeeded in putting the entry in the write cache
+                    return;
+                }
+            } finally {
+                writeCacheRotationLock.unlockRead(stamp);
+            }
+
+            // Wait some time and try again
+            try {
+                Thread.sleep(1);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted when adding entry " + 
ledgerId + "@" + entryId);
+            }
+        }
+
+        // Timeout expired and we weren't able to insert in write cache
+        rejectedWriteRequests.inc();
+        throw new OperationRejectedException();
+    }
+
+    @Override
+    public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
+        long startTime = MathUtils.nowInNano();
+        if (log.isDebugEnabled()) {
+            log.debug("Get Entry: {}@{}", ledgerId, entryId);
+        }
+
+        if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
+            return getLastEntry(ledgerId);
+        }
+
+        // We need to try to read from both write caches, since recent entries 
could be found in either of the two. The
+        // write caches are already thread safe on their own, here we just 
need to make sure we get references to both
+        // of them. Using an optimistic lock since the read lock is always 
free, unless we're swapping the caches.
+        long stamp = writeCacheRotationLock.tryOptimisticRead();
+        WriteCache localWriteCache = writeCache;
+        WriteCache localWriteCacheBeingFlushed = writeCacheBeingFlushed;
+        if (!writeCacheRotationLock.validate(stamp)) {
+            // Fallback to regular read lock approach
+            stamp = writeCacheRotationLock.readLock();
+            try {
+                localWriteCache = writeCache;
+                localWriteCacheBeingFlushed = writeCacheBeingFlushed;
+            } finally {
+                writeCacheRotationLock.unlockRead(stamp);
+            }
+        }
+
+        // First try to read from the write cache of recent entries
+        ByteBuf entry = localWriteCache.get(ledgerId, entryId);
+        if (entry != null) {
+            recordSuccessfulEvent(readCacheHitStats, startTime);
+            recordSuccessfulEvent(readEntryStats, startTime);
+            return entry;
+        }
+
+        // If there's a flush going on, the entry might be in the flush buffer
+        entry = localWriteCacheBeingFlushed.get(ledgerId, entryId);
+        if (entry != null) {
+            recordSuccessfulEvent(readCacheHitStats, startTime);
+            recordSuccessfulEvent(readEntryStats, startTime);
+            return entry;
+        }
+
+        // Try reading from read-ahead cache
+        entry = readCache.get(ledgerId, entryId);
+        if (entry != null) {
+            recordSuccessfulEvent(readCacheHitStats, startTime);
+            recordSuccessfulEvent(readEntryStats, startTime);
+            return entry;
+        }
+
+        // Read from main storage
+        long entryLocation;
+        try {
+            entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
+            if (entryLocation == 0) {
+                throw new NoEntryException(ledgerId, entryId);
+            }
+            entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
+        } catch (NoEntryException e) {
+            recordFailedEvent(readEntryStats, startTime);
+            throw e;
+        }
+
+        readCache.put(ledgerId, entryId, entry);
+
+        // Try to read more entries
+        long nextEntryLocation = entryLocation + 4 /* size header */ + 
entry.readableBytes();
+        fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);
+
+        recordSuccessfulEvent(readCacheMissStats, startTime);
+        recordSuccessfulEvent(readEntryStats, startTime);
+        return entry;
+    }
+
+    private void fillReadAheadCache(long originalLedgerId, long firstEntryId, long firstEntryLocation) {
+        try {
+            long firstEntryLogId = (firstEntryLocation >> 32);
+            long currentEntryLogId = firstEntryLogId;
+            long currentEntryLocation = firstEntryLocation;
+            int count = 0;
+            long size = 0;
+
+            while (count < readAheadCacheBatchSize && currentEntryLogId == 
firstEntryLogId) {
+                ByteBuf entry = entryLogger.internalReadEntry(originalLedgerId, -1, currentEntryLocation);
+
+                try {
+                    long currentEntryLedgerId = entry.getLong(0);
+                    long currentEntryId = entry.getLong(8);
+
+                    if (currentEntryLedgerId != originalLedgerId) {
+                        // Found an entry belonging to a different ledger, 
stopping read-ahead
+                        // the finally block below releases the entry buffer
+                        return;
+                    }
+
+                    // Insert entry in read cache
+                    readCache.put(originalLedgerId, currentEntryId, entry);
+
+                    count++;
+                    size += entry.readableBytes();
+
+                    currentEntryLocation += 4 + entry.readableBytes();
+                    currentEntryLogId = currentEntryLocation >> 32;
+                } finally {
+                    entry.release();
+                }
+            }
+
+            readAheadBatchCountStats.registerSuccessfulValue(count);
+            readAheadBatchSizeStats.registerSuccessfulValue(size);
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug("Exception during read ahead for ledger: {}: e", 
orginalLedgerId, e);
+            }
+        }
+    }
+
+    public ByteBuf getLastEntry(long ledgerId) throws IOException {
+        long startTime = MathUtils.nowInNano();
+
+        long stamp = writeCacheRotationLock.readLock();
+        try {
+            // First try to read from the write cache of recent entries
+            ByteBuf entry = writeCache.getLastEntry(ledgerId);
+            if (entry != null) {
+                if (log.isDebugEnabled()) {
+                    long foundLedgerId = entry.readLong(); // ledgerId
+                    long entryId = entry.readLong();
+                    entry.resetReaderIndex();
+                    if (log.isDebugEnabled()) {
+                        log.debug("Found last entry for ledger {} in write 
cache: {}@{}", ledgerId, foundLedgerId,
+                                entryId);
+                    }
+                }
+
+                recordSuccessfulEvent(readCacheHitStats, startTime);
+                recordSuccessfulEvent(readEntryStats, startTime);
+                return entry;
+            }
+
+            // If there's a flush going on, the entry might be in the flush 
buffer
+            entry = writeCacheBeingFlushed.getLastEntry(ledgerId);
+            if (entry != null) {
+                if (log.isDebugEnabled()) {
+                    entry.readLong(); // ledgerId
+                    long entryId = entry.readLong();
+                    entry.resetReaderIndex();
+                    if (log.isDebugEnabled()) {
+                        log.debug("Found last entry for ledger {} in write 
cache being flushed: {}", ledgerId, entryId);
+                    }
+                }
+
+                recordSuccessfulEvent(readCacheHitStats, startTime);
+                recordSuccessfulEvent(readEntryStats, startTime);
+                return entry;
+            }
+        } finally {
+            writeCacheRotationLock.unlockRead(stamp);
+        }
+
+        // Search the last entry in storage
+        long lastEntryId = entryLocationIndex.getLastEntryInLedger(ledgerId);
+        if (log.isDebugEnabled()) {
+            log.debug("Found last entry for ledger {} in db: {}", ledgerId, 
lastEntryId);
+        }
+
+        long entryLocation = entryLocationIndex.getLocation(ledgerId, 
lastEntryId);
+        ByteBuf content = entryLogger.readEntry(ledgerId, lastEntryId, 
entryLocation);
+
+        recordSuccessfulEvent(readCacheMissStats, startTime);
+        recordSuccessfulEvent(readEntryStats, startTime);
+        return content;
+    }
+
+    @VisibleForTesting
+    boolean isFlushRequired() {
+        long stamp = writeCacheRotationLock.readLock();
+        try {
+            return !writeCache.isEmpty();
+        } finally {
+            writeCacheRotationLock.unlockRead(stamp);
+        }
+    }
+
+    @Override
+    public void checkpoint(Checkpoint checkpoint) throws IOException {
+        Checkpoint thisCheckpoint = checkpointSource.newCheckpoint();
+        if (lastCheckpoint.compareTo(checkpoint) > 0) {
+            return;
+        }
+
+        long startTime = MathUtils.nowInNano();
+
+        // Only a single flush operation can happen at a time
+        flushMutex.lock();
+
+        try {
+            // Swap the write cache so that writes can continue to happen 
while the flush is
+            // ongoing
+            swapWriteCache();
+
+            long sizeToFlush = writeCacheBeingFlushed.size();
+            if (log.isDebugEnabled()) {
+                log.debug("Flushing entries. count: {} -- size {} Mb", 
writeCacheBeingFlushed.count(),
+                        sizeToFlush / 1024.0 / 1024);
+            }
+
+            // Write all the pending entries into the entry logger and collect 
the offset
+            // position for each entry
+
+            Batch batch = entryLocationIndex.newBatch();
+            writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> {
+                try {
+                    long location = entryLogger.addEntry(ledgerId, entry, 
true);
+                    entryLocationIndex.addLocation(batch, ledgerId, entryId, 
location);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            entryLogger.flush();
+
+            long batchFlushStartTime = System.nanoTime();
+            batch.flush();
+            batch.close();
+            if (log.isDebugEnabled()) {
+                log.debug("DB batch flushed time : {} s",
+                        MathUtils.elapsedNanos(batchFlushStarTime) / (double) 
TimeUnit.SECONDS.toNanos(1));
+            }
+
+            ledgerIndex.flush();
+
+            cleanupExecutor.execute(() -> {
+                // There can only be one single cleanup task running because 
the cleanupExecutor
+                // is single-threaded
+                try {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Removing deleted ledgers from db indexes");
+                    }
+
+                    entryLocationIndex.removeOffsetFromDeletedLedgers();
+                    ledgerIndex.removeDeletedLedgers();
+                } catch (Throwable t) {
+                    log.warn("Failed to cleanup db indexes", t);
+                }
+            });
+
+            lastCheckpoint = thisCheckpoint;
+
+            // Discard all the entries from the write cache, since they're now persisted
+            writeCacheBeingFlushed.clear();
+
+            double flushTimeSeconds = MathUtils.elapsedNanos(startTime) / 
(double) TimeUnit.SECONDS.toNanos(1);
+            double flushThroughput = sizeToFlush / 1024.0 / 1024.0 / 
flushTimeSeconds;
+
+            if (log.isDebugEnabled()) {
+                log.debug("Flushing done time {} s -- Written {} MB/s", 
flushTimeSeconds, flushThroughput);
+            }
+
+            recordSuccessfulEvent(flushStats, startTime);
+            flushSizeStats.registerSuccessfulValue(sizeToFlush);
+        } catch (IOException e) {
+            // Leave IOException as it is
+            throw e;
+        } catch (RuntimeException e) {
+            // Wrap unchecked exceptions
+            throw new IOException(e);
+        } finally {
+            try {
+                isFlushOngoing.set(false);
+            } finally {
+                flushMutex.unlock();
+            }
+        }
+    }
+
+    /**
+     * Swap the current write cache with the replacement cache.
+     */
+    private void swapWriteCache() {
+        long stamp = writeCacheRotationLock.writeLock();
+        try {
+            // First, swap the current write-cache map with an empty one so 
that writes will
+            // go on unaffected. Only a single flush is happening at the same 
time
+            WriteCache tmp = writeCacheBeingFlushed;
+            writeCacheBeingFlushed = writeCache;
+            writeCache = tmp;
+
+            // since the cache is switched, we can allow flush to be triggered
+            hasFlushBeenTriggered.set(false);
+        } finally {
+            try {
+                isFlushOngoing.set(true);
+            } finally {
+                writeCacheRotationLock.unlockWrite(stamp);
+            }
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        Checkpoint cp = checkpointSource.newCheckpoint();
+        checkpoint(cp);
+        checkpointSource.checkpointComplete(cp, true);
+    }
+
+    @Override
+    public void deleteLedger(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("Deleting ledger {}", ledgerId);
+        }
+
+        // Delete entries from this ledger that are still in the write cache
+        long stamp = writeCacheRotationLock.readLock();
+        try {
+            writeCache.deleteLedger(ledgerId);
+        } finally {
+            writeCacheRotationLock.unlockRead(stamp);
+        }
+
+        entryLocationIndex.delete(ledgerId);
+        ledgerIndex.delete(ledgerId);
+
+        for (int i = 0, size = ledgerDeletionListeners.size(); i < size; i++) {
+            LedgerDeletionListener listener = ledgerDeletionListeners.get(i);
+            listener.ledgerDeleted(ledgerId);
+        }
+
+        TransientLedgerInfo tli = transientLedgerInfoCache.remove(ledgerId);
+        if (tli != null) {
+            tli.close();
+        }
+    }
+
+    @Override
+    public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long 
lastLedgerId) throws IOException {
+        return ledgerIndex.getActiveLedgersInRange(firstLedgerId, 
lastLedgerId);
+    }
+
+    @Override
+    public void updateEntriesLocations(Iterable<EntryLocation> locations) 
throws IOException {
+        // Trigger a flush to have all the entries being compacted in the db 
storage
+        flush();
+
+        entryLocationIndex.updateLocations(locations);
+    }
+
+    @Override
+    public EntryLogger getEntryLogger() {
+        return entryLogger;
+    }
+
+    @Override
+    public long getLastAddConfirmed(long ledgerId) throws IOException {
+        TransientLedgerInfo ledgerInfo = 
transientLedgerInfoCache.get(ledgerId);
+        long lac = null != ledgerInfo ? ledgerInfo.getLastAddConfirmed() : 
TransientLedgerInfo.NOT_ASSIGNED_LAC;
+        if (lac == TransientLedgerInfo.NOT_ASSIGNED_LAC) {
+            ByteBuf bb = getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
+            try {
+                bb.skipBytes(2 * Long.BYTES); // skip ledger id and entry id
+                lac = bb.readLong();
+                lac = getOrAddLedgerInfo(ledgerId).setLastAddConfirmed(lac);
+            } finally {
+                bb.release();
+            }
+        }
+        return lac;
+    }
+
+    @Override
+    public boolean waitForLastAddConfirmedUpdate(long ledgerId, long 
previousLAC,
+            Watcher<LastAddConfirmedUpdateNotification> watcher) throws 
IOException {
+        return 
getOrAddLedgerInfo(ledgerId).waitForLastAddConfirmedUpdate(previousLAC, 
watcher);
+    }
+
+    @Override
+    public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
+        getOrAddLedgerInfo(ledgerId).setExplicitLac(lac);
+    }
+
+    @Override
+    public ByteBuf getExplicitLac(long ledgerId) {
+        TransientLedgerInfo ledgerInfo = 
transientLedgerInfoCache.get(ledgerId);
+        if (null == ledgerInfo) {
+            return null;
+        } else {
+            return ledgerInfo.getExplicitLac();
+        }
+    }
+
+    private TransientLedgerInfo getOrAddLedgerInfo(long ledgerId) {
+        TransientLedgerInfo tli = transientLedgerInfoCache.get(ledgerId);
+        if (tli != null) {
+            return tli;
+        } else {
+            TransientLedgerInfo newTli = new TransientLedgerInfo(ledgerId, 
ledgerIndex);
+            tli = transientLedgerInfoCache.putIfAbsent(ledgerId, newTli);
+            if (tli != null) {
+                newTli.close();
+                return tli;
+            } else {
+                return newTli;
+            }
+        }
+    }
+
+    private void updateCachedLacIfNeeded(long ledgerId, long lac) {
+        TransientLedgerInfo tli = transientLedgerInfoCache.get(ledgerId);
+        if (tli != null) {
+            tli.setLastAddConfirmed(lac);
+        }
+    }
+
+    @Override
+    public void flushEntriesLocationsIndex() throws IOException {
+        // No-op. Location index is already flushed in 
updateEntriesLocations() call
+    }
+
+    /**
+     * Add an already existing ledger to the index.
+     *
+     * <p>This method is only used as a tool to help the migration from InterleavedLedgerStorage to DbLedgerStorage.
+     *
+     * @param ledgerId
+     *            the ledger id
+     * @param entries
+     *            a map of entryId -> location
+     * @return the number of entries added to the index
+     */
+    public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] 
masterKey,
+            Iterable<SortedMap<Long, Long>> entries) throws Exception {
+        LedgerData ledgerData = 
LedgerData.newBuilder().setExists(true).setFenced(isFenced)
+                .setMasterKey(ByteString.copyFrom(masterKey)).build();
+        ledgerIndex.set(ledgerId, ledgerData);
+        AtomicLong numberOfEntries = new AtomicLong();
+
+        // Iterate over all the entries pages
+        Batch batch = entryLocationIndex.newBatch();
+        entries.forEach(map -> {
+            map.forEach((entryId, location) -> {
+                try {
+                    entryLocationIndex.addLocation(batch, ledgerId, entryId, 
location);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+
+                numberOfEntries.incrementAndGet();
+            });
+        });
+
+        batch.flush();
+        batch.close();
+
+        return numberOfEntries.get();
+    }
+
+    @Override
+    public void registerLedgerDeletionListener(LedgerDeletionListener 
listener) {
+        ledgerDeletionListeners.add(listener);
+    }
+
+    public EntryLocationIndex getEntryLocationIndex() {
+        return entryLocationIndex;
+    }
+
+    private void recordSuccessfulEvent(OpStatsLogger logger, long 
startTimeNanos) {
+        logger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), 
TimeUnit.NANOSECONDS);
+    }
+
+    private void recordFailedEvent(OpStatsLogger logger, long startTimeNanos) {
+        logger.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), 
TimeUnit.NANOSECONDS);
+    }
+
+    long getWriteCacheSize() {
+        return writeCache.size() + writeCacheBeingFlushed.size();
+    }
+
+    long getWriteCacheCount() {
+        return writeCache.count() + writeCacheBeingFlushed.count();
+    }
+
+    long getReadCacheSize() {
+        return readCache.size();
+    }
+
+    long getReadCacheCount() {
+        return readCache.count();
+    }
+
+    /**
+     * Interface which processes ledger entry locations.
+     */
+    public interface LedgerLoggerProcessor {
+        void process(long entryId, long entryLogId, long position);
+    }
+
+    private static final Logger log = 
LoggerFactory.getLogger(DbLedgerStorage.class);
+}
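
The add and read paths above hinge on a StampedLock optimistic read: writers insert into whichever write cache is current, validate the stamp, and only fall back to a full read lock when a cache rotation raced with them, while the flush path takes the write lock to swap the two caches. A minimal stand-alone sketch of that pattern, assuming a toy SimpleCache and String payloads in place of WriteCache and ByteBuf:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.StampedLock;

// Minimal sketch of the double-buffered write cache rotation used above.
// SimpleCache stands in for WriteCache; entries are plain strings here.
public class WriteCacheRotationSketch {

    static class SimpleCache {
        final Map<Long, String> entries = new ConcurrentHashMap<>();

        boolean put(long entryId, String entry) {
            entries.put(entryId, entry);
            return true; // the real WriteCache returns false when it is full
        }
    }

    private final StampedLock rotationLock = new StampedLock();
    private volatile SimpleCache writeCache = new SimpleCache();
    private volatile SimpleCache cacheBeingFlushed = new SimpleCache();

    boolean addEntry(long entryId, String entry) {
        // Optimistic read: rotations are rare, so most writes avoid locking.
        long stamp = rotationLock.tryOptimisticRead();
        boolean inserted = writeCache.put(entryId, entry);
        if (!rotationLock.validate(stamp)) {
            // A rotation raced with us; retry under the read lock so we are
            // guaranteed to hit the cache that is currently accepting writes.
            stamp = rotationLock.readLock();
            try {
                inserted = writeCache.put(entryId, entry);
            } finally {
                rotationLock.unlockRead(stamp);
            }
        }
        return inserted;
    }

    void swapForFlush() {
        long stamp = rotationLock.writeLock();
        try {
            // Swap the buffers so writes continue into an empty cache while
            // the previous one is drained to disk.
            SimpleCache tmp = cacheBeingFlushed;
            cacheBeingFlushed = writeCache;
            writeCache = tmp;
        } finally {
            rotationLock.unlockWrite(stamp);
        }
    }
}
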
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/TransientLedgerInfo.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/TransientLedgerInfo.java
new file mode 100644
index 000000000..27d63e8db
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/TransientLedgerInfo.java
@@ -0,0 +1,156 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static 
org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification.WATCHER_RECYCLER;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
+import org.apache.bookkeeper.common.util.Watchable;
+import org.apache.bookkeeper.common.util.Watcher;
+
+/**
+ * This class borrows the logic from FileInfo.
+ *
+ * <p>This class is used for holding all the transient states for a given 
ledger.
+ */
+class TransientLedgerInfo extends 
Watchable<LastAddConfirmedUpdateNotification> implements AutoCloseable {
+
+    static final long LEDGER_INFO_CACHING_TIME_MINUTES = 10;
+
+    static final long NOT_ASSIGNED_LAC = Long.MIN_VALUE;
+
+    // lac
+    private volatile long lac = NOT_ASSIGNED_LAC;
+    // lac received from explicit lac requests
+    private ByteBuffer explicitLac = null;
+    // is the ledger info closed?
+    private boolean isClosed;
+
+    private final long ledgerId;
+    // reference to LedgerMetadataIndex
+    private final LedgerMetadataIndex ledgerIndex;
+
+    private long lastAccessed;
+
+    /**
+     * Construct a TransientLedgerInfo with zero watchers.
+     */
+    public TransientLedgerInfo(long ledgerId, LedgerMetadataIndex ledgerIndex) 
{
+        super(WATCHER_RECYCLER);
+        this.ledgerId = ledgerId;
+        this.ledgerIndex = ledgerIndex;
+        this.lastAccessed = System.currentTimeMillis();
+    }
+
+    long getLastAddConfirmed() {
+        return lac;
+    }
+
+    long setLastAddConfirmed(long lac) {
+        long lacToReturn;
+        boolean changed = false;
+        synchronized (this) {
+            if (this.lac == NOT_ASSIGNED_LAC || this.lac < lac) {
+                this.lac = lac;
+                changed = true;
+                lastAccessed = System.currentTimeMillis();
+            }
+            lacToReturn = this.lac;
+        }
+        if (changed) {
+            notifyWatchers(lacToReturn);
+        }
+        return lacToReturn;
+    }
+
+    synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC,
+            Watcher<LastAddConfirmedUpdateNotification> watcher) throws 
IOException {
+        lastAccessed = System.currentTimeMillis();
+        if ((lac != NOT_ASSIGNED_LAC && lac > previousLAC) || isClosed || 
ledgerIndex.get(ledgerId).getFenced()) {
+            return false;
+        }
+
+        addWatcher(watcher);
+        return true;
+    }
+
+    public ByteBuf getExplicitLac() {
+        ByteBuf retLac = null;
+        synchronized (this) {
+            if (explicitLac != null) {
+                retLac = Unpooled.buffer(explicitLac.capacity());
+                explicitLac.rewind(); // copy from the beginning
+                retLac.writeBytes(explicitLac);
+                explicitLac.rewind();
+                return retLac;
+            }
+        }
+        return retLac;
+    }
+
+    public void setExplicitLac(ByteBuf lac) {
+        long explicitLacValue;
+        synchronized (this) {
+            if (explicitLac == null) {
+                explicitLac = ByteBuffer.allocate(lac.capacity());
+            }
+            lac.readBytes(explicitLac);
+            explicitLac.rewind();
+
+            // skip the ledger id
+            explicitLac.getLong();
+            explicitLacValue = explicitLac.getLong();
+            explicitLac.rewind();
+
+            lastAccessed = System.currentTimeMillis();
+        }
+        setLastAddConfirmed(explicitLacValue);
+    }
+
+    boolean isStale() {
+        return (lastAccessed + 
TimeUnit.MINUTES.toMillis(LEDGER_INFO_CACHING_TIME_MINUTES)) < System
+                .currentTimeMillis();
+    }
+
+    void notifyWatchers(long lastAddConfirmed) {
+        notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, 
lastAddConfirmed);
+    }
+
+    @Override
+    public void close() {
+        synchronized (this) {
+            if (isClosed) {
+                return;
+            }
+            isClosed = true;
+        }
+        // notify watchers
+        notifyWatchers(Long.MAX_VALUE);
+    }
+
+}
\ No newline at end of file
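
TransientLedgerInfo keeps a last-accessed timestamp so the scheduled cleanup task in SingleDirectoryDbLedgerStorage can evict entries that were not touched within LEDGER_INFO_CACHING_TIME_MINUTES. A small sketch of that time-based eviction, assuming a plain ConcurrentHashMap in place of the ConcurrentLongHashMap used by the storage class:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Sketch of the stale-info eviction pattern used by cleanupStaleTransientLedgerInfo(),
// with a plain ConcurrentHashMap instead of ConcurrentLongHashMap.
public class TransientInfoEvictionSketch {

    static final long CACHING_TIME_MILLIS = TimeUnit.MINUTES.toMillis(10);

    static class Info {
        volatile long lastAccessed = System.currentTimeMillis();

        boolean isStale() {
            return lastAccessed + CACHING_TIME_MILLIS < System.currentTimeMillis();
        }

        void close() {
            // In the real class this notifies watchers with Long.MAX_VALUE.
        }
    }

    private final ConcurrentHashMap<Long, Info> cache = new ConcurrentHashMap<>();

    /** Called on a fixed schedule, mirroring the cleanup task in the storage class. */
    void cleanupStale() {
        cache.forEach((ledgerId, info) -> {
            if (info.isStale()) {
                // Close the stale info, then remove it only if the mapping is unchanged.
                info.close();
                cache.remove(ledgerId, info);
            }
        });
    }
}
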
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
index f4d67f277..67c69fe05 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
@@ -220,7 +220,8 @@ public void testBookieCompaction() throws Exception {
         storage.addEntry(entry3);
 
         // Simulate bookie compaction
-        EntryLogger entryLogger = ((DbLedgerStorage) storage).getEntryLogger();
+        SingleDirectoryDbLedgerStorage singleDirStorage = ((DbLedgerStorage) 
storage).getLedgerStorageList().get(0);
+        EntryLogger entryLogger = singleDirStorage.getEntryLogger();
         // Rewrite entry-3
         ByteBuf newEntry3 = Unpooled.buffer(1024);
         newEntry3.writeLong(4); // ledger id
@@ -229,7 +230,7 @@ public void testBookieCompaction() throws Exception {
         long location = entryLogger.addEntry(4, newEntry3, false);
 
         List<EntryLocation> locations = Lists.newArrayList(new 
EntryLocation(4, 3, location));
-        storage.updateEntriesLocations(locations);
+        singleDirStorage.updateEntriesLocations(locations);
 
         ByteBuf res = storage.getEntry(4, 3);
         System.out.println("res:       " + ByteBufUtil.hexDump(res));
@@ -238,20 +239,18 @@ public void testBookieCompaction() throws Exception {
     }
 
     @Test
-    public void doubleDirectoryError() throws Exception {
+    public void doubleDirectory() throws Exception {
         int gcWaitTime = 1000;
         ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
         conf.setGcWaitTime(gcWaitTime);
         conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
         conf.setLedgerDirNames(new String[] { "dir1", "dir2" });
 
-        try {
-            new Bookie(conf);
-            fail("Should have failed because of the 2 directories");
-        } catch (IllegalArgumentException e) {
-            // ok
-        }
+        // Should not fail
+        Bookie bookie = new Bookie(conf);
+        assertEquals(2, ((DbLedgerStorage) 
bookie.getLedgerStorage()).getLedgerStorageList().size());
 
+        bookie.shutdown();
     }
 
     @Test
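
The rewritten doubleDirectory test captures the user-visible change of this PR: configuring more than one ledger directory no longer throws, and DbLedgerStorage exposes one SingleDirectoryDbLedgerStorage per directory. A usage sketch assembled only from calls already exercised in the test above; the directory names are placeholders:

import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;

// Usage sketch built from the same calls the test uses; TestBKConfiguration is
// the test helper seen above, and the directory names are placeholders.
public class MultiDirDbStorageSketch {
    public static void main(String[] args) throws Exception {
        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
        conf.setLedgerDirNames(new String[] { "dir1", "dir2", "dir3" });

        // With this PR, DbLedgerStorage accepts several ledger directories and
        // creates one SingleDirectoryDbLedgerStorage per configured directory.
        Bookie bookie = new Bookie(conf);
        DbLedgerStorage storage = (DbLedgerStorage) bookie.getLedgerStorage();
        System.out.println("storages: " + storage.getLedgerStorageList().size());

        bookie.shutdown();
    }
}
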
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
index 4894814d7..7a45ee764 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
@@ -28,11 +28,18 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.StateManager;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -48,29 +55,49 @@
     private static class MockedDbLedgerStorage extends DbLedgerStorage {
 
         @Override
-        public void flush() throws IOException {
-            flushMutex.lock();
-            try {
-                // Swap the write caches and block indefinitely to simulate a 
slow disk
-                WriteCache tmp = writeCacheBeingFlushed;
-                writeCacheBeingFlushed = writeCache;
-                writeCache = tmp;
-
-                // since the cache is switched, we can allow flush to be 
triggered
-                hasFlushBeenTriggered.set(false);
-
-                // Block the flushing thread
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    return;
-                }
-            } finally {
-                flushMutex.unlock();
-            }
+        protected SingleDirectoryDbLedgerStorage 
newSingleDirectoryDbLedgerStorage(ServerConfiguration conf,
+                LedgerManager ledgerManager, LedgerDirsManager 
ledgerDirsManager, LedgerDirsManager indexDirsManager,
+                StateManager stateManager, CheckpointSource checkpointSource, 
Checkpointer checkpointer,
+                StatsLogger statsLogger, ScheduledExecutorService gcExecutor, 
long writeCacheSize, long readCacheSize)
+                throws IOException {
+            return new MockedSingleDirectoryDbLedgerStorage(conf, 
ledgerManager, ledgerDirsManager, indexDirsManager,
+                    stateManager, checkpointSource, checkpointer, statsLogger, 
gcExecutor, writeCacheSize,
+                    readCacheSize);
         }
 
+        private static class MockedSingleDirectoryDbLedgerStorage extends 
SingleDirectoryDbLedgerStorage {
+            public MockedSingleDirectoryDbLedgerStorage(ServerConfiguration 
conf, LedgerManager ledgerManager,
+                    LedgerDirsManager ledgerDirsManager, LedgerDirsManager 
indexDirsManager, StateManager stateManager,
+                    CheckpointSource checkpointSource, Checkpointer 
checkpointer, StatsLogger statsLogger,
+                    ScheduledExecutorService gcExecutor, long writeCacheSize, 
long readCacheSize) throws IOException {
+                super(conf, ledgerManager, ledgerDirsManager, 
indexDirsManager, stateManager, checkpointSource,
+                        checkpointer, statsLogger, gcExecutor, writeCacheSize, 
readCacheSize);
+            }
+
+          @Override
+          public void flush() throws IOException {
+              flushMutex.lock();
+              try {
+                  // Swap the write caches and block indefinitely to simulate 
a slow disk
+                  WriteCache tmp = writeCacheBeingFlushed;
+                  writeCacheBeingFlushed = writeCache;
+                  writeCache = tmp;
+
+                  // since the cache is switched, we can allow flush to be 
triggered
+                  hasFlushBeenTriggered.set(false);
+
+                  // Block the flushing thread
+                  try {
+                      Thread.sleep(1000);
+                  } catch (InterruptedException e) {
+                      Thread.currentThread().interrupt();
+                      return;
+                  }
+              } finally {
+                  flushMutex.unlock();
+              }
+          }
+        }
     }
 
     @Before
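
Because flushing now happens inside each per-directory storage, the test overrides the protected newSingleDirectoryDbLedgerStorage(...) factory method instead of flush() itself, so every instance DbLedgerStorage creates is the mocked, slow-flushing one. A generic sketch of that factory-method override pattern, with purely illustrative class names rather than the BookKeeper ones:

// Generic sketch of the factory-method override used in the test above: the
// parent builds its per-directory components through a protected factory, and
// a test subclass swaps in a slow implementation. Names here are illustrative.
public class FactoryOverrideSketch {

    static class Storage {
        void flush() {
            // normal flush
        }
    }

    static class MultiDirStorage {
        protected Storage newStorage(String dir) {
            return new Storage();
        }

        Storage initDir(String dir) {
            return newStorage(dir); // every directory goes through the factory
        }
    }

    static class MockedMultiDirStorage extends MultiDirStorage {
        @Override
        protected Storage newStorage(String dir) {
            return new Storage() {
                @Override
                void flush() {
                    // Simulate a slow disk, as the mocked flush in the test does.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            };
        }
    }

    public static void main(String[] args) {
        Storage s = new MockedMultiDirStorage().initDir("dir1");
        s.flush(); // blocks for about a second, standing in for the blocked flush
    }
}
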


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
