This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 916cd66  Issue #570: EntryLogManagerForEntryLogPerLedger implementation
916cd66 is described below

commit 916cd6655a70407e8c251c74814543ba1dbcf026
Author: cguttapalem <cguttapa...@salesforce.com>
AuthorDate: Mon May 21 19:02:35 2018 -0700

    Issue #570: EntryLogManagerForEntryLogPerLedger implementation
    
    Descriptions of the changes in this PR:
    
    This is sub-task 6 of Issue #570
    
    introducing EntryLogManagerForEntryLogPerLedger, which is the
    implementation of EntryLogManager for entrylog per ledger
    feature.
    
    Master Issue: #570
    
    Author: cguttapalem <cguttapa...@salesforce.com>
    
    Reviewers: Andrey Yegorov <None>, Sijie Guo <si...@apache.org>
    
    This closes #1391 from reddycharan/entrylogmanagerentrylogperledger, closes 
#570
---
 .../bookkeeper/bookie/EntryLogManagerBase.java     |   6 +-
 .../EntryLogManagerForEntryLogPerLedger.java       | 515 +++++++++++++
 .../org/apache/bookkeeper/bookie/EntryLogger.java  |  61 +-
 .../bookkeeper/bookie/EntryLoggerAllocator.java    |   4 +-
 .../bookkeeper/conf/ServerConfiguration.java       |  53 ++
 .../util/collections/ConcurrentLongHashMap.java    |   5 +-
 .../apache/bookkeeper/bookie/CreateNewLogTest.java | 322 +++++++-
 .../org/apache/bookkeeper/bookie/EntryLogTest.java | 835 ++++++++++++++++++++-
 .../bookie/LedgerStorageCheckpointTest.java        |  59 +-
 conf/bk_server.conf                                |  27 +-
 site/_data/config/bk_server.yaml                   |   6 +
 11 files changed, 1773 insertions(+), 120 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
index 849336f..701fb7b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
@@ -37,7 +37,7 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
 abstract class EntryLogManagerBase implements EntryLogManager {
     volatile List<BufferedLogChannel> rotatedLogChannels;
     final EntryLoggerAllocator entryLoggerAllocator;
-    private final LedgerDirsManager ledgerDirsManager;
+    final LedgerDirsManager ledgerDirsManager;
     private final List<EntryLogger.EntryLogListener> listeners;
     /**
      * The maximum size of a entry logger file.
@@ -93,12 +93,12 @@ abstract class EntryLogManagerBase implements 
EntryLogManager {
         return logChannel.position() + size > Integer.MAX_VALUE;
     }
 
-    abstract BufferedLogChannel getCurrentLogForLedger(long ledgerId);
+    abstract BufferedLogChannel getCurrentLogForLedger(long ledgerId) throws 
IOException;
 
     abstract BufferedLogChannel getCurrentLogForLedgerForAddEntry(long 
ledgerId, int entrySize, boolean rollLog)
             throws IOException;
 
-    abstract void setCurrentLogForLedgerAndAddToRotate(long ledgerId, 
BufferedLogChannel logChannel);
+    abstract void setCurrentLogForLedgerAndAddToRotate(long ledgerId, 
BufferedLogChannel logChannel) throws IOException;
 
     /*
      * flush current logs.
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
new file mode 100644
index 0000000..3cdbb7a
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
@@ -0,0 +1,515 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import io.netty.buffer.ByteBuf;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
+import org.apache.commons.lang.mutable.MutableInt;
+
+@Slf4j
+class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
+
+    static class BufferedLogChannelWithDirInfo {
+        private final BufferedLogChannel logChannel;
+        volatile boolean ledgerDirFull = false;
+
+        BufferedLogChannelWithDirInfo(BufferedLogChannel logChannel) {
+            this.logChannel = logChannel;
+        }
+
+        public boolean isLedgerDirFull() {
+            return ledgerDirFull;
+        }
+
+        public void setLedgerDirFull(boolean ledgerDirFull) {
+            this.ledgerDirFull = ledgerDirFull;
+        }
+
+        public BufferedLogChannel getLogChannel() {
+            return logChannel;
+        }
+    }
+
+    static class EntryLogAndLockTuple {
+        private final Lock ledgerLock;
+        private BufferedLogChannelWithDirInfo entryLogWithDirInfo;
+
+        public EntryLogAndLockTuple() {
+            ledgerLock = new ReentrantLock();
+        }
+
+        public Lock getLedgerLock() {
+            return ledgerLock;
+        }
+
+        public BufferedLogChannelWithDirInfo getEntryLogWithDirInfo() {
+            return entryLogWithDirInfo;
+        }
+
+        public void setEntryLogWithDirInfo(BufferedLogChannelWithDirInfo 
entryLogWithDirInfo) {
+            this.entryLogWithDirInfo = entryLogWithDirInfo;
+        }
+    }
+
+    private final LoadingCache<Long, EntryLogAndLockTuple> ledgerIdEntryLogMap;
+    /*
+     * Every time an active logChannel is accessed from the ledgerIdEntryLogMap
+     * cache, the access time of that entry is updated. But for certain
+     * operations we don't want to impact the access time of the entries (like
+     * periodic flush of current active logChannels), and those operations
+     * can use this copy of references.
+     */
+    private final ConcurrentLongHashMap<BufferedLogChannelWithDirInfo> 
replicaOfCurrentLogChannels;
+    private final CacheLoader<Long, EntryLogAndLockTuple> 
entryLogAndLockTupleCacheLoader;
+    private final EntryLogger.RecentEntryLogsStatus 
recentlyCreatedEntryLogsStatus;
+    private final int entrylogMapAccessExpiryTimeInSeconds;
+    private final int maximumNumberOfActiveEntryLogs;
+
+    EntryLogManagerForEntryLogPerLedger(ServerConfiguration conf, 
LedgerDirsManager ledgerDirsManager,
+            EntryLoggerAllocator entryLoggerAllocator, 
List<EntryLogger.EntryLogListener> listeners,
+            EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) 
throws IOException {
+        super(conf, ledgerDirsManager, entryLoggerAllocator, listeners);
+        this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
+        this.rotatedLogChannels = new 
CopyOnWriteArrayList<BufferedLogChannel>();
+        this.replicaOfCurrentLogChannels = new 
ConcurrentLongHashMap<BufferedLogChannelWithDirInfo>();
+        this.entrylogMapAccessExpiryTimeInSeconds = 
conf.getEntrylogMapAccessExpiryTimeInSeconds();
+        this.maximumNumberOfActiveEntryLogs = 
conf.getMaximumNumberOfActiveEntryLogs();
+        ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+        this.entryLogAndLockTupleCacheLoader = new CacheLoader<Long, 
EntryLogAndLockTuple>() {
+            @Override
+            public EntryLogAndLockTuple load(Long key) throws Exception {
+                return new EntryLogAndLockTuple();
+            }
+        };
+        /*
+         * Currently we are relying on access time based eviction policy for
+         * removal of EntryLogAndLockTuple, so if the EntryLogAndLockTuple of
+         * the ledger is not accessed in
+         * entrylogMapAccessExpiryTimeInSeconds period, it will be removed
+         * from the cache.
+         *
+         * We are going to introduce an explicit advisory writeClose call; with
+         * that explicit call the EntryLogAndLockTuple of the ledger will be
+         * removed from the cache. But the time-based eviction policy is still
+         * needed because it is not guaranteed that Bookie/EntryLogger would
+         * successfully receive the write close call in all cases.
+         */
+        ledgerIdEntryLogMap = CacheBuilder.newBuilder()
+                .expireAfterAccess(entrylogMapAccessExpiryTimeInSeconds, 
TimeUnit.SECONDS)
+                .maximumSize(maximumNumberOfActiveEntryLogs)
+                .removalListener(new RemovalListener<Long, 
EntryLogAndLockTuple>() {
+                    @Override
+                    public void onRemoval(
+                            RemovalNotification<Long, EntryLogAndLockTuple> 
expiredLedgerEntryLogMapEntry) {
+                        onCacheEntryRemoval(expiredLedgerEntryLogMapEntry);
+                    }
+                }).build(entryLogAndLockTupleCacheLoader);
+    }
+
+    /*
+     * This method is called when an entry is removed from the cache. This 
could
+     * be because access time of that ledger has elapsed
+     * entrylogMapAccessExpiryTimeInSeconds period, or number of active
+     * currentlogs in the cache has reached the size of
+     * maximumNumberOfActiveEntryLogs, or if an entry is explicitly
+     * invalidated/removed. In these cases entry for that ledger is removed 
from
+     * cache. Since the entrylog of this ledger is not active anymore it has to
+     * be removed from replicaOfCurrentLogChannels and added to
+     * rotatedLogChannels.
+     *
+     * Because of performance/optimization concerns the cleanup maintenance
+     * operations won't happen automatically; for more info on eviction cleanup
+     * maintenance tasks, see:
+     * https://google.github.io/guava/releases/19.0/api/docs/com/google/
+     * common/cache/CacheBuilder.html
+     *
+     */
+    private void onCacheEntryRemoval(RemovalNotification<Long, 
EntryLogAndLockTuple> removedLedgerEntryLogMapEntry) {
+        Long ledgerId = removedLedgerEntryLogMapEntry.getKey();
+        log.debug("LedgerId {} is being evicted from the cache map because of 
{}", ledgerId,
+                removedLedgerEntryLogMapEntry.getCause());
+        EntryLogAndLockTuple entryLogAndLockTuple = 
removedLedgerEntryLogMapEntry.getValue();
+        if (entryLogAndLockTuple == null) {
+            log.error("entryLogAndLockTuple is not supposed to be null in 
entry removal listener for ledger : {}",
+                    ledgerId);
+            return;
+        }
+        Lock lock = entryLogAndLockTuple.ledgerLock;
+        BufferedLogChannelWithDirInfo logChannelWithDirInfo = 
entryLogAndLockTuple.getEntryLogWithDirInfo();
+        if (logChannelWithDirInfo == null) {
+            log.error("logChannel for ledger: {} is not supposed to be null in 
entry removal listener", ledgerId);
+            return;
+        }
+        lock.lock();
+        try {
+            BufferedLogChannel logChannel = 
logChannelWithDirInfo.getLogChannel();
+            replicaOfCurrentLogChannels.remove(logChannel.getLogId());
+            rotatedLogChannels.add(logChannel);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private LedgerDirsListener getLedgerDirsListener() {
+        return new LedgerDirsListener() {
+            @Override
+            public void diskFull(File disk) {
+                Set<BufferedLogChannelWithDirInfo> 
copyOfCurrentLogsWithDirInfo = getCopyOfCurrentLogs();
+                for (BufferedLogChannelWithDirInfo currentLogWithDirInfo : 
copyOfCurrentLogsWithDirInfo) {
+                    if 
(disk.equals(currentLogWithDirInfo.getLogChannel().getLogFile().getParentFile()))
 {
+                        currentLogWithDirInfo.setLedgerDirFull(true);
+                    }
+                }
+            }
+
+            @Override
+            public void diskWritable(File disk) {
+                Set<BufferedLogChannelWithDirInfo> 
copyOfCurrentLogsWithDirInfo = getCopyOfCurrentLogs();
+                for (BufferedLogChannelWithDirInfo currentLogWithDirInfo : 
copyOfCurrentLogsWithDirInfo) {
+                    if 
(disk.equals(currentLogWithDirInfo.getLogChannel().getLogFile().getParentFile()))
 {
+                        currentLogWithDirInfo.setLedgerDirFull(false);
+                    }
+                }
+            }
+        };
+    }
+
+    Lock getLock(long ledgerId) throws IOException {
+        try {
+            return ledgerIdEntryLogMap.get(ledgerId).getLedgerLock();
+        } catch (Exception e) {
+            log.error("Received unexpected exception while fetching lock to 
acquire for ledger: " + ledgerId, e);
+            throw new IOException("Received unexpected exception while 
fetching lock to acquire", e);
+        }
+    }
+
+    /*
+     * Sets the logChannel for the given ledgerId. The new logChannel is
+     * added to replicaOfCurrentLogChannels; the previous logChannel, if
+     * there is one, is removed from replicaOfCurrentLogChannels and added
+     * to rotatedLogChannels.
+     */
+    @Override
+    public void setCurrentLogForLedgerAndAddToRotate(long ledgerId, 
BufferedLogChannel logChannel) throws IOException {
+        Lock lock = getLock(ledgerId);
+        lock.lock();
+        try {
+            BufferedLogChannel hasToRotateLogChannel = 
getCurrentLogForLedger(ledgerId);
+            logChannel.setLedgerIdAssigned(ledgerId);
+            BufferedLogChannelWithDirInfo logChannelWithDirInfo = new 
BufferedLogChannelWithDirInfo(logChannel);
+            
ledgerIdEntryLogMap.get(ledgerId).setEntryLogWithDirInfo(logChannelWithDirInfo);
+            replicaOfCurrentLogChannels.put(logChannel.getLogId(), 
logChannelWithDirInfo);
+            if (hasToRotateLogChannel != null) {
+                
replicaOfCurrentLogChannels.remove(hasToRotateLogChannel.getLogId());
+                rotatedLogChannels.add(hasToRotateLogChannel);
+            }
+        } catch (Exception e) {
+            log.error("Received unexpected exception while fetching entry from 
map for ledger: " + ledgerId, e);
+            throw new IOException("Received unexpected exception while 
fetching entry from map", e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public BufferedLogChannel getCurrentLogForLedger(long ledgerId) throws 
IOException {
+        BufferedLogChannelWithDirInfo bufferedLogChannelWithDirInfo = 
getCurrentLogWithDirInfoForLedger(ledgerId);
+        BufferedLogChannel bufferedLogChannel = null;
+        if (bufferedLogChannelWithDirInfo != null) {
+            bufferedLogChannel = bufferedLogChannelWithDirInfo.getLogChannel();
+        }
+        return bufferedLogChannel;
+    }
+
+    public BufferedLogChannelWithDirInfo 
getCurrentLogWithDirInfoForLedger(long ledgerId) throws IOException {
+        Lock lock = getLock(ledgerId);
+        lock.lock();
+        try {
+            EntryLogAndLockTuple entryLogAndLockTuple = 
ledgerIdEntryLogMap.get(ledgerId);
+            return entryLogAndLockTuple.getEntryLogWithDirInfo();
+        } catch (Exception e) {
+            log.error("Received unexpected exception while fetching entry from 
map for ledger: " + ledgerId, e);
+            throw new IOException("Received unexpected exception while 
fetching entry from map", e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public Set<BufferedLogChannelWithDirInfo> getCopyOfCurrentLogs() {
+        return new 
HashSet<BufferedLogChannelWithDirInfo>(replicaOfCurrentLogChannels.values());
+    }
+
+    @Override
+    public BufferedLogChannel getCurrentLogIfPresent(long entryLogId) {
+        BufferedLogChannelWithDirInfo bufferedLogChannelWithDirInfo = 
replicaOfCurrentLogChannels.get(entryLogId);
+        BufferedLogChannel logChannel = null;
+        if (bufferedLogChannelWithDirInfo != null) {
+            logChannel = bufferedLogChannelWithDirInfo.getLogChannel();
+        }
+        return logChannel;
+    }
+
+    @Override
+    public void checkpoint() throws IOException {
+        /*
+         * In the case of entryLogPerLedgerEnabled we need to flush
+         * both rotatedlogs and currentlogs. This is needed because
+         * syncThread periodically does checkpoint and at this time
+         * all the logs should be flushed.
+         *
+         */
+        super.flush();
+    }
+
+    @Override
+    public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) 
throws IOException {
+        // do nothing
+        /*
+         * prepareSortedLedgerStorageCheckpoint is required for the
+         * singleentrylog scenario, but it is not needed for the
+         * entrylogperledger scenario, since entries of a ledger go
+         * to an entrylog (even during compaction) and SyncThread
+         * drives periodic checkpoint logic.
+         */
+
+    }
+
+    @Override
+    public void prepareEntryMemTableFlush() {
+        // do nothing
+    }
+
+    @Override
+    public boolean commitEntryMemTableFlush() throws IOException {
+        // lock it only if there is new data
+        // so that cache accesstime is not changed
+        Set<BufferedLogChannelWithDirInfo> copyOfCurrentLogsWithDirInfo = 
getCopyOfCurrentLogs();
+        for (BufferedLogChannelWithDirInfo currentLogWithDirInfo : 
copyOfCurrentLogsWithDirInfo) {
+            BufferedLogChannel currentLog = 
currentLogWithDirInfo.getLogChannel();
+            if (reachEntryLogLimit(currentLog, 0L)) {
+                Long ledgerId = currentLog.getLedgerIdAssigned();
+                Lock lock = getLock(ledgerId);
+                lock.lock();
+                try {
+                    if (reachEntryLogLimit(currentLog, 0L)) {
+                        log.info("Rolling entry logger since it reached size 
limitation for ledger: {}", ledgerId);
+                        createNewLog(ledgerId);
+                    }
+                } finally {
+                    lock.unlock();
+                }
+            }
+        }
+        /*
+         * in the case of entrylogperledger, SyncThread drives
+         * checkpoint logic for every flushInterval. So
+         * EntryMemtable doesn't need to call checkpoint in the case
+         * of entrylogperledger.
+         */
+        return false;
+    }
+
+    /*
+     * This is for testing purposes only. Guava's cache doesn't clean up
+     * completely (including calling the expiry removal listener) automatically
+     * when the access timeout elapses.
+     *
+     * https://google.github.io/guava/releases/19.0/api/docs/com/google/
+     * common/cache/CacheBuilder.html
+     *
+     * If expireAfterWrite or expireAfterAccess is requested entries may be
+     * evicted on each cache modification, on occasional cache accesses, or
+     * on calls to Cache.cleanUp(). Expired entries may be counted by
+     * Cache.size(), but will never be visible to read or write operations.
+     *
+     * Certain cache configurations will result in the accrual of periodic
+     * maintenance tasks which will be performed during write operations, or
+     * during occasional read operations in the absence of writes. The
+     * Cache.cleanUp() method of the returned cache will also perform
+     * maintenance, but calling it should not be necessary with a high
+     * throughput cache. Only caches built with removalListener,
+     * expireAfterWrite, expireAfterAccess, weakKeys, weakValues, or
+     * softValues perform periodic maintenance.
+     */
+    @VisibleForTesting
+    void doEntryLogMapCleanup() {
+        ledgerIdEntryLogMap.cleanUp();
+    }
+
+    @VisibleForTesting
+    ConcurrentMap<Long, EntryLogAndLockTuple> getCacheAsMap() {
+        return ledgerIdEntryLogMap.asMap();
+    }
+    /*
+     * Returns writable ledger dir with least number of current active
+     * entrylogs.
+     */
+    @Override
+    public File getDirForNextEntryLog(List<File> writableLedgerDirs) {
+        Map<File, MutableInt> writableLedgerDirFrequency = new HashMap<File, 
MutableInt>();
+        writableLedgerDirs.stream()
+                .forEach((ledgerDir) -> 
writableLedgerDirFrequency.put(ledgerDir, new MutableInt()));
+        for (BufferedLogChannelWithDirInfo logChannelWithDirInfo : 
replicaOfCurrentLogChannels.values()) {
+            File parentDirOfCurrentLogChannel = 
logChannelWithDirInfo.getLogChannel().getLogFile().getParentFile();
+            if 
(writableLedgerDirFrequency.containsKey(parentDirOfCurrentLogChannel)) {
+                
writableLedgerDirFrequency.get(parentDirOfCurrentLogChannel).increment();
+            }
+        }
+        @SuppressWarnings("unchecked")
+        Optional<Entry<File, MutableInt>> ledgerDirWithLeastNumofCurrentLogs = 
writableLedgerDirFrequency.entrySet()
+                .stream().min(Map.Entry.comparingByValue());
+        return ledgerDirWithLeastNumofCurrentLogs.get().getKey();
+    }
+
+    @Override
+    public void close() throws IOException {
+        Set<BufferedLogChannelWithDirInfo> copyOfCurrentLogsWithDirInfo = 
getCopyOfCurrentLogs();
+        for (BufferedLogChannelWithDirInfo currentLogWithDirInfo : 
copyOfCurrentLogsWithDirInfo) {
+            
EntryLogger.closeFileChannel(currentLogWithDirInfo.getLogChannel());
+        }
+    }
+
+    @Override
+    public void forceClose() {
+        Set<BufferedLogChannelWithDirInfo> copyOfCurrentLogsWithDirInfo = 
getCopyOfCurrentLogs();
+        for (BufferedLogChannelWithDirInfo currentLogWithDirInfo : 
copyOfCurrentLogsWithDirInfo) {
+            
EntryLogger.forceCloseFileChannel(currentLogWithDirInfo.getLogChannel());
+        }
+    }
+
+    @Override
+    void flushCurrentLogs() throws IOException {
+        Set<BufferedLogChannelWithDirInfo> copyOfCurrentLogsWithDirInfo = 
getCopyOfCurrentLogs();
+        for (BufferedLogChannelWithDirInfo logChannelWithDirInfo : 
copyOfCurrentLogsWithDirInfo) {
+            /**
+             * flushCurrentLogs method is called during checkpoint, so metadata
+             * of the file also should be force written.
+             */
+            flushLogChannel(logChannelWithDirInfo.getLogChannel(), true);
+        }
+    }
+
+    @Override
+    public BufferedLogChannel createNewLogForCompaction() throws IOException {
+        throw new UnsupportedOperationException(
+                "When entryLogPerLedger is enabled, transactional compaction 
should have been disabled");
+    }
+
+    @Override
+    public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws 
IOException {
+        Lock lock = getLock(ledger);
+        lock.lock();
+        try {
+            return super.addEntry(ledger, entry, rollLog);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    void createNewLog(long ledgerId) throws IOException {
+        Lock lock = getLock(ledgerId);
+        lock.lock();
+        try {
+            super.createNewLog(ledgerId);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    @Override
+    BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId, int 
entrySize, boolean rollLog)
+            throws IOException {
+        Lock lock = getLock(ledgerId);
+        lock.lock();
+        try {
+            BufferedLogChannelWithDirInfo logChannelWithDirInfo = 
getCurrentLogWithDirInfoForLedger(ledgerId);
+            BufferedLogChannel logChannel = null;
+            if (logChannelWithDirInfo != null) {
+                logChannel = logChannelWithDirInfo.getLogChannel();
+            }
+            boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(logChannel, entrySize)
+                    : readEntryLogHardLimit(logChannel, entrySize);
+            // Create new log if logSizeLimit reached or current disk is full
+            boolean diskFull = (logChannel == null) ? false : 
logChannelWithDirInfo.isLedgerDirFull();
+            boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs();
+
+            /**
+             * if the disk of the logChannel is full or if the entrylog limit is
+             * reached or if the logchannel is not initialized, then
+             * createNewLog. If allDisks are full then proceed with the current
+             * logChannel, since Bookie must have turned to readonly mode and
+             * the addEntry traffic would be from GC and it is ok to proceed in
+             * this case.
+             */
+            if ((diskFull && (!allDisksFull)) || reachEntryLogLimit || 
(logChannel == null)) {
+                if (logChannel != null) {
+                    logChannel.flushAndForceWriteIfRegularFlush(false);
+                }
+                createNewLog(ledgerId);
+            }
+
+            return getCurrentLogForLedger(ledgerId);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void flushRotatedLogs() throws IOException {
+        for (BufferedLogChannel channel : rotatedLogChannels) {
+            channel.flushAndForceWrite(true);
+            // since this channel is only used for writing, after flushing the 
channel,
+            // we had to close the underlying file channel. Otherwise, we 
might end up
+            // leaking fds which cause the disk spaces could not be reclaimed.
+            EntryLogger.closeFileChannel(channel);
+            
recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(channel.getLogId());
+            rotatedLogChannels.remove(channel);
+            log.info("Synced entry logger {} to disk.", channel.getLogId());
+        }
+    }
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 75445c2..d300e0b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -45,9 +45,7 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -366,63 +364,8 @@ public class EntryLogger {
         this.entryLoggerAllocator = new EntryLoggerAllocator(conf, 
ledgerDirsManager, recentlyCreatedEntryLogsStatus,
                 logId);
         if (entryLogPerLedgerEnabled) {
-            this.entryLogManager = new EntryLogManagerForSingleEntryLog(conf, 
ledgerDirsManager, entryLoggerAllocator,
-                    listeners, recentlyCreatedEntryLogsStatus) {
-                @Override
-                public void checkpoint() throws IOException {
-                    /*
-                     * In the case of entryLogPerLedgerEnabled we need to flush
-                     * both rotatedlogs and currentlogs. This is needed because
-                     * syncThread periodically does checkpoint and at this time
-                     * all the logs should be flushed.
-                     *
-                     */
-                    super.flush();
-                }
-
-                @Override
-                public void prepareSortedLedgerStorageCheckpoint(long 
numBytesFlushed) throws IOException {
-                    // do nothing
-                    /*
-                     * prepareSortedLedgerStorageCheckpoint is required for
-                     * singleentrylog scenario, but it is not needed for
-                     * entrylogperledger scenario, since entries of a ledger go
-                     * to a entrylog (even during compaction) and SyncThread
-                     * drives periodic checkpoint logic.
-                     */
-
-                }
-
-                @Override
-                public void prepareEntryMemTableFlush() {
-                    // do nothing
-                }
-
-                @Override
-                public boolean commitEntryMemTableFlush() throws IOException {
-                    // lock it only if there is new data
-                    // so that cache accesstime is not changed
-                    Set<BufferedLogChannel> copyOfCurrentLogs = new 
HashSet<BufferedLogChannel>(
-                            
Arrays.asList(super.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID)));
-                    for (BufferedLogChannel currentLog : copyOfCurrentLogs) {
-                        if (reachEntryLogLimit(currentLog, 0L)) {
-                            synchronized (this) {
-                                if (reachEntryLogLimit(currentLog, 0L)) {
-                                    LOG.info("Rolling entry logger since it 
reached size limitation");
-                                    
createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
-                                }
-                            }
-                        }
-                    }
-                    /*
-                     * in the case of entrylogperledger, SyncThread drives
-                     * checkpoint logic for every flushInterval. So
-                     * EntryMemtable doesn't need to call checkpoint in the 
case
-                     * of entrylogperledger.
-                     */
-                    return false;
-                }
-            };
+            this.entryLogManager = new 
EntryLogManagerForEntryLogPerLedger(conf, ledgerDirsManager,
+                    entryLoggerAllocator, listeners, 
recentlyCreatedEntryLogsStatus);
         } else {
             this.entryLogManager = new EntryLogManagerForSingleEntryLog(conf, 
ledgerDirsManager, entryLoggerAllocator,
                     listeners, recentlyCreatedEntryLogsStatus);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
index d33d7c4..10e7715 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
@@ -51,8 +51,8 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
 class EntryLoggerAllocator {
 
     private long preallocatedLogId;
-    private Future<BufferedLogChannel> preallocation = null;
-    private ExecutorService allocatorExecutor;
+    Future<BufferedLogChannel> preallocation = null;
+    ExecutorService allocatorExecutor;
     private final ServerConfiguration conf;
     private final LedgerDirsManager ledgerDirsManager;
     private final Object createEntryLogLock = new Object();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index a763ddf..f088b36 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -190,6 +190,22 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
     protected static final String NUMBER_OF_MEMTABLE_FLUSH_THREADS = 
"numOfMemtableFlushThreads";
 
 
+    /*
+     * if entrylog per ledger is enabled, this config specifies the amount of
+     * time EntryLogManagerForEntryLogPerLedger should wait before closing the
+     * entrylog file after the last addEntry call for that ledger, if an explicit
+     * writeclose for that ledger is not received.
+     */
+    protected static final String ENTRYLOGMAP_ACCESS_EXPIRYTIME_INSECONDS = 
"entrylogMapAccessExpiryTimeInSeconds";
+
+    /*
+     * in entryLogPerLedger feature, this specifies the maximum number of
+     * entrylogs that can be active at a given point in time. If there are more
+     * active entryLogs than maximumNumberOfActiveEntryLogs, then an
+     * entrylog will be evicted from the cache.
+     */
+    protected static final String MAXIMUM_NUMBER_OF_ACTIVE_ENTRYLOGS = 
"maximumNumberOfActiveEntryLogs";
+
     /**
      * Construct a default configuration object.
      */
@@ -2772,4 +2788,41 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
         this.setProperty(NUMBER_OF_MEMTABLE_FLUSH_THREADS, 
Integer.toString(numOfMemtableFlushThreads));
         return this;
     }
+
+    /*
+     * in entryLogPerLedger feature, this specifies the time, once this 
duration
+     * has elapsed after the entry's last access, that entry should be
+     * automatically removed from the cache
+     */
+    public int getEntrylogMapAccessExpiryTimeInSeconds() {
+        return this.getInt(ENTRYLOGMAP_ACCESS_EXPIRYTIME_INSECONDS, 5 * 60);
+    }
+
+    /*
+     * sets the time duration for entrylogMapAccessExpiryTimeInSeconds, which 
will be used for cache eviction
+     * policy, in entrylogperledger feature.
+     */
+    public ServerConfiguration setEntrylogMapAccessExpiryTimeInSeconds(int 
entrylogMapAccessExpiryTimeInSeconds) {
+        this.setProperty(ENTRYLOGMAP_ACCESS_EXPIRYTIME_INSECONDS,
+                Integer.toString(entrylogMapAccessExpiryTimeInSeconds));
+        return this;
+    }
+
+    /*
+     * get the maximum number of entrylogs that can be active at a given point
+     * in time.
+     */
+    public int getMaximumNumberOfActiveEntryLogs() {
+        return this.getInt(MAXIMUM_NUMBER_OF_ACTIVE_ENTRYLOGS, 500);
+    }
+
+    /*
+     * sets the maximum number of entrylogs that can be active at a given point
+     * in time.
+     */
+    public ServerConfiguration setMaximumNumberOfActiveEntryLogs(int 
maximumNumberOfActiveEntryLogs) {
+        this.setProperty(MAXIMUM_NUMBER_OF_ACTIVE_ENTRYLOGS,
+                Integer.toString(maximumNumberOfActiveEntryLogs));
+        return this;
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
index bdfaf03..dff0af4 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
@@ -196,7 +196,10 @@ public class ConcurrentLongHashMap<V> {
         return keys;
     }
 
-    List<V> values() {
+    /**
+     * @return a new list of all values (makes a copy)
+     */
+    public List<V> values() {
         List<V> values = Lists.newArrayListWithExpectedSize((int) size());
         forEach((key, value) -> values.add(value));
         return values;
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
index 4c4514a..611a68a 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
@@ -20,15 +20,28 @@ package org.apache.bookkeeper.bookie;
 
 import static org.junit.Assert.assertTrue;
 
+import com.google.common.util.concurrent.MoreExecutors;
+
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
+import 
org.apache.bookkeeper.bookie.EntryLogManagerForEntryLogPerLedger.BufferedLogChannelWithDirInfo;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.lang.mutable.MutableInt;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -41,7 +54,7 @@ import org.slf4j.LoggerFactory;
  */
 public class CreateNewLogTest {
     private static final Logger LOG = LoggerFactory
-    .getLogger(CreateNewLogTest.class);
+            .getLogger(CreateNewLogTest.class);
 
     private String[] ledgerDirs;
     private int numDirs = 100;
@@ -142,6 +155,245 @@ public class CreateNewLogTest {
         assertTrue("Wrong log id", entryLogManager.getCurrentLogId() > 1);
     }
 
+    void setSameThreadExecutorForEntryLoggerAllocator(EntryLoggerAllocator 
entryLoggerAllocator) {
+        ExecutorService executorService = 
entryLoggerAllocator.allocatorExecutor;
+        executorService.shutdown();
+        entryLoggerAllocator.allocatorExecutor = 
MoreExecutors.newDirectExecutorService();
+    }
+
+    /*
+     * entryLogPerLedger is enabled and various scenarios of entrylogcreation 
are tested
+     */
+    @Test
+    public void testEntryLogPerLedgerCreationWithPreAllocation() throws 
Exception {
+        /*
+         * I wish I could shorten this testcase or split it into multiple 
testcases,
+         * but I want to cover a scenario and it requires multiple operations 
in
+         * sequence and validations along the way. Please bear with the length 
of this
+         * testcase, I added as many comments as I can to simplify it.
+         */
+
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+
+        // Creating a new configuration with a number of ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        conf.setIsForceGCAllowWhenNoSpace(true);
+        // preAllocation is Enabled
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        conf.setEntryLogPerLedgerEnabled(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, 
conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLoggerAllocator entryLoggerAllocator = 
entryLogger.entryLoggerAllocator;
+        EntryLogManagerForEntryLogPerLedger entryLogManager = 
(EntryLogManagerForEntryLogPerLedger) entryLogger
+                .getEntryLogManager();
+        // set same thread executor for entryLoggerAllocator's 
allocatorExecutor
+        setSameThreadExecutorForEntryLoggerAllocator(entryLoggerAllocator);
+
+        /*
+         * no entrylog will be created during initialization
+         */
+        int expectedPreAllocatedLogID = -1;
+        Assert.assertEquals("PreallocatedlogId after initialization of 
Entrylogger",
+                expectedPreAllocatedLogID, 
entryLoggerAllocator.getPreallocatedLogId());
+
+        int numOfLedgers = 6;
+
+        for (long i = 0; i < numOfLedgers; i++) {
+            /* since we are starting creation of new ledgers, entrylogid will 
be ledgerid */
+            entryLogManager.createNewLog(i);
+        }
+
+        /*
+         * preallocation is enabled so though entryLogId starts with 0, 
preallocatedLogId would be equal to numOfLedgers
+         */
+        expectedPreAllocatedLogID = numOfLedgers;
+        Assert.assertEquals("PreallocatedlogId after creation of logs for 
ledgers", expectedPreAllocatedLogID,
+                entryLoggerAllocator.getPreallocatedLogId());
+        Assert.assertEquals("Number of current ", numOfLedgers,
+                entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of LogChannels to flush", 0,
+                entryLogManager.getRotatedLogChannels().size());
+
+        // create dummy entrylog file with id - (expectedPreAllocatedLogID + 1)
+        String logFileName = Long.toHexString(expectedPreAllocatedLogID + 1) + 
".log";
+        File dir = ledgerDirsManager.pickRandomWritableDir();
+        LOG.info("Picked this directory: " + dir);
+        File newLogFile = new File(dir, logFileName);
+        newLogFile.createNewFile();
+
+        /*
+         * since there is already preexisting entrylog file with id -
+         * (expectedPreAllocatedLogID + 1), when a new
+         * entrylog is created it should have
+         * (expectedPreAllocatedLogID + 2) id
+         */
+        long rotatedLedger = 1L;
+        entryLogManager.createNewLog(rotatedLedger);
+
+        expectedPreAllocatedLogID = expectedPreAllocatedLogID + 2;
+        Assert.assertEquals("PreallocatedlogId ",
+                expectedPreAllocatedLogID, 
entryLoggerAllocator.getPreallocatedLogId());
+        Assert.assertEquals("Number of current ", numOfLedgers,
+                entryLogManager.getCopyOfCurrentLogs().size());
+        List<BufferedLogChannel> rotatedLogChannels = 
entryLogManager.getRotatedLogChannels();
+        Assert.assertEquals("Number of LogChannels rotated", 1, 
rotatedLogChannels.size());
+        Assert.assertEquals("Rotated logchannel logid", rotatedLedger, 
rotatedLogChannels.iterator().next().getLogId());
+        entryLogger.flush();
+        /*
+         * when flush is called all the rotatedlogchannels are flushed and
+         * removed from rotatedlogchannels list. But here since entrylogId - 0,
+         * is not rotated and flushed yet, getLeastUnflushedLogId will 
still
+         * return 0.
+         */
+        rotatedLogChannels = entryLogManager.getRotatedLogChannels();
+        Assert.assertEquals("Number of LogChannels rotated", 0, 
rotatedLogChannels.size());
+        Assert.assertEquals("Least UnflushedLoggerId", 0, 
entryLogger.getLeastUnflushedLogId());
+
+        entryLogManager.createNewLog(0L);
+        rotatedLogChannels = entryLogManager.getRotatedLogChannels();
+        Assert.assertEquals("Number of LogChannels rotated", 1, 
rotatedLogChannels.size());
+        Assert.assertEquals("Least UnflushedLoggerId", 0, 
entryLogger.getLeastUnflushedLogId());
+        entryLogger.flush();
+        /*
+         * since both entrylogids 0, 1 are rotated and flushed,
+         * leastunFlushedLogId should be 2
+         */
+        Assert.assertEquals("Least UnflushedLoggerId", 2, 
entryLogger.getLeastUnflushedLogId());
+        expectedPreAllocatedLogID = expectedPreAllocatedLogID + 1;
+
+        /*
+         * we should be able to get entryLogMetadata from all the active
+         * entrylogs and the logs which are moved to the flush list. Since no entry
+         * is added, all the meta should be empty.
+         */
+        for (int i = 0; i <= expectedPreAllocatedLogID; i++) {
+            EntryLogMetadata meta = entryLogger.getEntryLogMetadata(i);
+            Assert.assertTrue("EntryLogMetadata should be empty", 
meta.isEmpty());
+            Assert.assertTrue("EntryLog usage should be 0", 
meta.getTotalSize() == 0);
+        }
+    }
+
+    /**
+     * In this testcase entryLogPerLedger is Enabled and entrylogs are created
+     * while ledgerdirs are getting full.
+     */
+    @Test
+    public void testEntryLogCreationWithFilledDirs() throws Exception {
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+
+        // Creating a new configuration with a number of ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        // forceGCAllowWhenNoSpace is disabled
+        conf.setIsForceGCAllowWhenNoSpace(false);
+        // pre-allocation is not enabled
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setEntryLogPerLedgerEnabled(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, 
conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLoggerAllocator entryLoggerAllocator = 
entryLogger.entryLoggerAllocator;
+        EntryLogManagerForEntryLogPerLedger entryLogManager = 
(EntryLogManagerForEntryLogPerLedger)
+                entryLogger.getEntryLogManager();
+        // set same thread executor for entryLoggerAllocator's 
allocatorExecutor
+        setSameThreadExecutorForEntryLoggerAllocator(entryLoggerAllocator);
+
+        int expectedPreAllocatedLogIDDuringInitialization = -1;
+        Assert.assertEquals("PreallocatedlogId after initialization of 
Entrylogger",
+                expectedPreAllocatedLogIDDuringInitialization, 
entryLoggerAllocator.getPreallocatedLogId());
+        Assert.assertEquals("Preallocation Future of this slot should be 
null", null,
+                entryLogger.entryLoggerAllocator.preallocation);
+
+        long ledgerId = 0L;
+
+        entryLogManager.createNewLog(ledgerId);
+
+        /*
+         * pre-allocation is not enabled, so it would not preallocate for next 
entrylog
+         */
+        Assert.assertEquals("PreallocatedlogId after initialization of 
Entrylogger",
+                expectedPreAllocatedLogIDDuringInitialization + 1, 
entryLoggerAllocator.getPreallocatedLogId());
+
+        for (int i = 0; i < numDirs - 1; i++) {
+            ledgerDirsManager.addToFilledDirs(Bookie.getCurrentDirectory(new 
File(ledgerDirs[i])));
+        }
+
+        /*
+         * this is the only non-filled ledgerDir so it should be used for 
creating new entryLog
+         */
+        File nonFilledLedgerDir = Bookie.getCurrentDirectory(new 
File(ledgerDirs[numDirs - 1]));
+
+        entryLogManager.createNewLog(ledgerId);
+        BufferedLogChannel newLogChannel = 
entryLogManager.getCurrentLogForLedger(ledgerId);
+        Assert.assertEquals("Directory of newly created BufferedLogChannel 
file", nonFilledLedgerDir.getAbsolutePath(),
+                newLogChannel.getLogFile().getParentFile().getAbsolutePath());
+
+        ledgerDirsManager.addToFilledDirs(Bookie.getCurrentDirectory(new 
File(ledgerDirs[numDirs - 1])));
+
+        // new entrylog creation should succeed, though there is no writable 
ledgerDir
+        entryLogManager.createNewLog(ledgerId);
+    }
+
+    /*
+     * In this testcase it is validated if the entryLog is created in the
+     * ledgerDir with least number of current active entrylogs
+     */
+    @Test
+    public void testLedgerDirsUniformityDuringCreation() throws Exception {
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+
+        // Creating a new configuration with a number of ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        // pre-allocation is not enabled
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setEntryLogPerLedgerEnabled(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, 
conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerForEntryLogPerLedger entrylogManager = 
(EntryLogManagerForEntryLogPerLedger)
+                entryLogger.getEntryLogManager();
+
+        for (long i = 0; i < ledgerDirs.length; i++) {
+            entrylogManager.createNewLog(i);
+        }
+
+        int numberOfLedgersCreated = ledgerDirs.length;
+
+        Assert.assertEquals("Highest frequency of entrylogs per ledgerdir", 1,
+                
highestFrequencyOfEntryLogsPerLedgerDir(entrylogManager.getCopyOfCurrentLogs()));
+
+        long newLedgerId = numberOfLedgersCreated;
+        entrylogManager.createNewLog(newLedgerId);
+        numberOfLedgersCreated++;
+
+        Assert.assertEquals("Highest frequency of entrylogs per ledgerdir", 2,
+                
highestFrequencyOfEntryLogsPerLedgerDir(entrylogManager.getCopyOfCurrentLogs()));
+
+        for (long i = numberOfLedgersCreated; i < 2 * ledgerDirs.length; i++) {
+            entrylogManager.createNewLog(i);
+        }
+
+        Assert.assertEquals("Highest frequency of entrylogs per ledgerdir", 2,
+                
highestFrequencyOfEntryLogsPerLedgerDir(entrylogManager.getCopyOfCurrentLogs()));
+    }
+
+
+    int 
highestFrequencyOfEntryLogsPerLedgerDir(Set<BufferedLogChannelWithDirInfo> 
copyOfCurrentLogsWithDirInfo) {
+        Map<File, MutableInt> frequencyOfEntryLogsInLedgerDirs = new 
HashMap<File, MutableInt>();
+        for (BufferedLogChannelWithDirInfo logChannelWithDirInfo : 
copyOfCurrentLogsWithDirInfo) {
+            File parentDir = 
logChannelWithDirInfo.getLogChannel().getLogFile().getParentFile();
+            if (frequencyOfEntryLogsInLedgerDirs.containsKey(parentDir)) {
+                frequencyOfEntryLogsInLedgerDirs.get(parentDir).increment();
+            } else {
+                frequencyOfEntryLogsInLedgerDirs.put(parentDir, new 
MutableInt(1));
+            }
+        }
+        @SuppressWarnings("unchecked")
+        int highestFreq = ((Entry<File, MutableInt>) 
(frequencyOfEntryLogsInLedgerDirs.entrySet().stream()
+                
.max(Map.Entry.comparingByValue()).get())).getValue().intValue();
+        return highestFreq;
+    }
+
     @Test
     public void 
testConcurrentCreateNewLogWithEntryLogFilePreAllocationEnabled() throws 
Exception {
         testConcurrentCreateNewLog(true);
@@ -164,6 +416,9 @@ public class CreateNewLogTest {
 
         EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
         EntryLogManagerBase entryLogManager = (EntryLogManagerBase) 
el.getEntryLogManager();
+        // set same thread executor for entryLoggerAllocator's 
allocatorExecutor
+        
setSameThreadExecutorForEntryLoggerAllocator(el.getEntryLoggerAllocator());
+
         Assert.assertEquals("previousAllocatedEntryLogId after 
initialization", -1,
                 el.getPreviousAllocatedEntryLogId());
         Assert.assertEquals("leastUnflushedLogId after initialization", 0, 
el.getLeastUnflushedLogId());
@@ -178,8 +433,6 @@ public class CreateNewLogTest {
                 receivedException.set(true);
             }
         });
-        // wait for the pre-allocation to complete
-        Thread.sleep(1000);
 
         Assert.assertFalse("There shouldn't be any exceptions while creating 
newlog", receivedException.get());
         int expectedPreviousAllocatedEntryLogId = createNewLogNumOfTimes - 1;
@@ -189,7 +442,7 @@ public class CreateNewLogTest {
 
         Assert.assertEquals(
                 "previousAllocatedEntryLogId after " + createNewLogNumOfTimes
-                        + " number of times createNewLog is called",
+                + " number of times createNewLog is called",
                 expectedPreviousAllocatedEntryLogId, 
el.getPreviousAllocatedEntryLogId());
         Assert.assertEquals("Number of RotatedLogChannels", 
createNewLogNumOfTimes - 1,
                 entryLogManager.getRotatedLogChannels().size());
@@ -246,6 +499,8 @@ public class CreateNewLogTest {
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, 
conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()));
         EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
+        // set same thread executor for entryLoggerAllocator's 
allocatorExecutor
+        
setSameThreadExecutorForEntryLoggerAllocator(el.getEntryLoggerAllocator());
         AtomicBoolean receivedException = new AtomicBoolean(false);
 
         IntStream.range(0, 2).parallel().forEach((i) -> {
@@ -260,12 +515,67 @@ public class CreateNewLogTest {
                 receivedException.set(true);
             }
         });
-        // wait for the pre-allocation to complete
-        Thread.sleep(1000);
 
         Assert.assertFalse("There shouldn't be any exceptions while creating 
newlog", receivedException.get());
         Assert.assertEquals(
                 "previousAllocatedEntryLogId after 2 times createNewLog is 
called", 2,
                 el.getPreviousAllocatedEntryLogId());
     }
+
+    /*
+     * In this testcase entrylogs for ledgers are created concurrently.
+     */
+    @Test
+    public void testConcurrentEntryLogCreations() throws Exception {
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+
+        // Creating a new configuration with a number of ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        // pre-allocation is enabled
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        conf.setEntryLogPerLedgerEnabled(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, 
conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerForEntryLogPerLedger entrylogManager = 
(EntryLogManagerForEntryLogPerLedger)
+                entryLogger.getEntryLogManager();
+
+        int numOfLedgers = 10;
+        int numOfThreadsForSameLedger = 10;
+        AtomicInteger createdEntryLogs = new AtomicInteger(0);
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch createdLatch = new CountDownLatch(numOfLedgers * 
numOfThreadsForSameLedger);
+
+        for (long i = 0; i < numOfLedgers; i++) {
+            for (int j = 0; j < numOfThreadsForSameLedger; j++) {
+                long ledgerId = i;
+                new Thread(() -> {
+                    try {
+                        startLatch.await();
+                        entrylogManager.createNewLog(ledgerId);
+                        createdEntryLogs.incrementAndGet();
+                    } catch (InterruptedException | IOException e) {
+                        LOG.error("Got exception while trying to createNewLog 
for Ledger: " + ledgerId, e);
+                    } finally {
+                        createdLatch.countDown();
+                    }
+                }).start();
+            }
+        }
+
+        startLatch.countDown();
+        createdLatch.await(5, TimeUnit.SECONDS);
+        Assert.assertEquals("Created EntryLogs", numOfLedgers * 
numOfThreadsForSameLedger, createdEntryLogs.get());
+        Assert.assertEquals("Active currentlogs size", numOfLedgers, 
entrylogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Rotated entrylogs size", 
(numOfThreadsForSameLedger - 1) * numOfLedgers,
+                entrylogManager.getRotatedLogChannels().size());
+        /*
+         * EntryLogFilePreAllocation is Enabled so
+         * getPreviousAllocatedEntryLogId would be (numOfLedgers *
+         * numOfThreadsForSameLedger) instead of (numOfLedgers *
+         * numOfThreadsForSameLedger - 1)
+         */
+        Assert.assertEquals("PreviousAllocatedEntryLogId", numOfLedgers * 
numOfThreadsForSameLedger,
+                entryLogger.getPreviousAllocatedEntryLogId());
+    }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index ac64335..f5e73a1 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -26,27 +26,32 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import com.google.common.collect.Sets;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
 import 
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -814,7 +819,7 @@ public class EntryLogTest {
      * test for validating if the EntryLog/BufferedChannel flushes/forcewrite 
if the bytes written to it are more than
      * flushIntervalInBytes
      */
-    @Test(timeout = 60000)
+    @Test
     public void testFlushIntervalInBytes() throws Exception {
         long flushIntervalInBytes = 5000;
         ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
@@ -867,4 +872,828 @@ public class EntryLogTest {
         Assert.assertEquals("LedgerId", ledgerId, readLedgerId);
         Assert.assertEquals("EntryId", 1L, readEntryId);
     }
+
+    /*
+     * Tests the basic logic of the EntryLogManager interface for
+     * EntryLogManagerForEntryLogPerLedger.
+     *
+     * After exercising the per-ledger locks, a log channel is set for every ledger three times over.
+     * The first assignment is expected to leave the rotated log channels empty; each later assignment
+     * is expected to add the previously current channel of every ledger to the rotated log channels.
+     */
+    @Test
+    public void testEntryLogManagerInterfaceForEntryLogPerLedger() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
+                .getEntryLogManager();
+
+        Assert.assertEquals("Number of current active EntryLogs ", 0, entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of Rotated Logs ", 0, entryLogManager.getRotatedLogChannels().size());
+
+        int numOfLedgers = 5;
+        int numOfThreadsPerLedger = 10;
+        validateLockAcquireAndRelease(numOfLedgers, numOfThreadsPerLedger, entryLogManager);
+
+        for (long i = 0; i < numOfLedgers; i++) {
+            entryLogManager.setCurrentLogForLedgerAndAddToRotate(i,
+                    createDummyBufferedLogChannel(entryLogger, i, conf));
+        }
+
+        for (long i = 0; i < numOfLedgers; i++) {
+            Assert.assertEquals("LogChannel for ledger: " + i, entryLogManager.getCurrentLogIfPresent(i),
+                    entryLogManager.getCurrentLogForLedger(i));
+        }
+
+        Assert.assertEquals("Number of current active EntryLogs ", numOfLedgers,
+                entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of Rotated Logs ", 0, entryLogManager.getRotatedLogChannels().size());
+
+        for (long i = 0; i < numOfLedgers; i++) {
+            entryLogManager.setCurrentLogForLedgerAndAddToRotate(i,
+                    createDummyBufferedLogChannel(entryLogger, numOfLedgers + i, conf));
+        }
+
+        /*
+         * since new entryLogs are set for all the ledgers, previous entrylogs would be added to rotatedLogChannels
+         */
+        Assert.assertEquals("Number of current active EntryLogs ", numOfLedgers,
+                entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of Rotated Logs ", numOfLedgers,
+                entryLogManager.getRotatedLogChannels().size());
+
+        for (long i = 0; i < numOfLedgers; i++) {
+            entryLogManager.setCurrentLogForLedgerAndAddToRotate(i,
+                    createDummyBufferedLogChannel(entryLogger, 2 * numOfLedgers + i, conf));
+        }
+
+        /*
+         * again since new entryLogs are set for all the ledgers, previous entrylogs would be added to
+         * rotatedLogChannels
+         */
+        Assert.assertEquals("Number of current active EntryLogs ", numOfLedgers,
+                entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of Rotated Logs ", 2 * numOfLedgers,
+                entryLogManager.getRotatedLogChannels().size());
+
+        /*
+         * drop all rotated channels in a single call instead of removing elements from the list while
+         * iterating over that same list
+         */
+        entryLogManager.getRotatedLogChannels().clear();
+        Assert.assertEquals("Number of Rotated Logs ", 0, entryLogManager.getRotatedLogChannels().size());
+
+        // entrylogid is sequential
+        for (long i = 0; i < numOfLedgers; i++) {
+            assertEquals("EntryLogid for Ledger " + i, 2 * numOfLedgers + i,
+                    entryLogManager.getCurrentLogForLedger(i).getLogId());
+        }
+
+        // the channels set in the last round (logIds 2 * numOfLedgers .. 3 * numOfLedgers - 1) must be found by logId
+        for (long i = 2 * numOfLedgers; i < (3 * numOfLedgers); i++) {
+            assertTrue("EntryLog with logId: " + i + " should be present",
+                    entryLogManager.getCurrentLogIfPresent(i) != null);
+        }
+    }
+
+    /*
+     * Builds a BufferedLogChannel with the given logid on top of a freshly created temp file, which is
+     * marked for deletion on JVM exit. The FileChannel is opened without explicit open options. The
+     * two '10' arguments are passed straight to the BufferedLogChannel constructor (presumably buffer
+     * capacities - confirm against BufferedLogChannel), followed by the configured flushIntervalInBytes.
+     */
+    private EntryLogger.BufferedLogChannel createDummyBufferedLogChannel(EntryLogger entryLogger, long logid,
+            ServerConfiguration servConf) throws IOException {
+        File backingFile = File.createTempFile("entrylog", String.valueOf(logid));
+        backingFile.deleteOnExit();
+        FileChannel channel = FileChannel.open(backingFile.toPath());
+        return new BufferedLogChannel(channel, 10, 10, logid, backingFile, servConf.getFlushIntervalInBytes());
+    }
+
+    /*
+     * validates the concurrency aspect of entryLogManager's lock
+     *
+     * Executor of fixedThreadPool of size 'numOfLedgers * numOfThreadsPerLedger' is created and the same number
+     * of tasks are submitted to the Executor. In each task, lock of that ledger is acquired and then released.
+     *
+     * The lock is released in a finally block, any exception seen by a task fails the validation, and the
+     * executor is always shut down so that its threads do not outlive the test.
+     */
+    private void validateLockAcquireAndRelease(int numOfLedgers, int numOfThreadsPerLedger,
+            EntryLogManagerForEntryLogPerLedger entryLogManager) throws InterruptedException {
+        int numOfTasks = numOfLedgers * numOfThreadsPerLedger;
+        ExecutorService tpe = Executors.newFixedThreadPool(numOfTasks);
+        CountDownLatch latchToStart = new CountDownLatch(1);
+        CountDownLatch latchToWait = new CountDownLatch(1);
+        AtomicInteger numberOfThreadsAcquiredLock = new AtomicInteger(0);
+        AtomicBoolean irptExceptionHappened = new AtomicBoolean(false);
+
+        try {
+            for (int i = 0; i < numOfTasks; i++) {
+                long ledgerId = i % numOfLedgers;
+                tpe.submit(() -> {
+                    try {
+                        latchToStart.await();
+                        Lock lock = entryLogManager.getLock(ledgerId);
+                        lock.lock();
+                        try {
+                            numberOfThreadsAcquiredLock.incrementAndGet();
+                            latchToWait.await();
+                        } finally {
+                            // release the lock even if the wait is interrupted, so other tasks are not blocked
+                            lock.unlock();
+                        }
+                    } catch (InterruptedException | IOException e) {
+                        irptExceptionHappened.set(true);
+                    }
+                });
+            }
+
+            assertEquals("Number Of Threads acquired Lock", 0, numberOfThreadsAcquiredLock.get());
+            latchToStart.countDown();
+            Thread.sleep(1000);
+            /*
+             * since there are only "numOfLedgers" ledgers, only "numOfLedgers" threads should have been able to
+             * acquire lock. After acquiring the lock they must be waiting on 'latchToWait' latch
+             */
+            assertEquals("Number Of Threads acquired Lock", numOfLedgers, numberOfThreadsAcquiredLock.get());
+            latchToWait.countDown();
+            Thread.sleep(2000);
+            assertEquals("Number Of Threads acquired Lock", numOfTasks, numberOfThreadsAcquiredLock.get());
+            Assert.assertFalse("No task should have failed with an exception", irptExceptionHappened.get());
+        } finally {
+            // interrupts tasks that are still blocked (e.g. after a failed assertion) and frees the pool threads
+            tpe.shutdownNow();
+        }
+    }
+
+    /*
+     * Tests that EntryLogManagerForEntryLogPerLedger removes a ledger from its
+     * cache map if no entry is added to that ledger and its corresponding state
+     * is not accessed for more than evictionPeriod.
+     *
+     * The access expiry time is configured to evictionPeriod (1 second). A log
+     * channel is set for ledger 0, the test sleeps slightly longer than the
+     * eviction period and then triggers doEntryLogMapCleanup() explicitly.
+     * Afterwards the ledger is expected to have no current log, and the channel
+     * that was set is expected to be in the rotated log channels.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testEntryLogManagerExpiryRemoval() throws Exception {
+        int evictionPeriod = 1;
+
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        conf.setEntrylogMapAccessExpiryTimeInSeconds(evictionPeriod);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, 
conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerForEntryLogPerLedger entryLogManager =
+                (EntryLogManagerForEntryLogPerLedger) 
entryLogger.getEntryLogManager();
+
+        long ledgerId = 0L;
+
+        BufferedLogChannel logChannel = 
createDummyBufferedLogChannel(entryLogger, 0, conf);
+        entryLogManager.setCurrentLogForLedgerAndAddToRotate(ledgerId, 
logChannel);
+
+        BufferedLogChannel currentLogForLedger = 
entryLogManager.getCurrentLogForLedger(ledgerId);
+        assertEquals("LogChannel for ledger " + ledgerId + " should match", 
logChannel, currentLogForLedger);
+
+        Thread.sleep(evictionPeriod * 1000 + 100);
+        entryLogManager.doEntryLogMapCleanup();
+
+        /*
+         * since for more than evictionPeriod, that ledger is not accessed and 
cache is cleaned up, mapping for that
+         * ledger should not be available anymore
+         */
+        currentLogForLedger = entryLogManager.getCurrentLogForLedger(ledgerId);
+        assertEquals("LogChannel for ledger " + ledgerId + " should be null", 
null, currentLogForLedger);
+        Assert.assertEquals("Number of current active EntryLogs ", 0, 
entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of rotated EntryLogs ", 1, 
entryLogManager.getRotatedLogChannels().size());
+        Assert.assertTrue("CopyOfRotatedLogChannels should contain the created 
LogChannel",
+                entryLogManager.getRotatedLogChannels().contains(logChannel));
+
+        Assert.assertTrue("since mapentry must have been evicted, it should be 
null",
+                (entryLogManager.getCacheAsMap().get(ledgerId) == null)
+                        || 
(entryLogManager.getCacheAsMap().get(ledgerId).getEntryLogWithDirInfo() == 
null));
+    }
+
+    /*
+     * Tests that the maximum size of the cache (maximumNumberOfActiveEntryLogs)
+     * is honored by EntryLogManagerForEntryLogPerLedger's cache eviction policy.
+     *
+     * The maximum is configured to 20 and new logs are created for 30 distinct
+     * ledger ids; after every createNewLog call the size of the cache map must
+     * be less than or equal to the configured maximum (note that the assertion
+     * message says "less than" while the check itself is '<=').
+     */
+    @Test
+    public void testCacheMaximumSizeEvictionPolicy() throws Exception {
+        final int cacheMaximumSize = 20;
+
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(1));
+        conf.setMaximumNumberOfActiveEntryLogs(cacheMaximumSize);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, 
conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerForEntryLogPerLedger entryLogManager =
+                (EntryLogManagerForEntryLogPerLedger) 
entryLogger.getEntryLogManager();
+
+        for (int i = 0; i < cacheMaximumSize + 10; i++) {
+            entryLogManager.createNewLog(i);
+            int cacheSize = entryLogManager.getCacheAsMap().size();
+            Assert.assertTrue("Cache maximum size is expected to be less than 
" + cacheMaximumSize
+                    + " but current cacheSize is " + cacheSize, cacheSize <= 
cacheMaximumSize);
+        }
+    }
+
+    /**
+     * Tests that EntryLogManagerForEntryLogPerLedger doesn't remove the ledger
+     * from its cache map if the ledger's corresponding state is accessed within
+     * the evictionPeriod.
+     *
+     * <p>A helper thread accesses the ledger's current log halfway through the
+     * eviction period. The helper is joined before the cleanup is triggered and
+     * any exception it hits fails the test, so a failed access cannot be
+     * mistaken for an unexpected eviction.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testExpiryRemovalByAccessingOnAnotherThread() throws Exception {
+        int evictionPeriod = 1;
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        conf.setEntrylogMapAccessExpiryTimeInSeconds(evictionPeriod);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerForEntryLogPerLedger entryLogManager =
+                (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager();
+
+        long ledgerId = 0L;
+
+        BufferedLogChannel newLogChannel = createDummyBufferedLogChannel(entryLogger, 1, conf);
+        entryLogManager.setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
+
+        AtomicBoolean exceptionOccurred = new AtomicBoolean(false);
+        Thread t = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep((evictionPeriod * 1000) / 2);
+                    entryLogManager.getCurrentLogForLedger(ledgerId);
+                } catch (InterruptedException | IOException e) {
+                    LOG.error("Got Exception in thread", e);
+                    exceptionOccurred.set(true);
+                }
+            }
+        };
+
+        t.start();
+        Thread.sleep(evictionPeriod * 1000 + 100);
+        // make sure the access on the other thread has really happened before the cleanup runs
+        t.join();
+        entryLogManager.doEntryLogMapCleanup();
+        Assert.assertFalse("Exception occurred in accessor thread, which is not expected", exceptionOccurred.get());
+
+        /*
+         * in this scenario, that ledger is accessed by other thread during
+         * eviction period time, so it should not be evicted.
+         */
+        BufferedLogChannel currentLogForLedger = entryLogManager.getCurrentLogForLedger(ledgerId);
+        assertEquals("LogChannel for ledger " + ledgerId, newLogChannel, currentLogForLedger);
+        Assert.assertEquals("Number of current active EntryLogs ", 1, entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of rotated EntryLogs ", 0, entryLogManager.getRotatedLogChannels().size());
+    }
+
+    /**
+     * Tests that EntryLogManagerForEntryLogPerLedger removes the ledger from
+     * its cache map if no entry is added to that ledger and its corresponding
+     * state is not accessed for more than evictionPeriod. In this testcase
+     * unrelated methods are called and the state of another ledger is accessed
+     * within the eviction period.
+     *
+     * <p>The helper thread is joined before its failure flag and its side
+     * effects (the log channel for 'newLedgerId') are asserted.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testExpiryRemovalByAccessingNonCacheRelatedMethods() throws Exception {
+        int evictionPeriod = 1;
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        conf.setEntrylogMapAccessExpiryTimeInSeconds(evictionPeriod);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerForEntryLogPerLedger entryLogManager =
+                (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager();
+
+        long ledgerId = 0L;
+
+        BufferedLogChannel newLogChannel = createDummyBufferedLogChannel(entryLogger, 1, conf);
+        entryLogManager.setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
+
+        AtomicBoolean exceptionOccured = new AtomicBoolean(false);
+        Thread t = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(500);
+                    /*
+                     * any of the following operations should not access entry
+                     * of 'ledgerId' in the cache
+                     */
+                    entryLogManager.getCopyOfCurrentLogs();
+                    entryLogManager.getRotatedLogChannels();
+                    entryLogManager.getCurrentLogIfPresent(newLogChannel.getLogId());
+                    entryLogManager.getDirForNextEntryLog(ledgerDirsManager.getWritableLedgerDirs());
+                    long newLedgerId = 100;
+                    BufferedLogChannel logChannelForNewLedger =
+                            createDummyBufferedLogChannel(entryLogger, newLedgerId, conf);
+                    entryLogManager.setCurrentLogForLedgerAndAddToRotate(newLedgerId, logChannelForNewLedger);
+                    entryLogManager.getCurrentLogIfPresent(newLedgerId);
+                } catch (Exception e) {
+                    LOG.error("Got Exception in thread", e);
+                    exceptionOccured.set(true);
+                }
+            }
+        };
+
+        t.start();
+        Thread.sleep(evictionPeriod * 1000 + 100);
+        // the assertions below rely on the helper thread's work, so wait until it has actually finished
+        t.join();
+        entryLogManager.doEntryLogMapCleanup();
+        Assert.assertFalse("Exception occured in thread, which is not expected", exceptionOccured.get());
+
+        /*
+         * since for more than evictionPeriod, that ledger is not accessed and cache is cleaned up, mapping for that
+         * ledger should not be available anymore
+         */
+        BufferedLogChannel currentLogForLedger = entryLogManager.getCurrentLogForLedger(ledgerId);
+        assertEquals("LogChannel for ledger " + ledgerId + " should be null", null, currentLogForLedger);
+        // expected number of current active entryLogs is 1 since we created entrylog for 'newLedgerId'
+        Assert.assertEquals("Number of current active EntryLogs ", 1, entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of rotated EntryLogs ", 1, entryLogManager.getRotatedLogChannels().size());
+        Assert.assertTrue("CopyOfRotatedLogChannels should contain the created LogChannel",
+                entryLogManager.getRotatedLogChannels().contains(newLogChannel));
+
+        Assert.assertTrue("since mapentry must have been evicted, it should be null",
+                (entryLogManager.getCacheAsMap().get(ledgerId) == null)
+                        || (entryLogManager.getCacheAsMap().get(ledgerId).getEntryLogWithDirInfo() == null));
+    }
+
+    /*
+     * Tests EntryLogger functionality (addEntry/createNewLog/flush) and the
+     * EntryLogManager with entryLogPerLedger enabled.
+     *
+     * Entries are added for 20 ledgers, a new log is created for every ledger
+     * (which is expected to rotate the previous ones), more entries are added
+     * and finally the entry logger is flushed. The unpersisted byte counts of
+     * the current logs, the number of rotated log channels and the least
+     * unflushed log id are checked along the way.
+     */
+    @Test
+    public void testEntryLogManagerForEntryLogPerLedger() throws Exception {
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setFlushIntervalInBytes(10000000);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, 
conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerBase entryLogManager = (EntryLogManagerBase) 
entryLogger.getEntryLogManager();
+        Assert.assertEquals("EntryLogManager class type", 
EntryLogManagerForEntryLogPerLedger.class,
+                entryLogManager.getClass());
+
+        int numOfActiveLedgers = 20;
+        int numEntries = 5;
+
+        for (int j = 0; j < numEntries; j++) {
+            for (long i = 0; i < numOfActiveLedgers; i++) {
+                entryLogger.addEntry(i, generateEntry(i, j));
+            }
+        }
+
+        for (long i = 0; i < numOfActiveLedgers; i++) {
+            BufferedLogChannel logChannel =  
entryLogManager.getCurrentLogForLedger(i);
+            Assert.assertTrue("unpersistedBytes should be greater than 
LOGFILE_HEADER_SIZE",
+                    logChannel.getUnpersistedBytes() > 
EntryLogger.LOGFILE_HEADER_SIZE);
+        }
+
+        for (long i = 0; i < numOfActiveLedgers; i++) {
+            entryLogManager.createNewLog(i);
+        }
+
+        /*
+         * since we created new entrylog for all the activeLedgers, entrylogs 
of all the ledgers
+         * should be rotated and hence the size of copyOfRotatedLogChannels 
should be numOfActiveLedgers
+         */
+        List<BufferedLogChannel> rotatedLogs = 
entryLogManager.getRotatedLogChannels();
+        Assert.assertEquals("Number of rotated entrylogs", numOfActiveLedgers, 
rotatedLogs.size());
+
+        /*
+         * Since newlog is created for all slots, so they are moved to rotated 
logs and hence unpersistedBytes of all
+         * the slots should be just EntryLogger.LOGFILE_HEADER_SIZE
+         *
+         */
+        for (long i = 0; i < numOfActiveLedgers; i++) {
+            BufferedLogChannel logChannel = 
entryLogManager.getCurrentLogForLedger(i);
+            Assert.assertEquals("unpersistedBytes should be 
LOGFILE_HEADER_SIZE", EntryLogger.LOGFILE_HEADER_SIZE,
+                    logChannel.getUnpersistedBytes());
+        }
+
+        for (int j = numEntries; j < 2 * numEntries; j++) {
+            for (long i = 0; i < numOfActiveLedgers; i++) {
+                entryLogger.addEntry(i, generateEntry(i, j));
+            }
+        }
+
+        for (long i = 0; i < numOfActiveLedgers; i++) {
+            BufferedLogChannel logChannel =  
entryLogManager.getCurrentLogForLedger(i);
+            Assert.assertTrue("unpersistedBytes should be greater than 
LOGFILE_HEADER_SIZE",
+                    logChannel.getUnpersistedBytes() > 
EntryLogger.LOGFILE_HEADER_SIZE);
+        }
+
+        Assert.assertEquals("LeastUnflushedloggerID", 0, 
entryLogger.getLeastUnflushedLogId());
+
+        /*
+         * here flush is called so all the rotatedLogChannels should be file 
closed and there shouldn't be any
+         * rotatedlogchannel and also leastUnflushedLogId should be advanced 
to numOfActiveLedgers
+         */
+        entryLogger.flush();
+        Assert.assertEquals("Number of rotated entrylogs", 0, 
entryLogManager.getRotatedLogChannels().size());
+        Assert.assertEquals("LeastUnflushedloggerID", numOfActiveLedgers, 
entryLogger.getLeastUnflushedLogId());
+
+        /*
+         * after flush (flushCurrentLogs) unpersistedBytes should be 0.
+         */
+        for (long i = 0; i < numOfActiveLedgers; i++) {
+            BufferedLogChannel logChannel =  
entryLogManager.getCurrentLogForLedger(i);
+            Assert.assertEquals("unpersistedBytes should be 0", 0L, 
logChannel.getUnpersistedBytes());
+        }
+    }
+
+    /*
+     * With entryLogPerLedger enabled, creates multiple entrylogs, adds entries
+     * of several ledgers and reads them back both before and after the rotated
+     * logs are flushed.
+     *
+     * The position returned by addEntry is recorded per (ledger, entry); its
+     * upper 32 bits are taken as the entryLogId and compared against the ledger
+     * index. Every read checks the ledgerId, the entryId and the payload
+     * "ledger-<i>-<j>" of the returned buffer.
+     */
+    @Test
+    public void testReadAddCallsOfMultipleEntryLogs() throws Exception {
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        // pre allocation enabled
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, 
conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerBase entryLogManagerBase = ((EntryLogManagerBase) 
entryLogger.getEntryLogManager());
+
+        int numOfActiveLedgers = 10;
+        int numEntries = 10;
+        long[][] positions = new long[numOfActiveLedgers][];
+        for (int i = 0; i < numOfActiveLedgers; i++) {
+            positions[i] = new long[numEntries];
+        }
+
+        /*
+         * addentries to the ledgers
+         */
+        for (int j = 0; j < numEntries; j++) {
+            for (int i = 0; i < numOfActiveLedgers; i++) {
+                positions[i][j] = entryLogger.addEntry((long) i, 
generateEntry(i, j));
+                long entryLogId = (positions[i][j] >> 32L);
+                /**
+                 *
+                 * Though EntryLogFilePreAllocation is enabled, Since things 
are not done concurrently here,
+                 * entryLogIds will be sequential.
+                 */
+                Assert.assertEquals("EntryLogId for ledger: " + i, i, 
entryLogId);
+            }
+        }
+
+        /*
+         * read the entries which are written
+         */
+        for (int j = 0; j < numEntries; j++) {
+            for (int i = 0; i < numOfActiveLedgers; i++) {
+                String expectedValue = "ledger-" + i + "-" + j;
+                ByteBuf buf = entryLogger.readEntry(i, j, positions[i][j]);
+                long ledgerId = buf.readLong();
+                long entryId = buf.readLong();
+                byte[] data = new byte[buf.readableBytes()];
+                buf.readBytes(data);
+                assertEquals("LedgerId ", i, ledgerId);
+                assertEquals("EntryId ", j, entryId);
+                assertEquals("Entry Data ", expectedValue, new String(data));
+            }
+        }
+
+        for (long i = 0; i < numOfActiveLedgers; i++) {
+            entryLogManagerBase.createNewLog(i);
+        }
+
+        entryLogManagerBase.flushRotatedLogs();
+
+        // reading after flush of rotatedlogs
+        for (int j = 0; j < numEntries; j++) {
+            for (int i = 0; i < numOfActiveLedgers; i++) {
+                String expectedValue = "ledger-" + i + "-" + j;
+                ByteBuf buf = entryLogger.readEntry(i, j, positions[i][j]);
+                long ledgerId = buf.readLong();
+                long entryId = buf.readLong();
+                byte[] data = new byte[buf.readableBytes()];
+                buf.readBytes(data);
+                assertEquals("LedgerId ", i, ledgerId);
+                assertEquals("EntryId ", j, entryId);
+                assertEquals("Entry Data ", expectedValue, new String(data));
+            }
+        }
+    }
+
+    /*
+     * Callable that reads one entry back through the EntryLogger and compares it with the entry that
+     * generateEntry produces for the same ledgerId/entryId. Returns true on a match; a mismatch or a
+     * read failure is logged and reported as an IOException.
+     */
+    class ReadTask implements Callable<Boolean> {
+        long ledgerId;
+        int entryId;
+        long position;
+        EntryLogger entryLogger;
+
+        ReadTask(long ledgerId, int entryId, long position, EntryLogger entryLogger) {
+            this.entryLogger = entryLogger;
+            this.position = position;
+            this.entryId = entryId;
+            this.ledgerId = ledgerId;
+        }
+
+        @Override
+        public Boolean call() throws IOException {
+            try {
+                ByteBuf expected = generateEntry(ledgerId, entryId);
+                ByteBuf actual = entryLogger.readEntry(ledgerId, entryId, position);
+                if (expected.equals(actual)) {
+                    return true;
+                }
+                String expectedText = expected.toString(Charset.defaultCharset());
+                String actualText = actual.toString(Charset.defaultCharset());
+                LOG.error("Expected Entry: {} Actual Entry: {}", expectedText, actualText);
+                throw new IOException("Expected Entry: " + expectedText + " Actual Entry: " + actualText);
+            } catch (IOException e) {
+                // note: this also catches the mismatch IOException thrown above and wraps it once more
+                String failure = "Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId;
+                LOG.error(failure, e);
+                throw new IOException(failure, e);
+            }
+        }
+    }
+
+    /*
+     * test concurrent read operations of entries from flushed rotatedlogs with entryLogPerLedgerEnabled
+     *
+     * Entries are added sequentially, a new log is created for every ledger and the rotated logs are
+     * flushed; afterwards every entry is read back by a ReadTask on a 40-thread executor. The executor is
+     * shut down in a finally block so that its threads do not outlive the test.
+     */
+    @Test
+    public void testConcurrentReadCallsAfterEntryLogsAreRotated() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setFlushIntervalInBytes(1000 * 25);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(3));
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        int numOfActiveLedgers = 15;
+        int numEntries = 2000;
+        final AtomicLongArray positions = new AtomicLongArray(numOfActiveLedgers * numEntries);
+        EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger)
+                entryLogger.getEntryLogManager();
+
+        for (int i = 0; i < numOfActiveLedgers; i++) {
+            for (int j = 0; j < numEntries; j++) {
+                positions.set(i * numEntries + j, entryLogger.addEntry((long) i, generateEntry(i, j)));
+                long entryLogId = (positions.get(i * numEntries + j) >> 32L);
+                /**
+                 *
+                 * Though EntryLogFilePreAllocation is enabled, Since things are not done concurrently here,
+                 * entryLogIds will be sequential.
+                 */
+                Assert.assertEquals("EntryLogId for ledger: " + i, i, entryLogId);
+            }
+        }
+
+        for (long i = 0; i < numOfActiveLedgers; i++) {
+            entryLogManager.createNewLog(i);
+        }
+        entryLogManager.flushRotatedLogs();
+
+        // reading after flush of rotatedlogs
+        ArrayList<ReadTask> readTasks = new ArrayList<ReadTask>();
+        for (int i = 0; i < numOfActiveLedgers; i++) {
+            for (int j = 0; j < numEntries; j++) {
+                readTasks.add(new ReadTask(i, j, positions.get(i * numEntries + j), entryLogger));
+            }
+        }
+
+        ExecutorService executor = Executors.newFixedThreadPool(40);
+        try {
+            executor.invokeAll(readTasks).forEach((future) -> {
+                try {
+                    future.get();
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    LOG.error("Read/Flush task failed because of InterruptedException", ie);
+                    Assert.fail("Read/Flush task interrupted");
+                } catch (Exception ex) {
+                    LOG.error("Read/Flush task failed because of  exception", ex);
+                    Assert.fail("Read/Flush task failed " + ex.getMessage());
+                }
+            });
+        } finally {
+            // release the pool threads whether or not the reads succeeded
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * Testcase to validate the behaviour when ledgerdirs become full one by
+     * one until all of them are full, and later a ledgerdir becomes writable
+     * again.
+     *
+     * <p>Three ledgers are written with entryLogPerLedger enabled and
+     * pre-allocation disabled. After every change to the set of filled /
+     * writable dirs one more entry is added to each ledger, and both the
+     * directory of each ledger's current entrylog and the number of rotated
+     * log channels are verified.
+     */
+    @Test
+    public void testEntryLoggerAddEntryWhenLedgerDirsAreFull() throws Exception {
+        int numberOfLedgerDirs = 3;
+        List<File> ledgerDirs = new ArrayList<File>();
+        String[] ledgerDirsPath = new String[numberOfLedgerDirs];
+        List<File> curDirs = new ArrayList<File>();
+
+        File ledgerDir;
+        File curDir;
+        for (int i = 0; i < numberOfLedgerDirs; i++) {
+            ledgerDir = createTempDir("bkTest", ".dir").getAbsoluteFile();
+            curDir = Bookie.getCurrentDirectory(ledgerDir);
+            Bookie.checkDirectoryStructure(curDir);
+            ledgerDirs.add(ledgerDir);
+            ledgerDirsPath[i] = ledgerDir.getPath();
+            curDirs.add(curDir);
+        }
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        // pre-allocation is disabled
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setLedgerDirNames(ledgerDirsPath);
+
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger)
+                entryLogger.getEntryLogManager();
+        Assert.assertEquals("EntryLogManager class type", EntryLogManagerForEntryLogPerLedger.class,
+                entryLogManager.getClass());
+
+        entryLogger.addEntry(0L, generateEntry(0, 1));
+        entryLogger.addEntry(1L, generateEntry(1, 1));
+        entryLogger.addEntry(2L, generateEntry(2, 1));
+
+        File ledgerDirForLedger0 = entryLogManager.getCurrentLogForLedger(0L).getLogFile().getParentFile();
+        File ledgerDirForLedger1 = entryLogManager.getCurrentLogForLedger(1L).getLogFile().getParentFile();
+        File ledgerDirForLedger2 = entryLogManager.getCurrentLogForLedger(2L).getLogFile().getParentFile();
+
+        Set<File> ledgerDirsSet = new HashSet<File>();
+        ledgerDirsSet.add(ledgerDirForLedger0);
+        ledgerDirsSet.add(ledgerDirForLedger1);
+        ledgerDirsSet.add(ledgerDirForLedger2);
+
+        /*
+         * since there are 3 ledgerdirs, entrylogs for all the 3 ledgers should be in different ledgerdirs.
+         * The check is made on the set of directories actually used by the ledgers; the list of created
+         * ledgerdirs always has 3 elements and would make the assertion vacuous.
+         */
+        Assert.assertEquals("Current active LedgerDirs size", 3, ledgerDirsSet.size());
+        Assert.assertEquals("Number of rotated logchannels", 0, entryLogManager.getRotatedLogChannels().size());
+
+        /*
+         * ledgerDirForLedger0 is added to filledDirs, for ledger0 new entrylog should not be created in
+         * ledgerDirForLedger0
+         */
+        ledgerDirsManager.addToFilledDirs(ledgerDirForLedger0);
+        addEntryAndValidateFolders(entryLogger, entryLogManager, 2, ledgerDirForLedger0, false, ledgerDirForLedger1,
+                ledgerDirForLedger2);
+        Assert.assertEquals("Number of rotated logchannels", 1, entryLogManager.getRotatedLogChannels().size());
+
+        /*
+         * ledgerDirForLedger1 is also added to filledDirs, so for all the ledgers new entryLogs should be in
+         * ledgerDirForLedger2
+         */
+        ledgerDirsManager.addToFilledDirs(ledgerDirForLedger1);
+        addEntryAndValidateFolders(entryLogger, entryLogManager, 3, ledgerDirForLedger2, true, ledgerDirForLedger2,
+                ledgerDirForLedger2);
+        /*
+         * the count is a range because, in the previous step, ledger0's new entrylog may have been created
+         * in either ledgerDirForLedger1 or ledgerDirForLedger2
+         */
+        Assert.assertTrue("Number of rotated logchannels", (2 <= entryLogManager.getRotatedLogChannels().size())
+                && (entryLogManager.getRotatedLogChannels().size() <= 3));
+        int numOfRotatedLogChannels = entryLogManager.getRotatedLogChannels().size();
+
+        /*
+         * since ledgerDirForLedger2 is added to filleddirs, all the dirs are full. If all the dirs are full then it
+         * will continue to use current entrylogs for new entries instead of creating new one. So for all the ledgers
+         * ledgerdirs should be same as before - ledgerDirForLedger2
+         */
+        ledgerDirsManager.addToFilledDirs(ledgerDirForLedger2);
+        addEntryAndValidateFolders(entryLogger, entryLogManager, 4, ledgerDirForLedger2, true, ledgerDirForLedger2,
+                ledgerDirForLedger2);
+        Assert.assertEquals("Number of rotated logchannels", numOfRotatedLogChannels,
+                entryLogManager.getRotatedLogChannels().size());
+
+        /*
+         *  ledgerDirForLedger1 is added back to writableDirs, so new entrylog for all the ledgers should be created in
+         *  ledgerDirForLedger1
+         */
+        ledgerDirsManager.addToWritableDirs(ledgerDirForLedger1, true);
+        addEntryAndValidateFolders(entryLogger, entryLogManager, 4, ledgerDirForLedger1, true, ledgerDirForLedger1,
+                ledgerDirForLedger1);
+        Assert.assertEquals("Number of rotated logchannels", numOfRotatedLogChannels + 3,
+                entryLogManager.getRotatedLogChannels().size());
+    }
+
+    /*
+     * Appends one entry (with the given entryId) to each of the ledgers 0, 1
+     * and 2, then checks the parent directory of every ledger's current
+     * entrylog. For ledger 0 the directory must equal expectedDirForLedger0
+     * when equalsForLedger0 is true and must differ from it otherwise; for
+     * ledgers 1 and 2 it must equal the respective expected directory.
+     */
+    void addEntryAndValidateFolders(EntryLogger entryLogger, EntryLogManagerBase entryLogManager, int entryId,
+            File expectedDirForLedger0, boolean equalsForLedger0, File expectedDirForLedger1,
+            File expectedDirForLedger2) throws IOException {
+        for (int ledger = 0; ledger < 3; ledger++) {
+            entryLogger.addEntry((long) ledger, generateEntry(ledger, entryId));
+        }
+
+        String messageForLedger0 = "LedgerDir for ledger 0 after adding entry " + entryId;
+        File actualDirForLedger0 = entryLogManager.getCurrentLogForLedger(0L).getLogFile().getParentFile();
+        if (!equalsForLedger0) {
+            Assert.assertNotEquals(messageForLedger0, expectedDirForLedger0, actualDirForLedger0);
+        } else {
+            Assert.assertEquals(messageForLedger0, expectedDirForLedger0, actualDirForLedger0);
+        }
+
+        File actualDirForLedger1 = entryLogManager.getCurrentLogForLedger(1L).getLogFile().getParentFile();
+        Assert.assertEquals("LedgerDir for ledger 1 after adding entry " + entryId, expectedDirForLedger1,
+                actualDirForLedger1);
+
+        File actualDirForLedger2 = entryLogManager.getCurrentLogForLedger(2L).getLogFile().getParentFile();
+        Assert.assertEquals("LedgerDir for ledger 2 after adding entry " + entryId, expectedDirForLedger2,
+                actualDirForLedger2);
+    }
+
+    /*
+     * Entries are added using an entrylogger with entryLogPerLedger enabled
+     * and the same entries are then read using an entrylogger with
+     * entryLogPerLedger disabled.
+     */
+    @Test
+    public void testSwappingEntryLogManagerFromEntryLogPerLedgerToSingle() 
throws Exception {
+        // initialEntryLogPerLedgerEnabled = true, laterEntryLogPerLedgerEnabled = false
+        testSwappingEntryLogManager(true, false);
+    }
+
+    /*
+     * Entries are added using an entrylogger with entryLogPerLedger disabled
+     * and the same entries are then read using an entrylogger with
+     * entryLogPerLedger enabled.
+     */
+    @Test
+    public void testSwappingEntryLogManagerFromSingleToEntryLogPerLedger() 
throws Exception {
+        // initialEntryLogPerLedgerEnabled = false, laterEntryLogPerLedgerEnabled = true
+        testSwappingEntryLogManager(false, true);
+    }
+
+    /*
+     * Shared body of the "swapping" tests: entries are written through an
+     * EntryLogger configured with 'initialEntryLogPerLedgerEnabled', the
+     * entrylogs are rolled over and flushed, and then every entry is read back
+     * through a second EntryLogger created from the same conf after switching
+     * it to 'laterEntryLogPerLedgerEnabled'.
+     */
+    public void testSwappingEntryLogManager(boolean initialEntryLogPerLedgerEnabled,
+            boolean laterEntryLogPerLedgerEnabled) throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogPerLedgerEnabled(initialEntryLogPerLedgerEnabled);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        // pre allocation enabled
+        conf.setEntryLogFilePreAllocationEnabled(true);
+
+        DiskChecker diskChecker = new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold());
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), diskChecker);
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager();
+
+        Class<?> expectedInitialType;
+        if (initialEntryLogPerLedgerEnabled) {
+            expectedInitialType = EntryLogManagerForEntryLogPerLedger.class;
+        } else {
+            expectedInitialType = EntryLogManagerForSingleEntryLog.class;
+        }
+        Assert.assertEquals("EntryLogManager class type", expectedInitialType, entryLogManager.getClass());
+
+        int numOfActiveLedgers = 10;
+        int numEntries = 10;
+        // entryLocations[ledger][entry] holds the location returned by addEntry
+        long[][] entryLocations = new long[numOfActiveLedgers][numEntries];
+
+        /*
+         * add entries to the ledgers, one entry per ledger in each round
+         */
+        for (int entry = 0; entry < numEntries; entry++) {
+            for (int ledger = 0; ledger < numOfActiveLedgers; ledger++) {
+                long location = entryLogger.addEntry((long) ledger, generateEntry(ledger, entry));
+                entryLocations[ledger][entry] = location;
+                // the entrylog id is carried in the upper 32 bits of the location
+                long expectedEntryLogId = initialEntryLogPerLedgerEnabled ? ledger : 0;
+                Assert.assertEquals("EntryLogId for ledger: " + ledger, expectedEntryLogId, location >> 32L);
+            }
+        }
+
+        for (long ledgerId = 0; ledgerId < numOfActiveLedgers; ledgerId++) {
+            entryLogManager.createNewLog(ledgerId);
+        }
+
+        /*
+         * since a new entrylog is created for all the ledgers, the previous
+         * entrylogs must be rotated and with the following flushRotatedLogs
+         * call they should be forcewritten and the files should be closed.
+         */
+        entryLogManager.flushRotatedLogs();
+
+        /*
+         * a new entrylogger and entryLogManager are created with the
+         * 'laterEntryLogPerLedgerEnabled' conf
+         */
+        conf.setEntryLogPerLedgerEnabled(laterEntryLogPerLedgerEnabled);
+        DiskChecker newDiskChecker = new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold());
+        LedgerDirsManager newLedgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), newDiskChecker);
+        EntryLogger newEntryLogger = new EntryLogger(conf, newLedgerDirsManager);
+        EntryLogManager newEntryLogManager = newEntryLogger.getEntryLogManager();
+
+        Class<?> expectedLaterType;
+        if (laterEntryLogPerLedgerEnabled) {
+            expectedLaterType = EntryLogManagerForEntryLogPerLedger.class;
+        } else {
+            expectedLaterType = EntryLogManagerForSingleEntryLog.class;
+        }
+        Assert.assertEquals("EntryLogManager class type", expectedLaterType, newEntryLogManager.getClass());
+
+        /*
+         * read the entries (which were written with the previous entrylogger)
+         * with the new entrylogger
+         */
+        for (int entry = 0; entry < numEntries; entry++) {
+            for (int ledger = 0; ledger < numOfActiveLedgers; ledger++) {
+                ByteBuf entryBuf = newEntryLogger.readEntry(ledger, entry, entryLocations[ledger][entry]);
+                long readLedgerId = entryBuf.readLong();
+                long readEntryId = entryBuf.readLong();
+                byte[] payload = new byte[entryBuf.readableBytes()];
+                entryBuf.readBytes(payload);
+                assertEquals("LedgerId ", ledger, readLedgerId);
+                assertEquals("EntryId ", entry, readEntryId);
+                assertEquals("Entry Data ", "ledger-" + ledger + "-" + entry, new String(payload));
+            }
+        }
+    }
+
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
index 604099c..921d310 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
@@ -28,9 +28,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Duration;
-import java.util.Arrays;
 import java.util.Enumeration;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
@@ -42,6 +40,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 
+import 
org.apache.bookkeeper.bookie.EntryLogManagerForEntryLogPerLedger.BufferedLogChannelWithDirInfo;
 import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
 import org.apache.bookkeeper.bookie.Journal.LastLogMark;
 import org.apache.bookkeeper.client.BKException;
@@ -494,44 +493,33 @@ public class LedgerStorageCheckpointTest {
         BookKeeper bkClient = new BookKeeper(clientConf);
         InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage) 
server.getBookie().ledgerStorage;
         EntryLogger entryLogger = ledgerStorage.entryLogger;
-        EntryLogManagerBase entryLogManagerBase = (EntryLogManagerBase) 
entryLogger.getEntryLogManager();
+        EntryLogManagerForEntryLogPerLedger entryLogManager = 
(EntryLogManagerForEntryLogPerLedger) entryLogger
+                .getEntryLogManager();
 
+        Random rand = new Random();
         int numOfEntries = 5;
         byte[] dataBytes = "data".getBytes();
 
-        long ledgerId = 10;
-        LedgerHandle handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, 
DigestType.CRC32, "passwd".getBytes(), null);
-        for (int j = 0; j < numOfEntries; j++) {
-            handle.addEntry(j, dataBytes);
-        }
-        handle.close();
-        // simulate rolling entrylog
-        entryLogManagerBase.createNewLog(ledgerId);
-
-        ledgerId = 20;
-        handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, 
"passwd".getBytes(), null);
-        for (int j = 0; j < numOfEntries; j++) {
-            handle.addEntry(j, dataBytes);
-        }
-        handle.close();
-        // simulate rolling entrylog
-        entryLogManagerBase.createNewLog(ledgerId);
-
-        ledgerId = 30;
-        handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, 
"passwd".getBytes(), null);
-        for (int j = 0; j < numOfEntries; j++) {
-            handle.addEntry(j, dataBytes);
+        int numOfLedgers = 3;
+        long[] ledgerIds = new long[numOfLedgers];
+        LedgerHandle handle;
+        for (int i = 0; i < numOfLedgers; i++) {
+            ledgerIds[i] = rand.nextInt(100000) + 1;
+            handle = bkClient.createLedgerAdv(ledgerIds[i], 1, 1, 1, 
DigestType.CRC32, "passwd".getBytes(), null);
+            for (int j = 0; j < numOfEntries; j++) {
+                handle.addEntry(j, dataBytes);
+            }
+            // simulate rolling entrylog
+            entryLogManager.createNewLog(ledgerIds[i]);
         }
-        handle.close();
 
-        Set<BufferedLogChannel> copyOfCurrentLogs = new 
HashSet<BufferedLogChannel>(
-                
Arrays.asList(entryLogManagerBase.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID)));
-        for (BufferedLogChannel currentLog : copyOfCurrentLogs) {
+        Set<BufferedLogChannelWithDirInfo> copyOfCurrentLogsWithDirInfo = 
entryLogManager.getCopyOfCurrentLogs();
+        for (BufferedLogChannelWithDirInfo currentLogWithDirInfo : 
copyOfCurrentLogsWithDirInfo) {
             Assert.assertNotEquals("bytesWrittenSinceLastFlush shouldn't be 
zero", 0,
-                    currentLog.getUnpersistedBytes());
+                    
currentLogWithDirInfo.getLogChannel().getUnpersistedBytes());
         }
         Assert.assertNotEquals("There should be logChannelsToFlush", 0,
-                entryLogManagerBase.getRotatedLogChannels().size());
+                entryLogManager.getRotatedLogChannels().size());
 
         /*
          * wait for atleast flushInterval period, so that checkpoint can 
happen.
@@ -542,15 +530,14 @@ public class LedgerStorageCheckpointTest {
          * since checkpoint happenend, there shouldn't be any 
logChannelsToFlush
          * and bytesWrittenSinceLastFlush should be zero.
          */
-        List<BufferedLogChannel> copyOfRotatedLogChannels = 
entryLogManagerBase.getRotatedLogChannels();
+        List<BufferedLogChannel> copyOfRotatedLogChannels = 
entryLogManager.getRotatedLogChannels();
         Assert.assertTrue("There shouldn't be logChannelsToFlush",
                 ((copyOfRotatedLogChannels == null) || 
(copyOfRotatedLogChannels.size() == 0)));
 
-        copyOfCurrentLogs = new HashSet<BufferedLogChannel>(
-                
Arrays.asList(entryLogManagerBase.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID)));
-        for (BufferedLogChannel currentLog : copyOfCurrentLogs) {
+        copyOfCurrentLogsWithDirInfo = entryLogManager.getCopyOfCurrentLogs();
+        for (BufferedLogChannelWithDirInfo currentLogWithDirInfo : 
copyOfCurrentLogsWithDirInfo) {
             Assert.assertEquals("bytesWrittenSinceLastFlush should be zero", 0,
-                    currentLog.getUnpersistedBytes());
+                    
currentLogWithDirInfo.getLogChannel().getUnpersistedBytes());
         }
     }
 
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 24eb1a9..1be3fe3 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -21,10 +21,10 @@
 # - `Bookie Server`     : bookie server generic settings, including network 
related settings.
 # - `Bookie Storage`    : bookie storage related settings, such as journal, 
entrylogger, gc and ledger storages.
 # - `Security`          : security related settings
-# - `Metadata Services` : metadata service related settings 
+# - `Metadata Services` : metadata service related settings
 # - `Stats Providers`   : stats providers related settings
 # - `Auto Recovery`     : auto recovery related settings
-# 
+#
 
 ############################################## Bookie Server 
##############################################
 
@@ -362,7 +362,7 @@ ledgerDirectories=/tmp/bk-data
 # This parameter allows creating entry log files when there are enough disk 
spaces, even when
 # the bookie is running at readonly mode because of the disk usage is 
exceeding `diskUsageThreshold`.
 # Because compaction, journal replays can still write data to disks when a 
bookie is readonly.
-# 
+#
 # Default value is 1.2 * `logSizeLimit`.
 #
 # minUsableSizeForEntryLogCreation=
@@ -377,8 +377,8 @@ ledgerDirectories=/tmp/bk-data
 # When entryLogPerLedgerEnabled is enabled, checkpoint doesn't happens
 # when a new active entrylog is created / previous one is rolled over.
 # Instead SyncThread checkpoints periodically with 'flushInterval' delay
-# (in milliseconds) in between executions. Checkpoint flushes both ledger 
-# entryLogs and ledger index pages to disk. 
+# (in milliseconds) in between executions. Checkpoint flushes both ledger
+# entryLogs and ledger index pages to disk.
 # Flushing entrylog and index files will introduce much random disk I/O.
 # If separating journal dir and ledger dirs each on different devices,
 # flushing would not affect performance. But if putting journal dir
@@ -417,15 +417,22 @@ ledgerDirectories=/tmp/bk-data
 # The number of bytes used as capacity for the write buffer. Default is 64KB.
 # writeBufferSizeBytes=65536
 
-# Specifies if entryLog per ledger is enabled/disabled. If it is enabled, then 
there would be a 
-# active entrylog for each ledger. It would be ideal to enable this feature if 
the underlying 
-# storage device has multiple DiskPartitions or SSD and if in a given moment, 
entries of fewer 
+# Specifies if entryLog per ledger is enabled/disabled. If it is enabled, then 
there would be a
+# active entrylog for each ledger. It would be ideal to enable this feature if 
the underlying
+# storage device has multiple DiskPartitions or SSD and if in a given moment, 
entries of fewer
 # number of active ledgers are written to a bookie.
 # entryLogPerLedgerEnabled=false
 
 # In the case of multipleentrylogs, multiple threads can be used to flush the 
memtable
 # numOfMemtableFlushThreads=8
 
+# In the entryLogPerLedger feature, the time (in seconds) since the last access after 
which an entrylog is evicted from the cache of active entrylogs
+# entrylogMapAccessExpiryTimeInSeconds=300
+
+# in entryLogPerLedger feature, this specifies the maximum number of entrylogs 
that can be
+# active at a given point in time
+# maximumNumberOfActiveEntryLogs=500
+
 #############################################################################
 ## Entry log compaction settings
 #############################################################################
@@ -722,7 +729,7 @@ zkEnableSecurity=false
 #   - Twitter Science   : 
org.apache.bookkeeper.stats.twitter.science.TwitterStatsProvider
 # Default value is:
 #   org.apache.bookkeeper.stats.prometheus.PrometheusMetricsProvider
-# 
+#
 # For configuring corresponding stats provider, see details at each section 
below.
 #
 # 
statsProviderClass=org.apache.bookkeeper.stats.prometheus.PrometheusMetricsProvider
@@ -821,7 +828,7 @@ zkEnableSecurity=false
 # the following settings take effects when `autoRecoveryDaemonEnabled` is true.
 
 # The ensemble placement policy used for re-replicating entries.
-# 
+#
 # Options:
 #   - org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy
 #   - org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index f366c60..6aca797 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -308,6 +308,12 @@ groups:
   - param: entryLogPerLedgerEnabled
     description: Specifies if entryLog per ledger is enabled/disabled. If it 
is enabled, then there would be a active entrylog for each ledger. It would be 
ideal to enable this feature if the underlying storage device has multiple 
DiskPartitions or SSD and if in a given moment, entries of fewer number of 
active ledgers are written to the bookie.
     default: false
+  - param: entrylogMapAccessExpiryTimeInSeconds
+    description: When entryLogPerLedger is enabled, the amount of time (in seconds) 
that EntryLogManagerForEntryLogPerLedger waits after the last addEntry call for a 
ledger before closing its entrylog file, if an explicit writeclose for that ledger 
has not been received.
+    default: 300
+  - param: maximumNumberOfActiveEntryLogs
+    description: In the entryLogPerLedger feature, this specifies the maximum 
number of entrylogs that can be active at a given point in time. If the number of 
active entrylogs exceeds maximumNumberOfActiveEntryLogs, then entrylogs 
will be evicted from the cache.
+    default: 500
 
 - name: Entry log compaction settings
   params:

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to