This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new cf7a72b  Refactor EntryLogger class
cf7a72b is described below

commit cf7a72b011378ac5be8e04edd35dfb97e00cedab
Author: cguttapalem <[email protected]>
AuthorDate: Thu May 3 15:49:30 2018 -0700

    Refactor EntryLogger class
    
    Descriptions of the changes in this PR:
    
    - Split EntryLogger class into multiple classes.
    - create separate classes for EntryLogManager, EntryLoggerAllocator and 
EntryLogManagerForSingleEntryLog.
    
    Author: cguttapalem <[email protected]>
    
    Reviewers: Sijie Guo <[email protected]>
    
    This closes #1365 from reddycharan/refactorentrylogger
---
 .../apache/bookkeeper/bookie/EntryLogManager.java  | 103 ++++
 .../bookkeeper/bookie/EntryLogManagerBase.java     | 163 ++++++
 .../bookie/EntryLogManagerForSingleEntryLog.java   | 263 +++++++++
 .../org/apache/bookkeeper/bookie/EntryLogger.java  | 588 +--------------------
 .../bookkeeper/bookie/EntryLoggerAllocator.java    | 215 ++++++++
 .../apache/bookkeeper/bookie/CreateNewLogTest.java |   2 -
 .../org/apache/bookkeeper/bookie/EntryLogTest.java |   5 +-
 .../bookie/LedgerStorageCheckpointTest.java        |   1 -
 .../bookie/SortedLedgerStorageCheckpointTest.java  |   1 -
 9 files changed, 758 insertions(+), 583 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManager.java
new file mode 100644
index 0000000..340e9a1
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManager.java
@@ -0,0 +1,103 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import io.netty.buffer.ByteBuf;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+
+interface EntryLogManager {
+
+    /*
+     * add entry to the corresponding entrylog and return the position of
+     * the entry in the entrylog
+     */
+    long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
+
+    /*
+     * gets the active logChannel with the given entryLogId. null if it is
+     * not existing.
+     */
+    BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+    /*
+     * Returns eligible writable ledger dir for the creation next entrylog
+     */
+    File getDirForNextEntryLog(List<File> writableLedgerDirs);
+
+    /*
+     * Do the operations required for checkpoint.
+     */
+    void checkpoint() throws IOException;
+
+    /*
+     * flush both current and rotated logs.
+     */
+    void flush() throws IOException;
+
+    /*
+     * close current logs.
+     */
+    void close() throws IOException;
+
+    /*
+     * force close current logs.
+     */
+    void forceClose();
+
+    /*
+     * prepare entrylogger/entrylogmanager before doing SortedLedgerStorage
+     * Checkpoint.
+     */
+    void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws 
IOException;
+
+    /*
+     * this method should be called before doing entrymemtable flush, it
+     * would save the state of the entrylogger before entrymemtable flush
+     * and commitEntryMemTableFlush would take appropriate action after
+     * entrymemtable flush.
+     */
+    void prepareEntryMemTableFlush();
+
+    /*
+     * this method should be called after doing entrymemtable flush,it would
+     * take appropriate action after entrymemtable flush depending on the
+     * current state of the entrylogger and the state of the entrylogger
+     * during prepareEntryMemTableFlush.
+     *
+     * It is assumed that there would be corresponding
+     * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both
+     * would be called from the same thread.
+     *
+     * returns boolean value indicating whether EntryMemTable should do 
checkpoint
+     * after this commit method.
+     */
+    boolean commitEntryMemTableFlush() throws IOException;
+
+    /*
+     * creates new separate log for compaction.
+     */
+    BufferedLogChannel createNewLogForCompaction() throws IOException;
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
new file mode 100644
index 0000000..849336f
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
@@ -0,0 +1,163 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.concurrent.FastThreadLocal;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
+import 
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+@Slf4j
+abstract class EntryLogManagerBase implements EntryLogManager {
+    volatile List<BufferedLogChannel> rotatedLogChannels;
+    final EntryLoggerAllocator entryLoggerAllocator;
+    private final LedgerDirsManager ledgerDirsManager;
+    private final List<EntryLogger.EntryLogListener> listeners;
+    /**
+     * The maximum size of a entry logger file.
+     */
+    final long logSizeLimit;
+
+    EntryLogManagerBase(ServerConfiguration conf, LedgerDirsManager 
ledgerDirsManager,
+            EntryLoggerAllocator entryLoggerAllocator, 
List<EntryLogger.EntryLogListener> listeners) {
+        this.ledgerDirsManager = ledgerDirsManager;
+        this.entryLoggerAllocator = entryLoggerAllocator;
+        this.listeners = listeners;
+        this.logSizeLimit = conf.getEntryLogSizeLimit();
+    }
+
+    private final FastThreadLocal<ByteBuf> sizeBufferForAdd = new 
FastThreadLocal<ByteBuf>() {
+        @Override
+        protected ByteBuf initialValue() throws Exception {
+            return Unpooled.buffer(4);
+        }
+    };
+
+    /*
+     * This method should be guarded by a lock, so callers of this method
+     * should be in the right scope of the lock.
+     */
+    @Override
+    public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws 
IOException {
+        int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to 
prepend the size
+        BufferedLogChannel logChannel = 
getCurrentLogForLedgerForAddEntry(ledger, entrySize, rollLog);
+        ByteBuf sizeBuffer = sizeBufferForAdd.get();
+        sizeBuffer.clear();
+        sizeBuffer.writeInt(entry.readableBytes());
+        logChannel.write(sizeBuffer);
+
+        long pos = logChannel.position();
+        logChannel.write(entry);
+        logChannel.registerWrittenEntry(ledger, entrySize);
+
+        return (logChannel.getLogId() << 32L) | pos;
+    }
+
+    boolean reachEntryLogLimit(BufferedLogChannel logChannel, long size) {
+        if (logChannel == null) {
+            return false;
+        }
+        return logChannel.position() + size > logSizeLimit;
+    }
+
+    boolean readEntryLogHardLimit(BufferedLogChannel logChannel, long size) {
+        if (logChannel == null) {
+            return false;
+        }
+        return logChannel.position() + size > Integer.MAX_VALUE;
+    }
+
+    abstract BufferedLogChannel getCurrentLogForLedger(long ledgerId);
+
+    abstract BufferedLogChannel getCurrentLogForLedgerForAddEntry(long 
ledgerId, int entrySize, boolean rollLog)
+            throws IOException;
+
+    abstract void setCurrentLogForLedgerAndAddToRotate(long ledgerId, 
BufferedLogChannel logChannel);
+
+    /*
+     * flush current logs.
+     */
+    abstract void flushCurrentLogs() throws IOException;
+
+    /*
+     * flush rotated logs.
+     */
+    abstract void flushRotatedLogs() throws IOException;
+
+    List<BufferedLogChannel> getRotatedLogChannels() {
+        return rotatedLogChannels;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        flushCurrentLogs();
+        flushRotatedLogs();
+    }
+
+    void flushLogChannel(BufferedLogChannel logChannel, boolean forceMetadata) 
throws IOException {
+        if (logChannel != null) {
+            logChannel.flushAndForceWrite(forceMetadata);
+            log.debug("Flush and sync current entry logger {}", 
logChannel.getLogId());
+        }
+    }
+
+    /*
+     * Creates a new log file. This method should be guarded by a lock,
+     * so callers of this method should be in right scope of the lock.
+     */
+    void createNewLog(long ledgerId) throws IOException {
+        BufferedLogChannel logChannel = getCurrentLogForLedger(ledgerId);
+        // first tried to create a new log channel. add current log channel to 
ToFlush list only when
+        // there is a new log channel. it would prevent that a log channel is 
referenced by both
+        // *logChannel* and *ToFlush* list.
+        if (null != logChannel) {
+
+            // flush the internal buffer back to filesystem but not sync disk
+            logChannel.flush();
+
+            // Append ledgers map at the end of entry log
+            logChannel.appendLedgersMap();
+
+            BufferedLogChannel newLogChannel = 
entryLoggerAllocator.createNewLog(selectDirForNextEntryLog());
+            setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
+            log.info("Flushing entry logger {} back to filesystem, pending for 
syncing entry loggers : {}.",
+                    logChannel.getLogId(), rotatedLogChannels);
+            for (EntryLogListener listener : listeners) {
+                listener.onRotateEntryLog();
+            }
+        } else {
+            setCurrentLogForLedgerAndAddToRotate(ledgerId,
+                    
entryLoggerAllocator.createNewLog(selectDirForNextEntryLog()));
+        }
+    }
+
+    File selectDirForNextEntryLog() throws NoWritableLedgerDirException {
+        return 
getDirForNextEntryLog(ledgerDirsManager.getWritableLedgerDirsForNewLog());
+    }
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
new file mode 100644
index 0000000..84e4ad3
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
@@ -0,0 +1,263 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import static org.apache.bookkeeper.bookie.EntryLogger.INVALID_LID;
+import static org.apache.bookkeeper.bookie.EntryLogger.UNASSIGNED_LEDGERID;
+
+import io.netty.buffer.ByteBuf;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+@Slf4j
+class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
+
+    private volatile BufferedLogChannel activeLogChannel;
+    private long logIdBeforeFlush = INVALID_LID;
+    private final AtomicBoolean shouldCreateNewEntryLog = new 
AtomicBoolean(false);
+    private EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
+
+    EntryLogManagerForSingleEntryLog(ServerConfiguration conf, 
LedgerDirsManager ledgerDirsManager,
+            EntryLoggerAllocator entryLoggerAllocator, 
List<EntryLogger.EntryLogListener> listeners,
+            EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) {
+        super(conf, ledgerDirsManager, entryLoggerAllocator, listeners);
+        this.rotatedLogChannels = new LinkedList<BufferedLogChannel>();
+        this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
+        // Register listener for disk full notifications.
+        ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+    }
+
+    private LedgerDirsListener getLedgerDirsListener() {
+        return new LedgerDirsListener() {
+            @Override
+            public void diskFull(File disk) {
+                // If the current entry log disk is full, then create new
+                // entry log.
+                BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+                if (currentActiveLogChannel != null
+                        && 
currentActiveLogChannel.getLogFile().getParentFile().equals(disk)) {
+                    shouldCreateNewEntryLog.set(true);
+                }
+            }
+
+            @Override
+            public void diskAlmostFull(File disk) {
+                // If the current entry log disk is almost full, then create 
new entry
+                // log.
+                BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+                if (currentActiveLogChannel != null
+                        && 
currentActiveLogChannel.getLogFile().getParentFile().equals(disk)) {
+                    shouldCreateNewEntryLog.set(true);
+                }
+            }
+        };
+    }
+
+    @Override
+    public synchronized long addEntry(long ledger, ByteBuf entry, boolean 
rollLog) throws IOException {
+        return super.addEntry(ledger, entry, rollLog);
+    }
+
+    @Override
+    synchronized BufferedLogChannel getCurrentLogForLedgerForAddEntry(long 
ledgerId, int entrySize,
+            boolean rollLog) throws IOException {
+        if (null == activeLogChannel) {
+            // log channel can be null because the file is deferred to be 
created
+            createNewLog(UNASSIGNED_LEDGERID);
+        }
+
+        boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(activeLogChannel, entrySize)
+                : readEntryLogHardLimit(activeLogChannel, entrySize);
+        // Create new log if logSizeLimit reached or current disk is full
+        boolean createNewLog = shouldCreateNewEntryLog.get();
+        if (createNewLog || reachEntryLogLimit) {
+            if (activeLogChannel != null) {
+                activeLogChannel.flushAndForceWriteIfRegularFlush(false);
+            }
+            createNewLog(UNASSIGNED_LEDGERID);
+            // Reset the flag
+            if (createNewLog) {
+                shouldCreateNewEntryLog.set(false);
+            }
+        }
+        return activeLogChannel;
+    }
+
+    @Override
+    synchronized void createNewLog(long ledgerId) throws IOException {
+        super.createNewLog(ledgerId);
+    }
+
+    @Override
+    public synchronized void setCurrentLogForLedgerAndAddToRotate(long 
ledgerId, BufferedLogChannel logChannel) {
+        BufferedLogChannel hasToRotateLogChannel = activeLogChannel;
+        activeLogChannel = logChannel;
+        if (hasToRotateLogChannel != null) {
+            rotatedLogChannels.add(hasToRotateLogChannel);
+        }
+    }
+
+    @Override
+    public BufferedLogChannel getCurrentLogForLedger(long ledgerId) {
+        return activeLogChannel;
+    }
+
+    @Override
+    public BufferedLogChannel getCurrentLogIfPresent(long entryLogId) {
+        BufferedLogChannel activeLogChannelTemp = activeLogChannel;
+        if ((activeLogChannelTemp != null) && (activeLogChannelTemp.getLogId() 
== entryLogId)) {
+            return activeLogChannelTemp;
+        }
+        return null;
+    }
+
+    @Override
+    public File getDirForNextEntryLog(List<File> writableLedgerDirs) {
+        Collections.shuffle(writableLedgerDirs);
+        return writableLedgerDirs.get(0);
+    }
+
+    @Override
+    public void checkpoint() throws IOException {
+        flushRotatedLogs();
+    }
+
+    public long getCurrentLogId() {
+        BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+        if (currentActiveLogChannel != null) {
+            return currentActiveLogChannel.getLogId();
+        } else {
+            return EntryLogger.UNINITIALIZED_LOG_ID;
+        }
+    }
+
+    @Override
+    public void flushCurrentLogs() throws IOException {
+        BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+        if (currentActiveLogChannel != null) {
+            /**
+             * flushCurrentLogs method is called during checkpoint, so
+             * metadata of the file also should be force written.
+             */
+            flushLogChannel(currentActiveLogChannel, true);
+        }
+    }
+
+    @Override
+    void flushRotatedLogs() throws IOException {
+        List<BufferedLogChannel> channels = null;
+        synchronized (this) {
+            channels = rotatedLogChannels;
+            rotatedLogChannels = new LinkedList<BufferedLogChannel>();
+        }
+        if (null == channels) {
+            return;
+        }
+        Iterator<BufferedLogChannel> chIter = channels.iterator();
+        while (chIter.hasNext()) {
+            BufferedLogChannel channel = chIter.next();
+            try {
+                channel.flushAndForceWrite(true);
+            } catch (IOException ioe) {
+                // rescue from flush exception, add unflushed channels back
+                synchronized (this) {
+                    if (null == rotatedLogChannels) {
+                        rotatedLogChannels = channels;
+                    } else {
+                        rotatedLogChannels.addAll(0, channels);
+                    }
+                }
+                throw ioe;
+            }
+            // remove the channel from the list after it is successfully 
flushed
+            chIter.remove();
+            // since this channel is only used for writing, after flushing the 
channel,
+            // we had to close the underlying file channel. Otherwise, we 
might end up
+            // leaking fds which cause the disk spaces could not be reclaimed.
+            EntryLogger.closeFileChannel(channel);
+            
recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(channel.getLogId());
+            log.info("Synced entry logger {} to disk.", channel.getLogId());
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (activeLogChannel != null) {
+            EntryLogger.closeFileChannel(activeLogChannel);
+        }
+    }
+
+    @Override
+    public void forceClose() {
+        if (activeLogChannel != null) {
+            EntryLogger.forceCloseFileChannel(activeLogChannel);
+        }
+    }
+
+    @Override
+    public void prepareEntryMemTableFlush() {
+        logIdBeforeFlush = getCurrentLogId();
+    }
+
+    @Override
+    public boolean commitEntryMemTableFlush() throws IOException {
+        long logIdAfterFlush = getCurrentLogId();
+        /*
+         * in any case that an entry log reaches the limit, we roll the log
+         * and start checkpointing. if a memory table is flushed spanning
+         * over two entry log files, we also roll log. this is for
+         * performance consideration: since we don't wanna checkpoint a new
+         * log file that ledger storage is writing to.
+         */
+        if (reachEntryLogLimit(activeLogChannel, 0L) || logIdAfterFlush != 
logIdBeforeFlush) {
+            log.info("Rolling entry logger since it reached size limitation");
+            createNewLog(UNASSIGNED_LEDGERID);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) 
throws IOException{
+        if (numBytesFlushed > 0) {
+            // if bytes are added between previous flush and this checkpoint,
+            // it means bytes might live at current active entry log, we need
+            // roll current entry log and then issue checkpoint to underlying
+            // interleaved ledger storage.
+            createNewLog(UNASSIGNED_LEDGERID);
+        }
+    }
+
+    @Override
+    public EntryLogger.BufferedLogChannel createNewLogForCompaction() throws 
IOException {
+        return 
entryLoggerAllocator.createNewLogForCompaction(selectDirForNextEntryLog());
+    }
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index d289b0e..75445c2 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -22,8 +22,6 @@
 package org.apache.bookkeeper.bookie;
 
 import static com.google.common.base.Charsets.UTF_8;
-import static 
org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX;
-import static 
org.apache.bookkeeper.util.BookKeeperConstants.MAX_LOG_SIZE_LIMIT;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
@@ -35,18 +33,14 @@ import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.util.concurrent.FastThreadLocal;
-
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
@@ -54,24 +48,15 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
@@ -90,7 +75,7 @@ public class EntryLogger {
     private static final Logger LOG = 
LoggerFactory.getLogger(EntryLogger.class);
     static final long UNASSIGNED_LEDGERID = -1L;
     // log file suffix
-    private static final String LOG_FILE_SUFFIX = ".log";
+    static final String LOG_FILE_SUFFIX = ".log";
 
     @VisibleForTesting
     static final int UNINITIALIZED_LOG_ID = -0xDEAD;
@@ -145,7 +130,7 @@ public class EntryLogger {
          * Append the ledger map at the end of the entry log.
          * Updates the entry log file header with the offset and size of the 
map.
          */
-        private void appendLedgersMap() throws IOException {
+        void appendLedgersMap() throws IOException {
 
             long ledgerMapOffset = this.position();
 
@@ -228,21 +213,16 @@ public class EntryLogger {
      */
     private final Object compactionLogLock = new Object();
 
-    /**
-     * The maximum size of a entry logger file.
-     */
-    final long logSizeLimit;
     private volatile BufferedLogChannel compactionLogChannel;
 
     final EntryLoggerAllocator entryLoggerAllocator;
     private final EntryLogManager entryLogManager;
 
-    private final boolean entryLogPreAllocationEnabled;
     private final CopyOnWriteArrayList<EntryLogListener> listeners = new 
CopyOnWriteArrayList<EntryLogListener>();
 
     private static final int HEADER_V0 = 0; // Old log file format (no ledgers 
map index)
     private static final int HEADER_V1 = 1; // Introduced ledger map index
-    private static final int HEADER_CURRENT_VERSION = HEADER_V1;
+    static final int HEADER_CURRENT_VERSION = HEADER_V1;
 
     private static class Header {
         final int version;
@@ -360,9 +340,6 @@ public class EntryLogger {
         if (listener != null) {
             addListener(listener);
         }
-        // log size limit
-        this.logSizeLimit = Math.min(conf.getEntryLogSizeLimit(), 
MAX_LOG_SIZE_LIMIT);
-        this.entryLogPreAllocationEnabled = 
conf.isEntryLogFilePreAllocationEnabled();
 
         // Initialize the entry log header buffer. This cannot be a static 
object
         // since in our unit tests, we run multiple Bookies and thus 
EntryLoggers
@@ -386,9 +363,11 @@ public class EntryLogger {
             }
         }
         this.recentlyCreatedEntryLogsStatus = new RecentEntryLogsStatus(logId 
+ 1);
-        this.entryLoggerAllocator = new EntryLoggerAllocator(logId);
+        this.entryLoggerAllocator = new EntryLoggerAllocator(conf, 
ledgerDirsManager, recentlyCreatedEntryLogsStatus,
+                logId);
         if (entryLogPerLedgerEnabled) {
-            this.entryLogManager = new 
EntryLogManagerForSingleEntryLog(ledgerDirsManager) {
+            this.entryLogManager = new EntryLogManagerForSingleEntryLog(conf, 
ledgerDirsManager, entryLoggerAllocator,
+                    listeners, recentlyCreatedEntryLogsStatus) {
                 @Override
                 public void checkpoint() throws IOException {
                     /*
@@ -445,7 +424,8 @@ public class EntryLogger {
                 }
             };
         } else {
-            this.entryLogManager = new 
EntryLogManagerForSingleEntryLog(ledgerDirsManager);
+            this.entryLogManager = new EntryLogManagerForSingleEntryLog(conf, 
ledgerDirsManager, entryLoggerAllocator,
+                    listeners, recentlyCreatedEntryLogsStatus);
         }
     }
 
@@ -584,137 +564,6 @@ public class EntryLogger {
     }
 
     /**
-     * An allocator pre-allocates entry log files.
-     */
-    class EntryLoggerAllocator {
-
-        private long preallocatedLogId;
-        private Future<BufferedLogChannel> preallocation = null;
-        private ExecutorService allocatorExecutor;
-        private final Object createEntryLogLock = new Object();
-        private final Object createCompactionLogLock = new Object();
-
-        EntryLoggerAllocator(long logId) {
-            preallocatedLogId = logId;
-            allocatorExecutor = Executors.newSingleThreadExecutor();
-        }
-
-        synchronized long getPreallocatedLogId() {
-            return preallocatedLogId;
-        }
-
-        BufferedLogChannel createNewLog() throws IOException {
-            synchronized (createEntryLogLock) {
-                BufferedLogChannel bc;
-                if (!entryLogPreAllocationEnabled){
-                    // create a new log directly
-                    bc = allocateNewLog();
-                    return bc;
-                } else {
-                    // allocate directly to response request
-                    if (null == preallocation){
-                        bc = allocateNewLog();
-                    } else {
-                        // has a preallocated entry log
-                        try {
-                            bc = preallocation.get();
-                        } catch (ExecutionException ee) {
-                            if (ee.getCause() instanceof IOException) {
-                                throw (IOException) (ee.getCause());
-                            } else {
-                                throw new IOException("Error to execute entry 
log allocation.", ee);
-                            }
-                        } catch (CancellationException ce) {
-                            throw new IOException("Task to allocate a new 
entry log is cancelled.", ce);
-                        } catch (InterruptedException ie) {
-                            Thread.currentThread().interrupt();
-                            throw new IOException("Intrrupted when waiting a 
new entry log to be allocated.", ie);
-                        }
-                    }
-                    // preallocate a new log in background upon every call
-                    preallocation = allocatorExecutor.submit(() -> 
allocateNewLog());
-                    return bc;
-                }
-            }
-        }
-
-        BufferedLogChannel createNewLogForCompaction() throws IOException {
-            synchronized (createCompactionLogLock) {
-                return allocateNewLog(COMPACTING_SUFFIX);
-            }
-        }
-
-        private synchronized BufferedLogChannel allocateNewLog() throws 
IOException {
-            return allocateNewLog(".log");
-        }
-
-        /**
-         * Allocate a new log file.
-         */
-        private synchronized BufferedLogChannel allocateNewLog(String suffix) 
throws IOException {
-            List<File> list = 
ledgerDirsManager.getWritableLedgerDirsForNewLog();
-            File dirForNextEntryLog = 
entryLogManager.getDirForNextEntryLog(list);
-
-            List<File> ledgersDirs = ledgerDirsManager.getAllLedgerDirs();
-            String logFileName;
-            // It would better not to overwrite existing entry log files
-            File testLogFile = null;
-            do {
-                if (preallocatedLogId >= Integer.MAX_VALUE) {
-                    preallocatedLogId = 0;
-                } else {
-                    ++preallocatedLogId;
-                }
-                logFileName = Long.toHexString(preallocatedLogId) + suffix;
-                for (File dir : ledgersDirs) {
-                    testLogFile = new File(dir, logFileName);
-                    if (testLogFile.exists()) {
-                        LOG.warn("Found existed entry log " + testLogFile
-                               + " when trying to create it as a new log.");
-                        testLogFile = null;
-                        break;
-                    }
-                }
-            } while (testLogFile == null);
-
-            File newLogFile = new File(dirForNextEntryLog, logFileName);
-            FileChannel channel = new RandomAccessFile(newLogFile, 
"rw").getChannel();
-
-            BufferedLogChannel logChannel = new BufferedLogChannel(channel, 
conf.getWriteBufferBytes(),
-                    conf.getReadBufferBytes(), preallocatedLogId, newLogFile, 
conf.getFlushIntervalInBytes());
-            logfileHeader.readerIndex(0);
-            logChannel.write(logfileHeader);
-
-            for (File f : list) {
-                setLastLogId(f, preallocatedLogId);
-            }
-
-            if (suffix.equals(LOG_FILE_SUFFIX)) {
-                
recentlyCreatedEntryLogsStatus.createdEntryLog(preallocatedLogId);
-            }
-
-            LOG.info("Created new entry log file {} for logId {}.", 
newLogFile, preallocatedLogId);
-            return logChannel;
-        }
-
-        /**
-         * Stop the allocator.
-         */
-        void stop() {
-            // wait until the preallocation finished.
-            allocatorExecutor.shutdown();
-            LOG.info("Stopped entry logger preallocator.");
-        }
-
-        /**
-         * get the preallocation for tests.
-         */
-        Future<BufferedLogChannel> getPreallocationFuture(){
-            return preallocation;
-        }
-    }
-
-    /**
      * Remove entry log.
      *
      * @param entryLogId
@@ -736,27 +585,6 @@ public class EntryLogger {
         return true;
     }
 
-    /**
-     * writes the given id to the "lastId" file in the given directory.
-     */
-    private void setLastLogId(File dir, long logId) throws IOException {
-        FileOutputStream fos;
-        fos = new FileOutputStream(new File(dir, "lastId"));
-        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fos, 
UTF_8));
-        try {
-            bw.write(Long.toHexString(logId) + "\n");
-            bw.flush();
-        } catch (IOException e) {
-            LOG.warn("Failed write lastId file");
-        } finally {
-            try {
-                bw.close();
-            } catch (IOException e) {
-                LOG.error("Could not close lastId file in {}", dir.getPath());
-            }
-        }
-    }
-
     private long getLastLogId(File dir) {
         long id = readLastLogId(dir);
         // read success
@@ -812,396 +640,6 @@ public class EntryLogger {
         }
     }
 
-    interface EntryLogManager {
-
-        /*
-         * add entry to the corresponding entrylog and return the position of
-         * the entry in the entrylog
-         */
-        long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws 
IOException;
-
-        /*
-         * gets the active logChannel with the given entryLogId. null if it is
-         * not existing.
-         */
-        BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
-
-        /*
-         * Returns eligible writable ledger dir for the creation next entrylog
-         */
-        File getDirForNextEntryLog(List<File> writableLedgerDirs);
-
-        /*
-         * Do the operations required for checkpoint.
-         */
-        void checkpoint() throws IOException;
-
-        /*
-         * flush both current and rotated logs.
-         */
-        void flush() throws IOException;
-
-        /*
-         * close current logs.
-         */
-        void close() throws IOException;
-
-        /*
-         * force close current logs.
-         */
-        void forceClose();
-
-        /*
-         *
-         */
-        void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws 
IOException;
-
-        /*
-         * this method should be called before doing entrymemtable flush, it
-         * would save the state of the entrylogger before entrymemtable flush
-         * and commitEntryMemTableFlush would take appropriate action after
-         * entrymemtable flush.
-         */
-        void prepareEntryMemTableFlush();
-
-        /*
-         * this method should be called after doing entrymemtable flush,it 
would
-         * take appropriate action after entrymemtable flush depending on the
-         * current state of the entrylogger and the state of the entrylogger
-         * during prepareEntryMemTableFlush.
-         *
-         * It is assumed that there would be corresponding
-         * prepareEntryMemTableFlush for every commitEntryMemTableFlush and 
both
-         * would be called from the same thread.
-         *
-         * returns boolean value indicating whether EntryMemTable should do 
checkpoint
-         * after this commit method.
-         */
-        boolean commitEntryMemTableFlush() throws IOException;
-    }
-
-    abstract class EntryLogManagerBase implements EntryLogManager {
-        volatile List<BufferedLogChannel> rotatedLogChannels;
-
-        private final FastThreadLocal<ByteBuf> sizeBufferForAdd = new 
FastThreadLocal<ByteBuf>() {
-            @Override
-            protected ByteBuf initialValue() throws Exception {
-                return Unpooled.buffer(4);
-            }
-        };
-
-        /*
-         * This method should be guarded by a lock, so callers of this method
-         * should be in the right scope of the lock.
-         */
-        @Override
-        public long addEntry(long ledger, ByteBuf entry, boolean rollLog) 
throws IOException {
-            int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to 
prepend the size
-            BufferedLogChannel logChannel = 
getCurrentLogForLedgerForAddEntry(ledger, entrySize, rollLog);
-            ByteBuf sizeBuffer = sizeBufferForAdd.get();
-            sizeBuffer.clear();
-            sizeBuffer.writeInt(entry.readableBytes());
-            logChannel.write(sizeBuffer);
-
-            long pos = logChannel.position();
-            logChannel.write(entry);
-            logChannel.registerWrittenEntry(ledger, entrySize);
-
-            return (logChannel.getLogId() << 32L) | pos;
-        }
-
-        boolean reachEntryLogLimit(BufferedLogChannel logChannel, long size) {
-            if (logChannel == null) {
-                return false;
-            }
-            return logChannel.position() + size > logSizeLimit;
-        }
-
-        boolean readEntryLogHardLimit(BufferedLogChannel logChannel, long 
size) {
-            if (logChannel == null) {
-                return false;
-            }
-            return logChannel.position() + size > Integer.MAX_VALUE;
-        }
-
-        abstract BufferedLogChannel getCurrentLogForLedger(long ledgerId);
-
-        abstract BufferedLogChannel getCurrentLogForLedgerForAddEntry(long 
ledgerId, int entrySize, boolean rollLog)
-                throws IOException;
-
-        abstract void setCurrentLogForLedgerAndAddToRotate(long ledgerId, 
BufferedLogChannel logChannel);
-
-        /*
-         * flush current logs.
-         */
-        abstract void flushCurrentLogs() throws IOException;
-
-        /*
-         * flush rotated logs.
-         */
-        abstract void flushRotatedLogs() throws IOException;
-
-        List<BufferedLogChannel> getRotatedLogChannels() {
-            return rotatedLogChannels;
-        }
-
-        @Override
-        public void flush() throws IOException {
-            flushCurrentLogs();
-            flushRotatedLogs();
-        }
-
-        void flushLogChannel(BufferedLogChannel logChannel, boolean 
forceMetadata) throws IOException {
-            if (logChannel != null) {
-                logChannel.flushAndForceWrite(forceMetadata);
-                LOG.debug("Flush and sync current entry logger {}", 
logChannel.getLogId());
-            }
-        }
-
-        /*
-         * Creates a new log file. This method should be guarded by a lock,
-         * so callers of this method should be in right scope of the lock.
-         */
-        void createNewLog(long ledgerId) throws IOException {
-            BufferedLogChannel logChannel = getCurrentLogForLedger(ledgerId);
-            // first tried to create a new log channel. add current log 
channel to ToFlush list only when
-            // there is a new log channel. it would prevent that a log channel 
is referenced by both
-            // *logChannel* and *ToFlush* list.
-            if (null != logChannel) {
-
-                // flush the internal buffer back to filesystem but not sync 
disk
-                logChannel.flush();
-
-                // Append ledgers map at the end of entry log
-                logChannel.appendLedgersMap();
-
-                BufferedLogChannel newLogChannel = 
entryLoggerAllocator.createNewLog();
-                setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
-                LOG.info("Flushing entry logger {} back to filesystem, pending 
for syncing entry loggers : {}.",
-                        logChannel.getLogId(), rotatedLogChannels);
-                for (EntryLogListener listener : listeners) {
-                    listener.onRotateEntryLog();
-                }
-            } else {
-                setCurrentLogForLedgerAndAddToRotate(ledgerId, 
entryLoggerAllocator.createNewLog());
-            }
-        }
-    }
-
-    class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
-
-        private volatile BufferedLogChannel activeLogChannel;
-        private long logIdBeforeFlush = INVALID_LID;
-        private final AtomicBoolean shouldCreateNewEntryLog = new 
AtomicBoolean(false);
-
-        EntryLogManagerForSingleEntryLog(LedgerDirsManager ledgerDirsManager) {
-            this.rotatedLogChannels = new LinkedList<BufferedLogChannel>();
-            // Register listener for disk full notifications.
-            ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
-        }
-
-        private LedgerDirsListener getLedgerDirsListener() {
-            return new LedgerDirsListener() {
-                @Override
-                public void diskFull(File disk) {
-                    // If the current entry log disk is full, then create new
-                    // entry log.
-                    BufferedLogChannel currentActiveLogChannel = 
activeLogChannel;
-                    if (currentActiveLogChannel != null
-                            && 
currentActiveLogChannel.getLogFile().getParentFile().equals(disk)) {
-                        shouldCreateNewEntryLog.set(true);
-                    }
-                }
-
-                @Override
-                public void diskAlmostFull(File disk) {
-                    // If the current entry log disk is almost full, then 
create new entry
-                    // log.
-                    BufferedLogChannel currentActiveLogChannel = 
activeLogChannel;
-                    if (currentActiveLogChannel != null
-                            && 
currentActiveLogChannel.getLogFile().getParentFile().equals(disk)) {
-                        shouldCreateNewEntryLog.set(true);
-                    }
-                }
-            };
-        }
-
-        @Override
-        public synchronized long addEntry(long ledger, ByteBuf entry, boolean 
rollLog) throws IOException {
-            return super.addEntry(ledger, entry, rollLog);
-        }
-
-        @Override
-        synchronized BufferedLogChannel getCurrentLogForLedgerForAddEntry(long 
ledgerId, int entrySize,
-                boolean rollLog) throws IOException {
-            if (null == activeLogChannel) {
-                // log channel can be null because the file is deferred to be 
created
-                createNewLog(UNASSIGNED_LEDGERID);
-            }
-
-            boolean reachEntryLogLimit = rollLog ? 
reachEntryLogLimit(activeLogChannel, entrySize)
-                    : readEntryLogHardLimit(activeLogChannel, entrySize);
-            // Create new log if logSizeLimit reached or current disk is full
-            boolean createNewLog = shouldCreateNewEntryLog.get();
-            if (createNewLog || reachEntryLogLimit) {
-                if (activeLogChannel != null) {
-                    activeLogChannel.flushAndForceWriteIfRegularFlush(false);
-                }
-                createNewLog(UNASSIGNED_LEDGERID);
-                // Reset the flag
-                if (createNewLog) {
-                    shouldCreateNewEntryLog.set(false);
-                }
-            }
-            return activeLogChannel;
-        }
-
-        @Override
-        synchronized void createNewLog(long ledgerId) throws IOException {
-            super.createNewLog(ledgerId);
-        }
-
-        @Override
-        public synchronized void setCurrentLogForLedgerAndAddToRotate(long 
ledgerId, BufferedLogChannel logChannel) {
-            BufferedLogChannel hasToRotateLogChannel = activeLogChannel;
-            activeLogChannel = logChannel;
-            if (hasToRotateLogChannel != null) {
-                rotatedLogChannels.add(hasToRotateLogChannel);
-            }
-        }
-
-        @Override
-        public BufferedLogChannel getCurrentLogForLedger(long ledgerId) {
-            return activeLogChannel;
-        }
-
-        @Override
-        public BufferedLogChannel getCurrentLogIfPresent(long entryLogId) {
-            BufferedLogChannel activeLogChannelTemp = activeLogChannel;
-            if ((activeLogChannelTemp != null) && 
(activeLogChannelTemp.getLogId() == entryLogId)) {
-                return activeLogChannelTemp;
-            }
-            return null;
-        }
-
-        @Override
-        public File getDirForNextEntryLog(List<File> writableLedgerDirs) {
-            Collections.shuffle(writableLedgerDirs);
-            return writableLedgerDirs.get(0);
-        }
-
-        @Override
-        public void checkpoint() throws IOException {
-            flushRotatedLogs();
-        }
-
-        public long getCurrentLogId() {
-            BufferedLogChannel currentActiveLogChannel = activeLogChannel;
-            if (currentActiveLogChannel != null) {
-                return currentActiveLogChannel.getLogId();
-            } else {
-                return EntryLogger.UNINITIALIZED_LOG_ID;
-            }
-        }
-
-        @Override
-        public void flushCurrentLogs() throws IOException {
-            BufferedLogChannel currentActiveLogChannel = activeLogChannel;
-            if (currentActiveLogChannel != null) {
-                /**
-                 * flushCurrentLogs method is called during checkpoint, so
-                 * metadata of the file also should be force written.
-                 */
-                flushLogChannel(currentActiveLogChannel, true);
-            }
-        }
-
-        @Override
-        void flushRotatedLogs() throws IOException {
-            List<BufferedLogChannel> channels = null;
-            synchronized (this) {
-                channels = rotatedLogChannels;
-                rotatedLogChannels = new LinkedList<BufferedLogChannel>();
-            }
-            if (null == channels) {
-                return;
-            }
-            Iterator<BufferedLogChannel> chIter = channels.iterator();
-            while (chIter.hasNext()) {
-                BufferedLogChannel channel = chIter.next();
-                try {
-                    channel.flushAndForceWrite(true);
-                } catch (IOException ioe) {
-                    // rescue from flush exception, add unflushed channels back
-                    synchronized (this) {
-                        if (null == rotatedLogChannels) {
-                            rotatedLogChannels = channels;
-                        } else {
-                            rotatedLogChannels.addAll(0, channels);
-                        }
-                    }
-                    throw ioe;
-                }
-                // remove the channel from the list after it is successfully 
flushed
-                chIter.remove();
-                // since this channel is only used for writing, after flushing 
the channel,
-                // we had to close the underlying file channel. Otherwise, we 
might end up
-                // leaking fds which cause the disk spaces could not be 
reclaimed.
-                closeFileChannel(channel);
-                
recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(channel.getLogId());
-                LOG.info("Synced entry logger {} to disk.", 
channel.getLogId());
-            }
-        }
-
-        @Override
-        public void close() throws IOException {
-            if (activeLogChannel != null) {
-                closeFileChannel(activeLogChannel);
-            }
-        }
-
-        @Override
-        public void forceClose() {
-            if (activeLogChannel != null) {
-                forceCloseFileChannel(activeLogChannel);
-            }
-        }
-
-        @Override
-        public void prepareEntryMemTableFlush() {
-            logIdBeforeFlush = getCurrentLogId();
-        }
-
-        @Override
-        public boolean commitEntryMemTableFlush() throws IOException {
-            long logIdAfterFlush = getCurrentLogId();
-            /*
-             * in any case that an entry log reaches the limit, we roll the log
-             * and start checkpointing. if a memory table is flushed spanning
-             * over two entry log files, we also roll log. this is for
-             * performance consideration: since we don't wanna checkpoint a new
-             * log file that ledger storage is writing to.
-             */
-            if (reachEntryLogLimit(activeLogChannel, 0L) || logIdAfterFlush != 
logIdBeforeFlush) {
-                LOG.info("Rolling entry logger since it reached size 
limitation");
-                createNewLog(UNASSIGNED_LEDGERID);
-                return true;
-            }
-            return false;
-        }
-
-        @Override
-        public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) 
throws IOException{
-            if (numBytesFlushed > 0) {
-                // if bytes are added between previous flush and this 
checkpoint,
-                // it means bytes might live at current active entry log, we 
need
-                // roll current entry log and then issue checkpoint to 
underlying
-                // interleaved ledger storage.
-                createNewLog(UNASSIGNED_LEDGERID);
-            }
-        }
-    }
-
     /**
      * Flushes all rotated log channels. After log channels are flushed,
      * move leastUnflushedLogId ptr to current logId.
@@ -1273,7 +711,7 @@ public class EntryLogger {
     void createNewCompactionLog() throws IOException {
         synchronized (compactionLogLock) {
             if (compactionLogChannel == null) {
-                compactionLogChannel = 
entryLoggerAllocator.createNewLogForCompaction();
+                compactionLogChannel = 
entryLogManager.createNewLogForCompaction();
             }
         }
     }
@@ -1702,7 +1140,7 @@ public class EntryLogger {
         entryLoggerAllocator.stop();
     }
 
-    private static void closeFileChannel(BufferedChannelBase channel) throws 
IOException {
+    static void closeFileChannel(BufferedChannelBase channel) throws 
IOException {
         if (null == channel) {
             return;
         }
@@ -1713,7 +1151,7 @@ public class EntryLogger {
         }
     }
 
-    private static void forceCloseFileChannel(BufferedChannelBase channel) {
+    static void forceCloseFileChannel(BufferedChannelBase channel) {
         if (null == channel) {
             return;
         }
@@ -1784,4 +1222,4 @@ public class EntryLogger {
             return leastUnflushedLogId;
         }
     }
-}
\ No newline at end of file
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
new file mode 100644
index 0000000..d33d7c4
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
@@ -0,0 +1,215 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static 
org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+/**
+ * An allocator pre-allocates entry log files.
+ */
+@Slf4j
+class EntryLoggerAllocator {
+
+    private long preallocatedLogId;
+    private Future<BufferedLogChannel> preallocation = null;
+    private ExecutorService allocatorExecutor;
+    private final ServerConfiguration conf;
+    private final LedgerDirsManager ledgerDirsManager;
+    private final Object createEntryLogLock = new Object();
+    private final Object createCompactionLogLock = new Object();
+    private final EntryLogger.RecentEntryLogsStatus 
recentlyCreatedEntryLogsStatus;
+    private final boolean entryLogPreAllocationEnabled;
+    final ByteBuf logfileHeader = 
Unpooled.buffer(EntryLogger.LOGFILE_HEADER_SIZE);
+
+    EntryLoggerAllocator(ServerConfiguration conf, LedgerDirsManager 
ledgerDirsManager,
+            EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, 
long logId) {
+        this.conf = conf;
+        this.ledgerDirsManager = ledgerDirsManager;
+        this.preallocatedLogId = logId;
+        this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
+        this.entryLogPreAllocationEnabled = 
conf.isEntryLogFilePreAllocationEnabled();
+        this.allocatorExecutor = Executors.newSingleThreadExecutor();
+
+        // Initialize the entry log header buffer. This cannot be a static 
object
+        // since in our unit tests, we run multiple Bookies and thus 
EntryLoggers
+        // within the same JVM. All of these Bookie instances access this 
header
+        // so there can be race conditions when entry logs are rolled over and
+        // this header buffer is cleared before writing it into the new 
logChannel.
+        logfileHeader.writeBytes("BKLO".getBytes(UTF_8));
+        logfileHeader.writeInt(EntryLogger.HEADER_CURRENT_VERSION);
+        logfileHeader.writerIndex(EntryLogger.LOGFILE_HEADER_SIZE);
+
+    }
+
+    synchronized long getPreallocatedLogId() {
+        return preallocatedLogId;
+    }
+
+    BufferedLogChannel createNewLog(File dirForNextEntryLog) throws 
IOException {
+        synchronized (createEntryLogLock) {
+            BufferedLogChannel bc;
+            if (!entryLogPreAllocationEnabled){
+                // create a new log directly
+                bc = allocateNewLog(dirForNextEntryLog);
+                return bc;
+            } else {
+                // allocate directly to response request
+                if (null == preallocation){
+                    bc = allocateNewLog(dirForNextEntryLog);
+                } else {
+                    // has a preallocated entry log
+                    try {
+                        bc = preallocation.get();
+                    } catch (ExecutionException ee) {
+                        if (ee.getCause() instanceof IOException) {
+                            throw (IOException) (ee.getCause());
+                        } else {
+                            throw new IOException("Error to execute entry log 
allocation.", ee);
+                        }
+                    } catch (CancellationException ce) {
+                        throw new IOException("Task to allocate a new entry 
log is cancelled.", ce);
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                        throw new IOException("Intrrupted when waiting a new 
entry log to be allocated.", ie);
+                    }
+                }
+                // preallocate a new log in background upon every call
+                preallocation = allocatorExecutor.submit(() -> 
allocateNewLog(dirForNextEntryLog));
+                return bc;
+            }
+        }
+    }
+
+    BufferedLogChannel createNewLogForCompaction(File dirForNextEntryLog) 
throws IOException {
+        synchronized (createCompactionLogLock) {
+            return allocateNewLog(dirForNextEntryLog, COMPACTING_SUFFIX);
+        }
+    }
+
+    private synchronized BufferedLogChannel allocateNewLog(File 
dirForNextEntryLog) throws IOException {
+        return allocateNewLog(dirForNextEntryLog, ".log");
+    }
+
+    /**
+     * Allocate a new log file.
+     */
+    private synchronized BufferedLogChannel allocateNewLog(File 
dirForNextEntryLog, String suffix) throws IOException {
+        List<File> ledgersDirs = ledgerDirsManager.getAllLedgerDirs();
+        String logFileName;
+        // It would better not to overwrite existing entry log files
+        File testLogFile = null;
+        do {
+            if (preallocatedLogId >= Integer.MAX_VALUE) {
+                preallocatedLogId = 0;
+            } else {
+                ++preallocatedLogId;
+            }
+            logFileName = Long.toHexString(preallocatedLogId) + suffix;
+            for (File dir : ledgersDirs) {
+                testLogFile = new File(dir, logFileName);
+                if (testLogFile.exists()) {
+                    log.warn("Found existed entry log " + testLogFile
+                           + " when trying to create it as a new log.");
+                    testLogFile = null;
+                    break;
+                }
+            }
+        } while (testLogFile == null);
+
+        File newLogFile = new File(dirForNextEntryLog, logFileName);
+        FileChannel channel = new RandomAccessFile(newLogFile, 
"rw").getChannel();
+
+        BufferedLogChannel logChannel = new BufferedLogChannel(channel, 
conf.getWriteBufferBytes(),
+                conf.getReadBufferBytes(), preallocatedLogId, newLogFile, 
conf.getFlushIntervalInBytes());
+        logfileHeader.readerIndex(0);
+        logChannel.write(logfileHeader);
+
+        for (File f : ledgersDirs) {
+            setLastLogId(f, preallocatedLogId);
+        }
+
+        if (suffix.equals(EntryLogger.LOG_FILE_SUFFIX)) {
+            recentlyCreatedEntryLogsStatus.createdEntryLog(preallocatedLogId);
+        }
+
+        log.info("Created new entry log file {} for logId {}.", newLogFile, 
preallocatedLogId);
+        return logChannel;
+    }
+
+    /**
+     * writes the given id to the "lastId" file in the given directory.
+     */
+    private void setLastLogId(File dir, long logId) throws IOException {
+        FileOutputStream fos;
+        fos = new FileOutputStream(new File(dir, "lastId"));
+        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fos, 
UTF_8));
+        try {
+            bw.write(Long.toHexString(logId) + "\n");
+            bw.flush();
+        } catch (IOException e) {
+            log.warn("Failed write lastId file");
+        } finally {
+            try {
+                bw.close();
+            } catch (IOException e) {
+                log.error("Could not close lastId file in {}", dir.getPath());
+            }
+        }
+    }
+
+    /**
+     * Stop the allocator.
+     */
+    void stop() {
+        // wait until the preallocation finished.
+        allocatorExecutor.shutdown();
+        log.info("Stopped entry logger preallocator.");
+    }
+
+    /**
+     * get the preallocation for tests.
+     */
+    Future<BufferedLogChannel> getPreallocationFuture(){
+        return preallocation;
+    }
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
index 4257ccc..4c4514a 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
@@ -26,8 +26,6 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.IntStream;
 
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerBase;
-import 
org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForSingleEntryLog;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.util.DiskChecker;
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index 99c1ad4..ac64335 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -49,9 +49,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManager;
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerBase;
-import 
org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForSingleEntryLog;
 import 
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -855,7 +852,7 @@ public class EntryLogTest {
         conf.setEntryLogPerLedgerEnabled(false);
         EntryLogger newEntryLogger = new EntryLogger(conf, ledgerDirsManager);
         EntryLogManager newEntryLogManager = 
newEntryLogger.getEntryLogManager();
-        Assert.assertEquals("EntryLogManager class type", 
EntryLogger.EntryLogManagerForSingleEntryLog.class,
+        Assert.assertEquals("EntryLogManager class type", 
EntryLogManagerForSingleEntryLog.class,
                 newEntryLogManager.getClass());
 
         ByteBuf buf = newEntryLogger.readEntry(ledgerId, 0L, entry0Position);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
index 6cbdcde..604099c 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
@@ -43,7 +43,6 @@ import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 
 import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerBase;
 import org.apache.bookkeeper.bookie.Journal.LastLogMark;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
index 9642c18..f3ef108 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
@@ -36,7 +36,6 @@ import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
-import 
org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForSingleEntryLog;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.junit.After;

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to