This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 3392bee  BOOKKEEPER-1040: Use separate log for compaction and add transaction support
3392bee is described below

commit 3392beee5c70abe36d36604723e14d97b7764be9
Author: Yiming Zang <yz...@twitter.com>
AuthorDate: Wed Nov 8 19:27:20 2017 +0800

    BOOKKEEPER-1040: Use separate log for compaction and add transaction support
    
    Problem:
    Compaction might keep generating duplicated data, which can eventually fill up the disk.
    This is because compaction is not a transactional operation: if compaction
    keeps failing in the middle, the bookie ends up with a lot of garbage data.
    
    Change:
    1. Introduce an abstract class, AbstractLogCompactor, with a compact() interface.
    2. Move the existing compaction logic into a separate class, EntryLogCompactor.
    3. Introduce transactional compaction, so that an incomplete compaction can be
       recovered and a failed compaction can be rolled back.
    4. Add a configuration option to enable transactional compaction (see the sketch below).
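    
    As an illustrative sketch only (not part of this patch): the new
    "useTransactionalCompaction" setting defaults to false and can be flipped
    through the generic setProperty() call that ServerConfiguration inherits
    from Commons Configuration (a dedicated setter is assumed but not shown
    in this excerpt).
    
        ServerConfiguration conf = new ServerConfiguration();
        // enable the separate-log, transactional compaction path (default: false)
        conf.setProperty("useTransactionalCompaction", true);
        System.out.println(conf.getUseTransactionalCompaction()); // prints true
        // The GarbageCollectorThread then constructs a TransactionalEntryLogCompactor
        // instead of the default EntryLogCompactor.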
    
    Potential Risk:
    1. No risk if we keep using the default compactor.
    2. If we choose to enable transactional compaction with a separate log file,
       we need to be careful: since compaction writes to its own log, a bookie with
       a lot of small old ledgers will end up with a lot of small entry log files.
       This can potentially cause "Too many open files" errors in BookKeeper,
       because GC scans all the entry log files and keeps them open.
    
    Author: Yiming Zang <yz...@twitter.com>
    
    Reviewers: Enrico Olivelli <eolive...@gmail.com>, Jia Zhai <None>, Sijie Guo <si...@apache.org>
    
    This closes #704 from yzang/yzang/BOOKKEEPER-1040
---
 .../bookkeeper/bookie/AbstractLogCompactor.java    |  71 ++++
 .../apache/bookkeeper/bookie/EntryLocation.java    |  12 +
 .../bookkeeper/bookie/EntryLogCompactor.java       | 123 +++++++
 .../org/apache/bookkeeper/bookie/EntryLogger.java  | 267 ++++++++++----
 .../bookkeeper/bookie/GarbageCollectorThread.java  | 192 ++--------
 .../bookie/TransactionalEntryLogCompactor.java     | 388 +++++++++++++++++++++
 .../bookkeeper/conf/ServerConfiguration.java       |  20 ++
 .../bookie/BookieStorageThresholdTest.java         |  32 +-
 .../apache/bookkeeper/bookie/CompactionTest.java   | 308 +++++++++++++---
 .../bookkeeper/test/BookKeeperClusterTestCase.java |   9 +
 10 files changed, 1140 insertions(+), 282 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java
new file mode 100644
index 0000000..8f190a3
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java
@@ -0,0 +1,71 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+/**
+ * Abstract entry log compactor used for compaction.
+ */
+public abstract class AbstractLogCompactor {
+
+    protected final ServerConfiguration conf;
+    protected final Throttler throttler;
+    protected final GarbageCollectorThread gcThread;
+
+    public AbstractLogCompactor(GarbageCollectorThread gcThread) {
+        this.gcThread = gcThread;
+        this.conf = gcThread.conf;
+        this.throttler = new Throttler(conf);
+    }
+
+    /**
+     * Compact entry log file.
+     * @param entryLogMeta log metadata for the entry log to be compacted
+     * @return true if compaction succeeds, false otherwise
+     */
+    public abstract boolean compact(EntryLogMetadata entryLogMeta);
+
+    /**
+     * Does nothing by default. Subclasses are expected to override this method.
+     */
+    public void cleanUpAndRecover() {}
+
+    static class Throttler {
+        private final RateLimiter rateLimiter;
+        private final boolean isThrottleByBytes;
+
+        Throttler(ServerConfiguration conf) {
+            this.isThrottleByBytes  = conf.getIsThrottleByBytes();
+            this.rateLimiter = RateLimiter.create(this.isThrottleByBytes
+                ? conf.getCompactionRateByBytes() : 
conf.getCompactionRateByEntries());
+        }
+
+        // acquire. if bybytes: bytes of this entry; if byentries: 1.
+        void acquire(int permits) {
+            rateLimiter.acquire(this.isThrottleByBytes ? permits : 1);
+        }
+    }
+
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLocation.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLocation.java
index 87005ce..be5eb7f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLocation.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLocation.java
@@ -34,4 +34,16 @@ public class EntryLocation {
         this.entry = entry;
         this.location = location;
     }
+
+    public long getLedger() {
+        return ledger;
+    }
+
+    public long getEntry() {
+        return entry;
+    }
+
+    public long getLocation() {
+        return location;
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
new file mode 100644
index 0000000..c31b989
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
@@ -0,0 +1,123 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the basic entry log compactor used to compact entry logs.
+ * Compaction is done by scanning the old entry log file, copying the entries of
+ * active ledgers to the current entry logger, and removing the old entry log when the scan is over.
+ */
+public class EntryLogCompactor extends AbstractLogCompactor {
+    private static final Logger LOG = 
LoggerFactory.getLogger(EntryLogCompactor.class);
+
+    final CompactionScannerFactory scannerFactory = new 
CompactionScannerFactory();
+    final EntryLogger entryLogger;
+    final CompactableLedgerStorage ledgerStorage;
+    private final int maxOutstandingRequests;
+
+    public EntryLogCompactor(GarbageCollectorThread gcThread) {
+        super(gcThread);
+        this.maxOutstandingRequests = 
conf.getCompactionMaxOutstandingRequests();
+        this.entryLogger = gcThread.getEntryLogger();
+        this.ledgerStorage = gcThread.getLedgerStorage();
+    }
+
+    @Override
+    public boolean compact(EntryLogMetadata entryLogMeta) {
+        try {
+            entryLogger.scanEntryLog(entryLogMeta.getEntryLogId(),
+                scannerFactory.newScanner(entryLogMeta));
+            scannerFactory.flush();
+            LOG.info("Removing entry log {} after compaction", 
entryLogMeta.getEntryLogId());
+            gcThread.removeEntryLog(entryLogMeta.getEntryLogId());
+        } catch (LedgerDirsManager.NoWritableLedgerDirException nwlde) {
+            LOG.warn("No writable ledger directory available, aborting 
compaction", nwlde);
+            return false;
+        } catch (IOException ioe) {
+            // if compact entry log throws IOException, we don't want to 
remove that
+            // entry log. however, if some entries from that log have been 
re-added
+            // to the entry log, and the offset updated, it's ok to flush that
+            LOG.error("Error compacting entry log. Log won't be deleted", ioe);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * A scanner wrapper to check whether a ledger is alive in an entry log 
file.
+     */
+    class CompactionScannerFactory {
+        List<EntryLocation> offsets = new ArrayList<EntryLocation>();
+
+        EntryLogger.EntryLogScanner newScanner(final EntryLogMetadata meta) {
+
+            return new EntryLogger.EntryLogScanner() {
+                @Override
+                public boolean accept(long ledgerId) {
+                    return meta.containsLedger(ledgerId);
+                }
+
+                @Override
+                public void process(final long ledgerId, long offset, 
ByteBuffer entry) throws IOException {
+                    throttler.acquire(entry.remaining());
+
+                    if (offsets.size() > maxOutstandingRequests) {
+                        flush();
+                    }
+                    entry.getLong(); // discard ledger id, we already have it
+                    long entryId = entry.getLong();
+                    entry.rewind();
+
+                    long newoffset = entryLogger.addEntry(ledgerId, entry);
+                    offsets.add(new EntryLocation(ledgerId, entryId, 
newoffset));
+
+                }
+            };
+        }
+
+        void flush() throws IOException {
+            if (offsets.isEmpty()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Skipping entry log flushing, as there are no 
offset!");
+                }
+                return;
+            }
+
+            // Before updating the index, we want to wait until all the 
compacted entries are flushed into the
+            // entryLog
+            try {
+                entryLogger.flush();
+                ledgerStorage.updateEntriesLocations(offsets);
+            } finally {
+                offsets.clear();
+            }
+        }
+    }
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 717a8ae..280806c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -22,6 +22,8 @@
 package org.apache.bookkeeper.bookie;
 
 import static com.google.common.base.Charsets.UTF_8;
+
+import static 
org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX;
 import static 
org.apache.bookkeeper.util.BookKeeperConstants.MAX_LOG_SIZE_LIMIT;
 
 import io.netty.buffer.ByteBuf;
@@ -50,7 +52,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -79,17 +80,26 @@ public class EntryLogger {
     private static class BufferedLogChannel extends BufferedChannel {
         private final long logId;
         private final EntryLogMetadata entryLogMetadata;
+        private final File logFile;
 
-        public BufferedLogChannel(FileChannel fc, int writeCapacity,
-                                  int readCapacity, long logId) throws 
IOException {
+        public BufferedLogChannel(FileChannel fc,
+                                  int writeCapacity,
+                                  int readCapacity,
+                                  long logId,
+                                  File logFile) throws IOException {
             super(fc, writeCapacity, readCapacity);
             this.logId = logId;
             this.entryLogMetadata = new EntryLogMetadata(logId);
+            this.logFile = logFile;
         }
         public long getLogId() {
             return logId;
         }
 
+        public File getLogFile() {
+            return logFile;
+        }
+
         public void registerWrittenEntry(long ledgerId, long entrySize) {
             entryLogMetadata.addLedgerSize(ledgerId, entrySize);
         }
@@ -106,11 +116,17 @@ public class EntryLogger {
     private volatile long leastUnflushedLogId;
 
     /**
+     * locks for compaction log
+     */
+    private final Object compactionLogLock = new Object();
+
+    /**
      * The maximum size of a entry logger file.
      */
     final long logSizeLimit;
     private List<BufferedLogChannel> logChannelsToFlush;
     private volatile BufferedLogChannel logChannel;
+    private volatile BufferedLogChannel compactionLogChannel;
     private final EntryLoggerAllocator entryLoggerAllocator;
     private final boolean entryLogPreAllocationEnabled;
     private final CopyOnWriteArrayList<EntryLogListener> listeners = new 
CopyOnWriteArrayList<EntryLogListener>();
@@ -370,6 +386,18 @@ public class EntryLogger {
         return logChannel.getLogId();
     }
 
+    /**
+     * Get the current log file for compaction
+     */
+    File getCurCompactionLogFile() {
+        synchronized (compactionLogLock) {
+            if (compactionLogChannel == null) {
+                return null;
+            }
+            return compactionLogChannel.getLogFile();
+        }
+    }
+
     protected void initialize() throws IOException {
         // Register listener for disk full notifications.
         ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
@@ -461,6 +489,7 @@ public class EntryLogger {
         } else {
             logChannel = entryLoggerAllocator.createNewLog();
         }
+        currentDir = logChannel.getLogFile().getParentFile();
     }
 
     /**
@@ -517,50 +546,59 @@ public class EntryLogger {
      */
     class EntryLoggerAllocator {
 
-        long preallocatedLogId;
-        Future<BufferedLogChannel> preallocation = null;
-        ExecutorService allocatorExecutor;
+        private long preallocatedLogId;
+        private Future<BufferedLogChannel> preallocation = null;
+        private ExecutorService allocatorExecutor;
+        private final Object createEntryLogLock = new Object();
+        private final Object createCompactionLogLock = new Object();
 
         EntryLoggerAllocator(long logId) {
             preallocatedLogId = logId;
             allocatorExecutor = Executors.newSingleThreadExecutor();
         }
 
-        synchronized BufferedLogChannel createNewLog() throws IOException {
-            BufferedLogChannel bc;
-            if (!entryLogPreAllocationEnabled || null == preallocation) {
-                // initialization time to create a new log
-                bc = allocateNewLog();
-            } else {
-                // has a preallocated entry log
-                try {
-                    bc = preallocation.get();
-                } catch (ExecutionException ee) {
-                    if (ee.getCause() instanceof IOException) {
-                        throw (IOException) (ee.getCause());
-                    } else {
-                        throw new IOException("Error to execute entry log 
allocation.", ee);
+        BufferedLogChannel createNewLog() throws IOException {
+            synchronized (createEntryLogLock) {
+                BufferedLogChannel bc;
+                if (!entryLogPreAllocationEnabled || null == preallocation) {
+                    // initialization time to create a new log
+                    bc = allocateNewLog();
+                    return bc;
+                } else {
+                    // has a preallocated entry log
+                    try {
+                        bc = preallocation.get();
+                    } catch (ExecutionException ee) {
+                        if (ee.getCause() instanceof IOException) {
+                            throw (IOException) (ee.getCause());
+                        } else {
+                            throw new IOException("Error to execute entry log 
allocation.", ee);
+                        }
+                    } catch (CancellationException ce) {
+                        throw new IOException("Task to allocate a new entry 
log is cancelled.", ce);
+                    } catch (InterruptedException ie) {
+                        throw new IOException("Intrrupted when waiting a new 
entry log to be allocated.", ie);
                     }
-                } catch (CancellationException ce) {
-                    throw new IOException("Task to allocate a new entry log is 
cancelled.", ce);
-                } catch (InterruptedException ie) {
-                    throw new IOException("Intrrupted when waiting a new entry 
log to be allocated.", ie);
                 }
-                preallocation = allocatorExecutor.submit(new 
Callable<BufferedLogChannel>() {
-                    @Override
-                    public BufferedLogChannel call() throws IOException {
-                        return allocateNewLog();
-                    }
-                });
+                preallocation = allocatorExecutor.submit(() -> 
allocateNewLog());
+                return bc;
             }
-            LOG.info("Created new entry logger {}.", bc.getLogId());
-            return bc;
+        }
+
+        BufferedLogChannel createNewLogForCompaction() throws IOException {
+            synchronized (createCompactionLogLock) {
+                return allocateNewLog(COMPACTING_SUFFIX);
+            }
+        }
+
+        private BufferedLogChannel allocateNewLog() throws IOException {
+            return allocateNewLog(".log");
         }
 
         /**
          * Allocate a new log file.
          */
-        BufferedLogChannel allocateNewLog() throws IOException {
+        private BufferedLogChannel allocateNewLog(String suffix) throws 
IOException {
             List<File> list = 
ledgerDirsManager.getWritableLedgerDirsForNewLog();
             Collections.shuffle(list);
             // It would better not to overwrite existing entry log files
@@ -571,10 +609,9 @@ public class EntryLogger {
                 } else {
                     ++preallocatedLogId;
                 }
-                String logFileName = Long.toHexString(preallocatedLogId) + 
".log";
+                String logFileName = Long.toHexString(preallocatedLogId) + 
suffix;
                 for (File dir : list) {
                     newLogFile = new File(dir, logFileName);
-                    currentDir = dir;
                     if (newLogFile.exists()) {
                         LOG.warn("Found existed entry log " + newLogFile
                                + " when trying to create it as a new log.");
@@ -586,13 +623,13 @@ public class EntryLogger {
 
             FileChannel channel = new RandomAccessFile(newLogFile, 
"rw").getChannel();
             BufferedLogChannel logChannel = new BufferedLogChannel(channel,
-                    conf.getWriteBufferBytes(), conf.getReadBufferBytes(), 
preallocatedLogId);
+                    conf.getWriteBufferBytes(), conf.getReadBufferBytes(), 
preallocatedLogId, newLogFile);
             logChannel.write((ByteBuffer) logfileHeader.clear());
 
             for (File f : list) {
                 setLastLogId(f, preallocatedLogId);
             }
-            LOG.info("Preallocated entry logger {}.", preallocatedLogId);
+            LOG.info("Created new entry log file {} for logId {}.", 
newLogFile, preallocatedLogId);
             return logChannel;
         }
 
@@ -665,12 +702,8 @@ public class EntryLogger {
         List<Long> logs = new ArrayList<Long>();
         if (logFiles != null) {
             for (File lf : logFiles) {
-                String idString = lf.getName().split("\\.")[0];
-                try {
-                    long lid = Long.parseLong(idString, 16);
-                    logs.add(lid);
-                } catch (NumberFormatException nfe) {
-                }
+                long logId = fileName2LogId(lf.getName());
+                logs.add(logId);
             }
         }
         // no log file found in this directory
@@ -776,35 +809,10 @@ public class EntryLogger {
         return addEntry(ledger, entry, true);
     }
 
-    private static final class RecyclableByteBuffer {
-        private static final Recycler<RecyclableByteBuffer> RECYCLER = new  
Recycler<RecyclableByteBuffer>() {
-            @Override
-            protected RecyclableByteBuffer 
newObject(Handle<RecyclableByteBuffer> handle) {
-                return new RecyclableByteBuffer(handle);
-            }
-        };
-
-        private final ByteBuffer buffer;
-        private final Handle<RecyclableByteBuffer> handle;
-        public RecyclableByteBuffer(Handle<RecyclableByteBuffer> handle) {
-            this.buffer = ByteBuffer.allocate(4);
-            this.handle = handle;
-        }
-
-        public static RecyclableByteBuffer get() {
-            return RECYCLER.get();
-        }
-
-        public void recycle() {
-            buffer.rewind();
-            handle.recycle(this);
-        }
-    }
-
     synchronized long addEntry(long ledger, ByteBuffer entry, boolean rollLog) 
throws IOException {
         int entrySize = entry.remaining() + 4;
         boolean reachEntryLogLimit =
-                rollLog ? reachEntryLogLimit(entrySize) : 
readEntryLogHardLimit(entrySize);
+            rollLog ? reachEntryLogLimit(entrySize) : 
readEntryLogHardLimit(entrySize);
         // Create new log if logSizeLimit reached or current disk is full
         boolean createNewLog = shouldCreateNewEntryLog.get();
         if (createNewLog || reachEntryLogLimit) {
@@ -834,6 +842,95 @@ public class EntryLogger {
         return (logChannel.getLogId() << 32L) | pos;
     }
 
+    long addEntryForCompaction(long ledgerId, ByteBuffer entry) throws 
IOException {
+        synchronized (compactionLogLock) {
+            int entrySize = entry.remaining() + 4;
+            if (compactionLogChannel == null) {
+                createNewCompactionLog();
+            }
+            ByteBuffer buff = ByteBuffer.allocate(4);
+            buff.putInt(entry.remaining());
+            buff.flip();
+            compactionLogChannel.write(buff);
+            long pos = compactionLogChannel.position();
+            compactionLogChannel.write(entry);
+            compactionLogChannel.registerWrittenEntry(ledgerId, entrySize);
+            return (compactionLogChannel.getLogId() << 32L) | pos;
+        }
+    }
+
+    void flushCompactionLog() throws IOException {
+        synchronized (compactionLogLock) {
+            if (compactionLogChannel != null) {
+                compactionLogChannel.flush(true);
+                LOG.info("Flushed compaction log file {} with logId.",
+                    compactionLogChannel.getLogFile(),
+                    compactionLogChannel.getLogId());
+                // since this channel is only used for writing, after flushing the channel,
+                // we need to close the underlying file channel. Otherwise, we might end up
+                // leaking fds, which prevents the disk space from being reclaimed.
+                closeFileChannel(compactionLogChannel);
+            } else {
+                throw new IOException("Failed to flush compaction log which 
has already been removed.");
+            }
+        }
+    }
+
+    void createNewCompactionLog() throws IOException {
+        synchronized (compactionLogLock) {
+            if (compactionLogChannel == null) {
+                compactionLogChannel = 
entryLoggerAllocator.createNewLogForCompaction();
+            }
+        }
+    }
+
+    /**
+     * Remove the current compaction log, usually invoked when compaction fails and
+     * we need to clean up by removing the compaction log file.
+     */
+    void removeCurCompactionLog() {
+        synchronized (compactionLogLock) {
+            if (compactionLogChannel != null) {
+                if (!compactionLogChannel.getLogFile().delete()) {
+                    LOG.warn("Could not delete compaction log file {}", 
compactionLogChannel.getLogFile());
+                }
+                try {
+                    closeFileChannel(compactionLogChannel);
+                } catch (IOException e) {
+                    LOG.error("Failed to close file channel for compaction log 
{}", compactionLogChannel.getLogId());
+                }
+                compactionLogChannel = null;
+            }
+        }
+    }
+
+
+    private static final class RecyclableByteBuffer {
+        private static final Recycler<RecyclableByteBuffer> RECYCLER = new  
Recycler<RecyclableByteBuffer>() {
+            @Override
+            protected RecyclableByteBuffer 
newObject(Handle<RecyclableByteBuffer> handle) {
+                return new RecyclableByteBuffer(handle);
+            }
+        };
+
+        private final ByteBuffer buffer;
+        private final Handle<RecyclableByteBuffer> handle;
+        public RecyclableByteBuffer(Handle<RecyclableByteBuffer> handle) {
+            this.buffer = ByteBuffer.allocate(4);
+            this.handle = handle;
+        }
+
+        public static RecyclableByteBuffer get() {
+            return RECYCLER.get();
+        }
+
+        public void recycle() {
+            buffer.rewind();
+            handle.recycle(this);
+        }
+    }
+
+
     private void incrementBytesWrittenAndMaybeFlush(long bytesWritten) throws 
IOException {
         if (!doRegularFlushes) {
             return;
@@ -1185,6 +1282,10 @@ public class EntryLogger {
             logid2FileChannel.clear();
             // close current writing log file
             closeFileChannel(logChannel);
+            synchronized (compactionLogLock) {
+                closeFileChannel(compactionLogChannel);
+                compactionLogChannel = null;
+            }
         } catch (IOException ie) {
             // we have no idea how to avoid io exception during shutting down, 
so just ignore it
             LOG.error("Error flush entry log during shutting down, which may 
cause entry log corrupted.", ie);
@@ -1193,6 +1294,9 @@ public class EntryLogger {
                 IOUtils.close(LOG, fc);
             }
             forceCloseFileChannel(logChannel);
+            synchronized (compactionLogLock) {
+                forceCloseFileChannel(compactionLogChannel);
+            }
         }
         // shutdown the pre-allocation thread
         entryLoggerAllocator.stop();
@@ -1218,4 +1322,29 @@ public class EntryLogger {
         }
     }
 
+    protected LedgerDirsManager getLedgerDirsManager() {
+        return ledgerDirsManager;
+    }
+
+    /**
+     * Convert log filename (hex format with suffix) to logId in long.
+     */
+    static long fileName2LogId(String fileName) {
+        if (fileName != null && fileName.contains(".")) {
+            fileName = fileName.split("\\.")[0];
+        }
+        try {
+            return Long.parseLong(fileName, 16);
+        } catch (Exception nfe) {
+            LOG.error("Invalid log file name {} found when trying to convert 
to logId.", fileName, nfe);
+        }
+        return INVALID_LID;
+    }
+
+    /**
+     * Convert log Id to hex string
+     */
+    static String logId2HexString(long logId) {
+        return Long.toHexString(logId);
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 5dcd4a0..849d5bb 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -22,10 +22,8 @@
 package org.apache.bookkeeper.bookie;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -37,7 +35,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+
 import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
@@ -76,15 +74,9 @@ public class GarbageCollectorThread extends SafeRunnable {
 
     final boolean isForceGCAllowWhenNoSpace;
 
-    final boolean isThrottleByBytes;
-    final int maxOutstandingRequests;
-    final int compactionRateByEntries;
-    final int compactionRateByBytes;
-    final CompactionScannerFactory scannerFactory;
-
     // Entry Logger Handle
     final EntryLogger entryLogger;
-
+    final AbstractLogCompactor compactor;
     final CompactableLedgerStorage ledgerStorage;
 
     // flag to ensure gc thread will not be interrupted during compaction
@@ -106,82 +98,7 @@ public class GarbageCollectorThread extends SafeRunnable {
     final GarbageCollector garbageCollector;
     final GarbageCleaner garbageCleaner;
 
-    private static class Throttler {
-        final RateLimiter rateLimiter;
-        final boolean isThrottleByBytes;
-        final int compactionRateByBytes;
-        final int compactionRateByEntries;
-
-        Throttler(boolean isThrottleByBytes,
-                  int compactionRateByBytes,
-                  int compactionRateByEntries) {
-            this.isThrottleByBytes  = isThrottleByBytes;
-            this.compactionRateByBytes = compactionRateByBytes;
-            this.compactionRateByEntries = compactionRateByEntries;
-            this.rateLimiter = RateLimiter.create(this.isThrottleByBytes
-                    ? this.compactionRateByBytes : 
this.compactionRateByEntries);
-        }
-
-        // acquire. if bybytes: bytes of this entry; if byentries: 1.
-        void acquire(int permits) {
-            rateLimiter.acquire(this.isThrottleByBytes ? permits : 1);
-        }
-    }
-
-    /**
-     * A scanner wrapper to check whether a ledger is alive in an entry log 
file.
-     */
-    class CompactionScannerFactory {
-        List<EntryLocation> offsets = new ArrayList<EntryLocation>();
-
-        EntryLogScanner newScanner(final EntryLogMetadata meta) {
-            final Throttler throttler = new Throttler (isThrottleByBytes,
-                                                       compactionRateByBytes,
-                                                       
compactionRateByEntries);
-
-            return new EntryLogScanner() {
-                @Override
-                public boolean accept(long ledgerId) {
-                    return meta.containsLedger(ledgerId);
-                }
-
-                @Override
-                public void process(final long ledgerId, long offset, 
ByteBuffer entry) throws IOException {
-                    throttler.acquire(entry.remaining());
-
-                    if (offsets.size() > maxOutstandingRequests) {
-                        flush();
-                    }
-                    entry.getLong(); // discard ledger id, we already have it
-                    long entryId = entry.getLong();
-                    entry.rewind();
-
-                    long newoffset = entryLogger.addEntry(ledgerId, entry);
-                    offsets.add(new EntryLocation(ledgerId, entryId, 
newoffset));
-
-                }
-            };
-        }
-
-        void flush() throws IOException {
-            if (offsets.isEmpty()) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Skipping entry log flushing, as there are no 
offset!");
-                }
-                return;
-            }
-
-            // Before updating the index, we want to wait until all the 
compacted entries are flushed into the
-            // entryLog
-            try {
-                entryLogger.flush();
-
-                ledgerStorage.updateEntriesLocations(offsets);
-            } finally {
-                offsets.clear();
-            }
-        }
-    }
+    final ServerConfiguration conf;
 
 
     /**
@@ -198,40 +115,34 @@ public class GarbageCollectorThread extends SafeRunnable {
         gcExecutor = Executors.newSingleThreadScheduledExecutor(
                 new 
ThreadFactoryBuilder().setNameFormat("GarbageCollectorThread-%d").build()
         );
-
+        this.conf = conf;
         this.entryLogger = ledgerStorage.getEntryLogger();
         this.ledgerStorage = ledgerStorage;
-
         this.gcWaitTime = conf.getGcWaitTime();
-        this.isThrottleByBytes = conf.getIsThrottleByBytes();
-        this.maxOutstandingRequests = 
conf.getCompactionMaxOutstandingRequests();
-        this.compactionRateByEntries  = conf.getCompactionRateByEntries();
-        this.compactionRateByBytes = conf.getCompactionRateByBytes();
-        this.scannerFactory = new CompactionScannerFactory();
-
-        this.garbageCleaner = new GarbageCollector.GarbageCleaner() {
-            @Override
-            public void clean(long ledgerId) {
-                try {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("delete ledger : " + ledgerId);
-                    }
 
-                    ledgerStorage.deleteLedger(ledgerId);
-                } catch (IOException e) {
-                    LOG.error("Exception when deleting the ledger index file 
on the Bookie: ", e);
+        this.garbageCleaner = ledgerId -> {
+            try {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("delete ledger : " + ledgerId);
                 }
+                ledgerStorage.deleteLedger(ledgerId);
+            } catch (IOException e) {
+                LOG.error("Exception when deleting the ledger index file on 
the Bookie: ", e);
             }
         };
 
         this.garbageCollector = new 
ScanAndCompareGarbageCollector(ledgerManager, ledgerStorage, conf);
-
         // compaction parameters
         minorCompactionThreshold = conf.getMinorCompactionThreshold();
         minorCompactionInterval = conf.getMinorCompactionInterval() * SECOND;
         majorCompactionThreshold = conf.getMajorCompactionThreshold();
         majorCompactionInterval = conf.getMajorCompactionInterval() * SECOND;
         isForceGCAllowWhenNoSpace = conf.getIsForceGCAllowWhenNoSpace();
+        if (conf.getUseTransactionalCompaction()) {
+            this.compactor = new TransactionalEntryLogCompactor(this);
+        } else {
+            this.compactor = new EntryLogCompactor(this);
+        }
 
         if (minorCompactionInterval > 0 && minorCompactionThreshold > 0) {
             if (minorCompactionThreshold > 1.0f) {
@@ -360,6 +271,8 @@ public class GarbageCollectorThread extends SafeRunnable {
         if (force) {
             LOG.info("Garbage collector thread forced to perform GC before 
expiry of wait time.");
         }
+        // Recover and clean up previous state if using transactional 
compaction
+        compactor.cleanUpAndRecover();
 
         // Extract all of the ledger ID's that comprise all of the entry logs
         // (except for the current new one which is still being written to).
@@ -444,51 +357,21 @@ public class GarbageCollectorThread extends SafeRunnable {
      */
     @VisibleForTesting
     void doCompactEntryLogs(double threshold) {
-        LOG.info("Do compaction to compact those files lower than " + 
threshold);
-        // sort the ledger meta by occupied unused space
-        Comparator<EntryLogMetadata> sizeComparator = new 
Comparator<EntryLogMetadata>() {
-            @Override
-            public int compare(EntryLogMetadata m1, EntryLogMetadata m2) {
-                long unusedSize1 = m1.getTotalSize() - m1.getRemainingSize();
-                long unusedSize2 = m2.getTotalSize() - m2.getRemainingSize();
-                if (unusedSize1 > unusedSize2) {
-                    return -1;
-                } else if (unusedSize1 < unusedSize2) {
-                    return 1;
-                } else {
-                    return 0;
-                }
-            }
-        };
+        LOG.info("Do compaction to compact those files lower than {}", 
threshold);
+
+        // sort the ledger meta by usage in ascending order.
         List<EntryLogMetadata> logsToCompact = new 
ArrayList<EntryLogMetadata>();
         logsToCompact.addAll(entryLogMetaMap.values());
-        Collections.sort(logsToCompact, sizeComparator);
+        logsToCompact.sort(Comparator.comparing(EntryLogMetadata::getUsage));
 
         for (EntryLogMetadata meta : logsToCompact) {
             if (meta.getUsage() >= threshold) {
                 break;
             }
-
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Compacting entry log {} below threshold {}", 
meta.getEntryLogId(), threshold);
             }
-            try {
-                compactEntryLog(scannerFactory, meta);
-                scannerFactory.flush();
-
-                LOG.info("Removing entry log {} after compaction", 
meta.getEntryLogId());
-                removeEntryLog(meta.getEntryLogId());
-
-            } catch (LedgerDirsManager.NoWritableLedgerDirException nwlde) {
-                LOG.warn("No writable ledger directory available, aborting 
compaction", nwlde);
-                break;
-            } catch (IOException ioe) {
-                // if compact entry log throws IOException, we don't want to 
remove that
-                // entry log. however, if some entries from that log have been 
readded
-                // to the entry log, and the offset updated, it's ok to flush 
that
-                LOG.error("Error compacting entry log. Log won't be deleted", 
ioe);
-            }
-
+            compactEntryLog(meta);
             if (!running) { // if gc thread is not running, stop compaction
                 return;
             }
@@ -519,9 +402,10 @@ public class GarbageCollectorThread extends SafeRunnable {
      * @param entryLogId
      *          Entry Log File Id
      */
-    private void removeEntryLog(long entryLogId) {
+    protected void removeEntryLog(long entryLogId) {
         // remove entry log file successfully
         if (entryLogger.removeEntryLog(entryLogId)) {
+            LOG.info("Removing entry log metadata for {}", entryLogId);
             entryLogMetaMap.remove(entryLogId);
         }
     }
@@ -529,11 +413,9 @@ public class GarbageCollectorThread extends SafeRunnable {
     /**
      * Compact an entry log.
      *
-     * @param scannerFactory
      * @param entryLogMeta
      */
-    protected void compactEntryLog(CompactionScannerFactory scannerFactory,
-                                   EntryLogMetadata entryLogMeta) throws 
IOException {
+    protected void compactEntryLog(EntryLogMetadata entryLogMeta) {
         // Similar with Sync Thread
         // try to mark compacting flag to make sure it would not be interrupted
         // by shutdown during compaction. otherwise it will receive
@@ -544,16 +426,10 @@ public class GarbageCollectorThread extends SafeRunnable {
             // indicates that compaction is in progress for this EntryLogId.
             return;
         }
-
-        LOG.info("Compacting entry log : {} - Usage: {} %", 
entryLogMeta.getEntryLogId(), entryLogMeta.getUsage());
-
-        try {
-            entryLogger.scanEntryLog(entryLogMeta.getEntryLogId(),
-                                     scannerFactory.newScanner(entryLogMeta));
-        } finally {
-            // clear compacting flag
-            compacting.set(false);
-        }
+        // Do the actual compaction
+        compactor.compact(entryLogMeta);
+        // Mark compaction done
+        compacting.set(false);
     }
 
     /**
@@ -603,4 +479,12 @@ public class GarbageCollectorThread extends SafeRunnable {
         }
         return entryLogMetaMap;
     }
+
+    EntryLogger getEntryLogger() {
+        return entryLogger;
+    }
+
+    CompactableLedgerStorage getLedgerStorage() {
+        return ledgerStorage;
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
new file mode 100644
index 0000000..b263453
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
@@ -0,0 +1,388 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.util.HardLink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class performs compaction in several transactional phases.
+ * Phase 1: Scan the old entry log and copy live entries into a new .compacting log file.
+ * Phase 2: Flush the .compacting log to disk; it becomes a .compacted log file when this completes.
+ * Phase 3: Flush the index; the .compacted file becomes a .log file when this completes.
+ * The old entry log file is removed afterwards.
+ */
+public class TransactionalEntryLogCompactor extends AbstractLogCompactor {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TransactionalEntryLogCompactor.class);
+
+    final EntryLogger entryLogger;
+    final CompactableLedgerStorage ledgerStorage;
+    final List<EntryLocation> offsets = new ArrayList<>();
+
+    // compaction log file suffix
+    static final String COMPACTING_SUFFIX = ".log.compacting";
+    // flushed compaction log file suffix
+    static final String COMPACTED_SUFFIX = ".compacted";
+
+    public TransactionalEntryLogCompactor(GarbageCollectorThread gcThread) {
+        super(gcThread);
+        this.entryLogger = gcThread.getEntryLogger();
+        this.ledgerStorage = gcThread.getLedgerStorage();
+    }
+
+    /**
+     * Delete all previously incomplete compacting logs and recover the index 
for compacted logs.
+     */
+    @Override
+    public void cleanUpAndRecover() {
+        // clean up compacting logs and recover index for already compacted 
logs
+        List<File> ledgerDirs = 
entryLogger.getLedgerDirsManager().getAllLedgerDirs();
+        for (File dir : ledgerDirs) {
+            File[] compactingPhaseFiles = dir.listFiles(file -> 
file.getName().endsWith(COMPACTING_SUFFIX));
+            if (compactingPhaseFiles != null) {
+                for (File file : compactingPhaseFiles) {
+                    if (file.delete()) {
+                        LOG.info("Deleted failed compaction file {}", file);
+                    }
+                }
+            }
+            File[] compactedPhaseFiles = dir.listFiles(file -> 
file.getName().endsWith(COMPACTED_SUFFIX));
+            if (compactedPhaseFiles != null) {
+                for (File compactedFile : compactedPhaseFiles) {
+                    LOG.info("Found compacted log file {} has partially 
flushed index, recovering index.", compactedFile);
+                    CompactionPhase updateIndex = new 
UpdateIndexPhase(compactedFile, true);
+                    updateIndex.run();
+                }
+            }
+        }
+    }
+
+    @Override
+    public boolean compact(EntryLogMetadata metadata) {
+        if (metadata != null) {
+            LOG.info("Compacting entry log {} with usage {}.",
+                new Object[]{metadata.getEntryLogId(), metadata.getUsage()});
+            CompactionPhase scanEntryLog = new ScanEntryLogPhase(metadata);
+            if (!scanEntryLog.run()) {
+                LOG.info("Compaction for entry log {} end in 
ScanEntryLogPhase.", metadata.getEntryLogId());
+                return false;
+            }
+            File compactionLogFile = entryLogger.getCurCompactionLogFile();
+            CompactionPhase flushCompactionLog = new 
FlushCompactionLogPhase(metadata.getEntryLogId());
+            if (!flushCompactionLog.run()) {
+                LOG.info("Compaction for entry log {} end in 
FlushCompactionLogPhase.", metadata.getEntryLogId());
+                return false;
+            }
+            File compactedLogFile = getCompactedLogFile(compactionLogFile, 
metadata.getEntryLogId());
+            CompactionPhase updateIndex = new 
UpdateIndexPhase(compactedLogFile);
+            if (!updateIndex.run()) {
+                LOG.info("Compaction for entry log {} end in 
UpdateIndexPhase.", metadata.getEntryLogId());
+                return false;
+            }
+            LOG.info("Compacted entry log : {}.", metadata.getEntryLogId());
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * An abstract class extended by the concrete transactional phases of compaction.
+     */
+    abstract static class CompactionPhase {
+        private String phaseName = "";
+
+        CompactionPhase(String phaseName) {
+            this.phaseName = phaseName;
+        }
+
+        boolean run() {
+            try {
+                start();
+                return complete();
+            } catch (IOException e) {
+                LOG.error("Encounter exception in compaction phase {}. Abort 
current compaction.", phaseName, e);
+                abort();
+            }
+            return false;
+        }
+
+        abstract void start() throws IOException;
+
+        abstract boolean complete() throws IOException;
+
+        abstract void abort();
+
+    }
+
+    /**
+     * Assume we're compacting entry log 1 to entry log 3.
+     * The first phase is to scan entries in 1.log and copy them to compaction 
log file "3.log.compacting".
+     * We'll try to allocate a new compaction log before scanning to make sure 
we have a log file to write.
+     * If no data has been written after scanning, there are no valid entries to be compacted,
+     * so we can remove 1.log directly, clear the offsets and end the compaction.
+     * Otherwise, we should move on to the next phase.
+     * <p>
+     * If anything fails in this phase, we delete the compaction log and clear the offsets.
+     */
+    class ScanEntryLogPhase extends CompactionPhase {
+        private final EntryLogMetadata metadata;
+
+        ScanEntryLogPhase(EntryLogMetadata metadata) {
+            super("ScanEntryLogPhase");
+            this.metadata = metadata;
+        }
+
+        @Override
+        void start() throws IOException {
+            // scan entry log into compaction log and offset list
+            entryLogger.createNewCompactionLog();
+            entryLogger.scanEntryLog(metadata.getEntryLogId(), new 
EntryLogScanner() {
+                @Override
+                public boolean accept(long ledgerId) {
+                    return metadata.containsLedger(ledgerId);
+                }
+
+                @Override
+                public void process(long ledgerId, long offset, ByteBuffer 
entry) throws IOException {
+                    throttler.acquire(entry.remaining());
+                    synchronized (TransactionalEntryLogCompactor.this) {
+                        long lid = entry.getLong();
+                        long entryId = entry.getLong();
+                        if (lid != ledgerId || entryId < -1) {
+                            LOG.warn("Scanning expected ledgerId {}, but found 
invalid entry " +
+                                    "with ledgerId {} entryId {} at offset {}",
+                                new Object[]{ledgerId, lid, entryId, offset});
+                            throw new IOException("Invalid entry found @ 
offset " + offset);
+                        }
+                        entry.rewind();
+                        long newOffset = 
entryLogger.addEntryForCompaction(ledgerId, entry);
+                        offsets.add(new EntryLocation(ledgerId, entryId, 
newOffset));
+
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Compact add entry : lid = {}, eid = {}, 
offset = {}",
+                                new Object[]{ledgerId, entryId, newOffset});
+                        }
+                    }
+                }
+            });
+        }
+
+        @Override
+        boolean complete() {
+            if (offsets.isEmpty()) {
+                // no valid entries were compacted, delete the entry log file
+                LOG.info("No valid entries found in entry log after scan, removing entry log now.");
+                gcThread.removeEntryLog(metadata.getEntryLogId());
+                entryLogger.removeCurCompactionLog();
+                return false;
+            }
+            return true;
+        }
+
+        @Override
+        void abort() {
+            offsets.clear();
+            // since we haven't flushed yet, we only need to delete the 
unflushed compaction file.
+            entryLogger.removeCurCompactionLog();
+        }
+
+    }
+
+    /**
+     * Assume we're compacting log 1 to log 3.
+     * This phase is to flush the compaction log.
+     * When this phase starts, there should be a compaction log file like 
"3.log.compacting"
+     * When the compaction log is flushed, to indicate that this phase has completed,
+     * a hardlink file "3.log.1.compacted" is created, and "3.log.compacting" is deleted.
+     */
+    class FlushCompactionLogPhase extends CompactionPhase {
+        private final long compactingLogId;
+        private File compactedLogFile;
+
+        FlushCompactionLogPhase(long compactingLogId) {
+            super("FlushCompactionLogPhase");
+            this.compactingLogId = compactingLogId;
+        }
+
+        @Override
+        void start() throws IOException {
+            // flush the current compaction log.
+            File compactionLogFile = entryLogger.getCurCompactionLogFile();
+            if (compactionLogFile == null || !compactionLogFile.exists()) {
+                throw new IOException("Compaction log doesn't exist during 
flushing");
+            }
+            entryLogger.flushCompactionLog();
+        }
+
+        @Override
+        boolean complete() throws IOException {
+            // create a hard link file named "x.log.y.compacted" for file 
"x.log.compacting".
+            // where x is compactionLogId and y is compactingLogId.
+            File compactionLogFile = entryLogger.getCurCompactionLogFile();
+            if (compactionLogFile == null || !compactionLogFile.exists()) {
+                LOG.warn("Compaction log doesn't exist any more after flush");
+                return false;
+            }
+            compactedLogFile = getCompactedLogFile(compactionLogFile, 
compactingLogId);
+            if (compactedLogFile != null && !compactedLogFile.exists()) {
+                HardLink.createHardLink(compactionLogFile, compactedLogFile);
+            }
+            entryLogger.removeCurCompactionLog();
+            return true;
+        }
+
+        @Override
+        void abort() {
+            offsets.clear();
+            // remove compaction log file and its hardlink
+            entryLogger.removeCurCompactionLog();
+            if (compactedLogFile != null && compactedLogFile.exists()) {
+                if (!compactedLogFile.delete()) {
+                    LOG.warn("Could not delete compacted log file {}", 
compactedLogFile);
+                }
+            }
+        }
+    }
+
+    /**
+     * Assume we're compacting log 1 to log 3.
+     * This phase is to update the entry locations and flush the index.
+     * When the phase starts, there should be a compacted file like "3.log.1.compacted",
+     * where 3 is the new compaction logId and 1 is the old entry logId.
+     * After the index is flushed successfully, a hardlink "3.log" is created,
+     * and the 3.log.1.compacted file is deleted to indicate that the phase has succeeded.
+     * <p>
+     * This phase can also be used to recover a partially flushed index when we pass isInRecovery=true.
+     */
+    class UpdateIndexPhase extends CompactionPhase {
+        File compactedLogFile;
+        File newEntryLogFile;
+        private final boolean isInRecovery;
+
+        public UpdateIndexPhase(File compactedLogFile) {
+            this(compactedLogFile, false);
+        }
+
+        public UpdateIndexPhase(File compactedLogFile, boolean isInRecovery) {
+            super("UpdateIndexPhase");
+            this.compactedLogFile = compactedLogFile;
+            this.isInRecovery = isInRecovery;
+        }
+
+        @Override
+        void start() throws IOException {
+            if (compactedLogFile != null && compactedLogFile.exists()) {
+                File dir = compactedLogFile.getParentFile();
+                String compactedFilename = compactedLogFile.getName();
+                // create a hard link "x.log" for file "x.log.y.compacted"
+                this.newEntryLogFile = new File(dir, 
compactedFilename.substring(0, compactedFilename.indexOf(".log") + 4));
+                if (!newEntryLogFile.exists()) {
+                    HardLink.createHardLink(compactedLogFile, newEntryLogFile);
+                }
+                if (isInRecovery) {
+                    
recoverEntryLocations(EntryLogger.fileName2LogId(newEntryLogFile.getName()));
+                }
+                if (!offsets.isEmpty()) {
+                    // update entry locations and flush index
+                    ledgerStorage.updateEntriesLocations(offsets);
+                    ledgerStorage.flushEntriesLocationsIndex();
+                }
+            } else {
+                throw new IOException("Failed to find compacted log file in 
UpdateIndexPhase");
+            }
+        }
+
+        @Override
+        boolean complete() {
+            // When index is flushed, and entry log is removed,
+            // delete the ".compacted" file to indicate this phase is 
completed.
+            offsets.clear();
+            if (compactedLogFile != null) {
+                if (!compactedLogFile.delete()) {
+                    LOG.warn("Could not delete compacted log file {}", 
compactedLogFile);
+                }
+                // Now delete the old entry log file since it's compacted
+                String compactedFilename = compactedLogFile.getName();
+                String oldEntryLogFilename = compactedFilename.substring(compactedFilename.indexOf(".log") + 5);
+                long entryLogId = EntryLogger.fileName2LogId(oldEntryLogFilename);
+                gcThread.removeEntryLog(entryLogId);
+            }
+            return true;
+        }
+
+        @Override
+        void abort() {
+            offsets.clear();
+        }
+
+        /**
+         * Scan entry log to recover entry locations.
+         */
+        private void recoverEntryLocations(long compactedLogId) throws IOException {
+            entryLogger.scanEntryLog(compactedLogId, new EntryLogScanner() {
+                @Override
+                public boolean accept(long ledgerId) {
+                    return true;
+                }
+
+                @Override
+                public void process(long ledgerId, long offset, ByteBuffer entry) throws IOException {
+                    long lid = entry.getLong();
+                    long entryId = entry.getLong();
+                    if (lid != ledgerId || entryId < -1) {
+                        LOG.warn("Scanning expected ledgerId {}, but found invalid entry " +
+                                "with ledgerId {} entryId {} at offset {}",
+                            new Object[]{ledgerId, lid, entryId, offset});
+                        throw new IOException("Invalid entry found @ offset " + offset);
+                    }
+                    entry.rewind();
+                    long location = (compactedLogId << 32L) | (offset + 4);
+                    offsets.add(new EntryLocation(lid, entryId, location));
+                }
+            });
+            LOG.info("Recovered {} entry locations from compacted log {}", offsets.size(), compactedLogId);
+        }
+    }
+
+    File getCompactedLogFile(File compactionLogFile, long compactingLogId) {
+        if (compactionLogFile == null) {
+            return null;
+        }
+        File dir = compactionLogFile.getParentFile();
+        String filename = compactionLogFile.getName();
+        String newSuffix = ".log." + EntryLogger.logId2HexString(compactingLogId) + COMPACTED_SUFFIX;
+        String hardLinkFilename = filename.replace(COMPACTING_SUFFIX, newSuffix);
+        return new File(dir, hardLinkFilename);
+    }
+
+}
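Note on the structure above: each CompactionPhase follows the same template -- do the work in
start(), persist a completion marker in complete(), and roll back partial state in abort() when
start() fails. The sketch below is a minimal, hypothetical illustration of that pattern (the
class and names are invented and simplified here; it is not the actual BookKeeper code):

    // Hypothetical, simplified sketch of the phase template used by the transactional compactor.
    abstract class Phase {
        private final String name;

        Phase(String name) {
            this.name = name;
        }

        abstract void start() throws Exception;  // do the work; may fail midway
        abstract boolean complete();             // persist the "phase done" marker
        abstract void abort();                   // roll back any partial state

        // Run the phase transactionally: it either completes or aborts cleanly.
        boolean run() {
            try {
                start();
            } catch (Exception e) {
                System.err.println("Phase " + name + " failed, aborting: " + e);
                abort();
                return false;
            }
            return complete();
        }
    }

A full transactional compaction chains ScanEntryLogPhase -> FlushCompactionLogPhase ->
UpdateIndexPhase and stops (rolling back) at the first phase whose run() returns false; the mock
compactor in CompactionTest.java further below exercises exactly that structure by injecting
failures into individual phases.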
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index fea051c..8dcb1b3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -56,6 +56,7 @@ public class ServerConfiguration extends AbstractConfiguration {
     protected final static String GC_WAIT_TIME = "gcWaitTime";
     protected final static String IS_FORCE_GC_ALLOW_WHEN_NO_SPACE = "isForceGCAllowWhenNoSpace";
     protected final static String GC_OVERREPLICATED_LEDGER_WAIT_TIME = "gcOverreplicatedLedgerWaitTime";
+    protected final static String USE_TRANSACTIONAL_COMPACTION = "useTransactionalCompaction";
     // Sync Parameters
     protected final static String FLUSH_INTERVAL = "flushInterval";
     protected final static String FLUSH_ENTRYLOG_INTERVAL_BYTES = "flushEntrylogBytes";
@@ -292,6 +293,25 @@ public class ServerConfiguration extends AbstractConfiguration {
     }
 
     /**
+     * Get whether transactional compaction, which uses a separate compaction log, is enabled.
+     *
+     * @return true if transactional compaction is enabled
+     */
+    public boolean getUseTransactionalCompaction() {
+        return this.getBoolean(USE_TRANSACTIONAL_COMPACTION, false);
+    }
+
+    /**
+     * Set whether to use transactional compaction, which uses a separate compaction log.
+     * @param useTransactionalCompaction whether to enable transactional compaction
+     * @return server configuration
+     */
+    public ServerConfiguration setUseTransactionalCompaction(boolean useTransactionalCompaction) {
+        this.setProperty(USE_TRANSACTIONAL_COMPACTION, useTransactionalCompaction);
+        return this;
+    }
+
+    /**
      * Get flush interval. Default value is 10 second. It isn't useful to decrease
      * this value, since ledger storage only checkpoints when an entry logger file
      * is rolled.
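Note: the new flag defaults to false, so existing deployments keep the current compaction
behaviour unless they opt in. A minimal usage sketch follows (the class and surrounding setup
are illustrative only; only the getter/setter added by this patch is assumed):

    import org.apache.bookkeeper.conf.ServerConfiguration;

    public class EnableTransactionalCompactionExample {
        public static void main(String[] args) {
            ServerConfiguration conf = new ServerConfiguration();
            // Opt in to transactional compaction, which writes compacted entries to a
            // separate compaction log and can recover or roll back an interrupted run.
            conf.setUseTransactionalCompaction(true);
            // The same setting maps to the "useTransactionalCompaction" property key.
            System.out.println("useTransactionalCompaction = " + conf.getUseTransactionalCompaction());
        }
    }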
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
index af8f1a3..cea3b52 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
@@ -32,6 +32,8 @@ import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.util.TestUtils;
@@ -132,9 +134,21 @@ public class BookieStorageThresholdTest extends BookKeeperClusterTestCase {
 
     @Test
     public void testStorageThresholdCompaction() throws Exception {
-
-        // We are having BKCluster with just one bookie (NUM_BOOKIES = 1).
-        Bookie bookie = bs.get(0).getBookie();
+        stopAllBookies();
+        ServerConfiguration conf = newServerConfiguration();
+        File ledgerDir1 = createTempDir("ledger", "test1");
+        File ledgerDir2 = createTempDir("ledger","test2");
+        File journalDir = createTempDir("journal","test");
+        String[] ledgerDirNames = new String[]{
+            ledgerDir1.getPath(),
+            ledgerDir2.getPath()
+        };
+        conf.setLedgerDirNames(ledgerDirNames);
+        conf.setJournalDirName(journalDir.getPath());
+        BookieServer server = startBookie(conf);
+        bs.add(server);
+        bsConfs.add(conf);
+        Bookie bookie = server.getBookie();
         // since we are going to set dependency injected ledgermonitor, so we need to shutdown
         // the ledgermonitor which was created as part of the initialization of Bookie
         bookie.ledgerMonitor.shutdown();
@@ -197,14 +211,9 @@ public class BookieStorageThresholdTest extends BookKeeperClusterTestCase {
         bkc.deleteLedger(lhs[1].getId());
         bkc.deleteLedger(lhs[2].getId());
 
-        // since compaction intervals are too long, there is no possibility for compaction to get kicked in
-        // so all the entrylogs (0,1,2) should be available in the ledgerdirectory
-        assertTrue("All the entry log files ([0,1,2].log are not available, which is not expected"
-                + tmpDirs.get(0).getAbsolutePath(), TestUtils.hasLogFiles(tmpDirs.get(0), false, 0, 1, 2));
         // validating that LedgerDirsListener are not triggered yet
         assertTrue("Disk Full shouldn't have been triggered yet", diskFull.getCount() == 1);
         assertTrue("Disk writable shouldn't have been triggered yet", diskWritable.getCount() == 1);
-
         // set exception injection to true, so that next time when checkDir of DiskChecker (ThresholdTestDiskChecker) is
         // called it will throw DiskOutOfSpaceException
         thresholdTestDiskChecker.setInjectDiskOutOfSpaceException(true);
@@ -226,9 +235,10 @@ public class BookieStorageThresholdTest extends BookKeeperClusterTestCase {
         // force GC.
         // Because of getWritableLedgerDirsForNewLog, compaction would be able to create newlog and compact even though
         // there are no writableLedgerDirs
-        assertFalse(
-                "Found entry log file ([0,1,2].log. They should have been compacted" + tmpDirs.get(0).getAbsolutePath(),
-                TestUtils.hasLogFiles(tmpDirs.get(0), true, 0, 1, 2));
+        for(File ledgerDir : bookie.getLedgerDirsManager().getAllLedgerDirs()) {
+            assertFalse("Found entry log file ([0,1,2].log. They should have been compacted" + ledgerDir,
+                TestUtils.hasLogFiles(ledgerDir.getParentFile(), true, 0, 1, 2));
+        }
+        }
 
         try {
             ledgerDirsManager.getWritableLedgerDirs();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 57b9404..17cc04f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -25,6 +25,8 @@ import io.netty.buffer.Unpooled;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -34,8 +36,6 @@ import java.util.Enumeration;
 import java.util.List;
 
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
-import org.apache.bookkeeper.bookie.GarbageCollectorThread.CompactionScannerFactory;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -51,17 +51,18 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataLis
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.bookkeeper.util.HardLink;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.TestUtils;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.zookeeper.AsyncCallback;
 
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTED_SUFFIX;
 import static org.junit.Assert.*;
 /**
  * This class tests the entry log compaction functionality.
@@ -356,6 +357,31 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
 
     @Test
     public void testMinorCompactionWithNoWritableLedgerDirsButIsForceGCAllowWhenNoSpaceIsSet() throws Exception {
+        stopAllBookies();
+        ServerConfiguration conf = newServerConfiguration();
+        // disable major compaction
+        conf.setMajorCompactionThreshold(0.0f);
+        // here we are setting isForceGCAllowWhenNoSpace to true, so Major and Minor compaction won't be disabled in the case
+        // when disks are full
+        conf.setIsForceGCAllowWhenNoSpace(true);
+        conf.setGcWaitTime(600000);
+        conf.setMinorCompactionInterval(120000);
+        conf.setMajorCompactionInterval(240000);
+        // We need at least 2 ledger dirs because compaction will flush ledger cache, and will
+        // trigger relocateIndexFileAndFlushHeader. If we only have one ledger dir, compaction will always fail
+        // when there's no writeable ledger dir.
+        File ledgerDir1 = createTempDir("ledger", "test1");
+        File ledgerDir2 = createTempDir("ledger","test2");
+        File journalDir = createTempDir("journal","test");
+        String[] ledgerDirNames = new String[]{
+            ledgerDir1.getPath(),
+            ledgerDir2.getPath()
+        };
+        conf.setLedgerDirNames(ledgerDirNames);
+        conf.setJournalDirName(journalDir.getPath());
+        BookieServer server = startBookie(conf);
+        bs.add(server);
+        bsConfs.add(conf);
         // prepare data
         LedgerHandle[] lhs = prepareData(3, false);
 
@@ -363,19 +389,6 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             lh.close();
         }
 
-        // disable major compaction
-        baseConf.setMajorCompactionThreshold(0.0f);
-
-        // here we are setting isForceGCAllowWhenNoSpace to true, so Major and Minor compaction wont be disabled in case
-        // when discs are full
-        baseConf.setIsForceGCAllowWhenNoSpace(true);
-        baseConf.setGcWaitTime(60000);
-        baseConf.setMinorCompactionInterval(120000);
-        baseConf.setMajorCompactionInterval(240000);
-
-        // restart bookies
-        restartBookies(baseConf);
-
         long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime;
         long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime;
         assertFalse(getGCThread().enableMajorCompaction);
@@ -383,6 +396,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
 
         for (BookieServer bookieServer : bs) {
             Bookie bookie = bookieServer.getBookie();
+            bookie.ledgerStorage.flush();
+            bookie.ledgerMonitor.shutdown();
             LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager();
             List<File> ledgerDirs = ledgerDirsManager.getAllLedgerDirs();
             // Major and Minor compaction are not disabled even though discs are full. Check LedgerDirsListener of
@@ -407,9 +422,9 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         // allocating newlog
         // we get getWritableLedgerDirsForNewLog() of ledgerDirsManager instead of getWritableLedgerDirs()
         // entry logs ([0,1,2].log) should be compacted.
-        for (File ledgerDirectory : tmpDirs) {
+        for (File ledgerDirectory : server.getBookie().getLedgerDirsManager().getAllLedgerDirs()) {
             assertFalse("Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: "
-                    + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2));
+                    + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory.getParentFile(), true, 0, 1, 2));
         }
 
         // even entry log files are removed, we still can access entries for ledger1
@@ -817,13 +832,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
     }
 
     @Test
-    public void testCompactionWithEntryLogRollover() throws Exception {
-        // Disable bookie gc during this test
-        baseConf.setGcWaitTime(60000);
-        baseConf.setMinorCompactionInterval(0);
-        baseConf.setMajorCompactionInterval(0);
-        restartBookies(baseConf);
-
+    public void testRecoverIndexWhenIndexIsPartiallyFlush() throws Exception {
         // prepare data
         LedgerHandle[] lhs = prepareData(3, false);
 
@@ -831,46 +840,249 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             lh.close();
         }
 
+        // disable compaction
+        baseConf.setMinorCompactionThreshold(0.0f);
+        baseConf.setMajorCompactionThreshold(0.0f);
+        baseConf.setGcWaitTime(600000);
+
+        // restart bookies
+        restartBookies(baseConf);
+
+        Bookie bookie = bs.get(0).getBookie();
+        InterleavedLedgerStorage storage = (InterleavedLedgerStorage) bookie.ledgerStorage;
+
         // remove ledger2 and ledger3
         bkc.deleteLedger(lhs[1].getId());
         bkc.deleteLedger(lhs[2].getId());
+
         LOG.info("Finished deleting the ledgers contains most entries.");
 
-        InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage) bs.get(0).getBookie().ledgerStorage;
-        GarbageCollectorThread garbageCollectorThread = ledgerStorage.gcThread;
-        CompactionScannerFactory compactionScannerFactory = garbageCollectorThread.scannerFactory;
-        long entryLogId = 0;
-        EntryLogger entryLogger = ledgerStorage.entryLogger;
+        MockTransactionalEntryLogCompactor partialCompactionWorker = new MockTransactionalEntryLogCompactor(
+            ((InterleavedLedgerStorage) bookie.ledgerStorage).gcThread);
+
+        for (long logId = 0; logId < 3; logId++) {
+            EntryLogMetadata meta = storage.entryLogger.getEntryLogMetadata(logId);
+            partialCompactionWorker.compactWithIndexFlushFailure(meta);
+        }
+
+        // entry logs ([0,1,2].log) should not be compacted because the partial index flush throws an IOException
+        for (File ledgerDirectory : tmpDirs) {
+            assertTrue("Entry log file ([0,1,2].log should not be compacted in ledgerDirectory: "
+                + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2));
+        }
 
-        LOG.info("Before compaction -- Least unflushed log id: {}", entryLogger.getLeastUnflushedLogId());
+        // entries should be available
+        verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed());
 
-        // Compact entryLog 0
-        EntryLogScanner scanner = compactionScannerFactory.newScanner(entryLogger.getEntryLogMetadata(entryLogId));
+        // But we should see .compacted file with index flush failure
+        assertEquals(findCompactedEntryLogFiles().size(), 3);
 
-        entryLogger.scanEntryLog(entryLogId, scanner);
+        // Now try to recover those flush failed index files
+        partialCompactionWorker.cleanUpAndRecover();
 
-        long entryLogIdAfterCompaction = entryLogger.getLeastUnflushedLogId();
-        LOG.info("After compaction -- Least unflushed log id: {}", entryLogIdAfterCompaction);
+        // There should be no .compacted files after recovery
+        assertEquals(findCompactedEntryLogFiles().size(), 0);
+
+        // compaction worker should recover the partially flushed index and delete [0,1,2].log
+        for (File ledgerDirectory : tmpDirs) {
+            assertFalse("Entry log file ([0,1,2].log should have been compacted in ledgerDirectory: "
+                + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2));
+        }
+
+        // even though the entry log files are removed, we can still access entries for ledger1
+        // since those entries have been compacted to the new entry log
+        verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed());
+    }
 
-        // Add more entries to trigger entrylog roll over
-        LedgerHandle[] lhs2 = prepareData(3, false);
+    @Test
+    public void testCompactionFailureShouldNotResultInDuplicatedData() throws Exception {
+        // prepare data
+        LedgerHandle[] lhs = prepareData(5, false);
 
-        for (LedgerHandle lh : lhs2) {
+        for (LedgerHandle lh : lhs) {
             lh.close();
         }
 
-        // Wait for entry logger to move forward
-        while (entryLogger.getLeastUnflushedLogId() <= entryLogIdAfterCompaction) {
-            Thread.sleep(100);
+        // disable compaction
+        baseConf.setMinorCompactionThreshold(0.0f);
+        baseConf.setMajorCompactionThreshold(0.0f);
+        baseConf.setUseTransactionalCompaction(true);
+
+        // restart bookies
+        restartBookies(baseConf);
+
+        // remove ledger2 and ledger3
+        bkc.deleteLedger(lhs[1].getId());
+        bkc.deleteLedger(lhs[2].getId());
+
+        LOG.info("Finished deleting the ledgers contains most entries.");
+        Thread.sleep(baseConf.getMajorCompactionInterval() * 1000
+            + baseConf.getGcWaitTime());
+        Bookie bookie = bs.get(0).getBookie();
+        InterleavedLedgerStorage storage = (InterleavedLedgerStorage) bookie.ledgerStorage;
+
+        List<File> ledgerDirs = bookie.getLedgerDirsManager().getAllLedgerDirs();
+        List<Long> usageBeforeCompaction = new ArrayList<>();
+        ledgerDirs.forEach(file -> usageBeforeCompaction.add(getDirectorySpaceUsage(file)));
+
+        MockTransactionalEntryLogCompactor partialCompactionWorker = new MockTransactionalEntryLogCompactor(
+            ((InterleavedLedgerStorage) bookie.ledgerStorage).gcThread);
+
+        for (long logId = 0; logId < 5; logId++) {
+            EntryLogMetadata meta = storage.entryLogger.getEntryLogMetadata(logId);
+            partialCompactionWorker.compactWithLogFlushFailure(meta);
+        }
+
+        // entry logs ([0-4].log) should not be compacted because of the failure to flush the compaction log
+        for (File ledgerDirectory : tmpDirs) {
+            assertTrue("Entry log file ([0-4].log should not be compacted in ledgerDirectory: "
+                + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2, 3, 4));
+        }
+        // even though the entry log files are removed, we can still access entries for ledger1
+        // since those entries have been compacted to the new entry log
+        verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed());
+
+        List<Long> freeSpaceAfterCompactionFailed = new ArrayList<>();
+        ledgerDirs.forEach(file -> freeSpaceAfterCompactionFailed.add(getDirectorySpaceUsage(file)));
+
+        // No extra data is generated after a compaction failure
+        for (int i = 0; i < usageBeforeCompaction.size(); i++) {
+            assertEquals(usageBeforeCompaction.get(i), freeSpaceAfterCompactionFailed.get(i));
         }
 
-        long entryLogIdBeforeFlushing = entryLogger.getLeastUnflushedLogId();
-        LOG.info("Added more data -- Least unflushed log id: {}", entryLogIdBeforeFlushing);
+        // now enable normal compaction
+        baseConf.setMajorCompactionThreshold(0.5f);
 
-        Assert.assertTrue(entryLogIdAfterCompaction < entryLogIdBeforeFlushing);
+        // restart bookies
+        restartBookies(baseConf);
 
-        // Wait for entries to be flushed on entry logs and update index
-        // This operation should succeed even if the entry log rolls over after the last entry was compacted
-        compactionScannerFactory.flush();
+        Thread.sleep(baseConf.getMajorCompactionInterval() * 1000
+            + baseConf.getGcWaitTime());
+        // compaction worker should compact [0-4].log
+        for (File ledgerDirectory : tmpDirs) {
+            assertFalse("Entry log file ([0-4].log should have been compacted in ledgerDirectory: "
+                + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2, 3, 4));
+        }
+
+        // even though the entry log files are removed, we can still access entries for ledger1
+        // since those entries have been compacted to the new entry log
+        verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed());
+    }
+
+    private long getDirectorySpaceUsage(File dir) {
+        long size = 0;
+        for (File file : dir.listFiles()) {
+            size += file.length();
+        }
+        return size;
     }
+
+    private Set<File> findCompactedEntryLogFiles() {
+        Set<File> compactedLogFiles = new HashSet<>();
+        for (File ledgerDirectory : tmpDirs) {
+            File[] files = Bookie.getCurrentDirectory(ledgerDirectory).listFiles(
+                file -> file.getName().endsWith(COMPACTED_SUFFIX));
+            if (files != null) {
+                for (File file : files) {
+                    compactedLogFiles.add(file);
+                }
+            }
+        }
+        return compactedLogFiles;
+    }
+
+    private static class MockTransactionalEntryLogCompactor extends TransactionalEntryLogCompactor {
+
+        public MockTransactionalEntryLogCompactor(GarbageCollectorThread gcThread) {
+            super(gcThread);
+        }
+
+        synchronized void compactWithIndexFlushFailure(EntryLogMetadata metadata) {
+            LOG.info("Compacting entry log {}.", metadata.getEntryLogId());
+            CompactionPhase scanEntryLog = new ScanEntryLogPhase(metadata);
+            if (!scanEntryLog.run()) {
+                LOG.info("Compaction for {} end in ScanEntryLogPhase.", metadata.getEntryLogId());
+                return;
+            }
+            File compactionLogFile = entryLogger.getCurCompactionLogFile();
+            CompactionPhase flushCompactionLog = new FlushCompactionLogPhase(metadata.getEntryLogId());
+            if (!flushCompactionLog.run()) {
+                LOG.info("Compaction for {} end in FlushCompactionLogPhase.", metadata.getEntryLogId());
+                return;
+            }
+            File compactedLogFile = getCompactedLogFile(compactionLogFile, metadata.getEntryLogId());
+            CompactionPhase partialFlushIndexPhase = new PartialFlushIndexPhase(compactedLogFile);
+            if (!partialFlushIndexPhase.run()) {
+                LOG.info("Compaction for {} end in PartialFlushIndexPhase.", metadata.getEntryLogId());
+                return;
+            }
+            gcThread.removeEntryLog(metadata.getEntryLogId());
+            LOG.info("Compacted entry log : {}.", metadata.getEntryLogId());
+        }
+
+        synchronized void compactWithLogFlushFailure(EntryLogMetadata metadata) {
+            LOG.info("Compacting entry log {}", metadata.getEntryLogId());
+            CompactionPhase scanEntryLog = new ScanEntryLogPhase(metadata);
+            if (!scanEntryLog.run()) {
+                LOG.info("Compaction for {} end in ScanEntryLogPhase.", metadata.getEntryLogId());
+                return;
+            }
+            File compactionLogFile = entryLogger.getCurCompactionLogFile();
+            CompactionPhase logFlushFailurePhase = new LogFlushFailurePhase(metadata.getEntryLogId());
+            if (!logFlushFailurePhase.run()) {
+                LOG.info("Compaction for {} end in FlushCompactionLogPhase.", metadata.getEntryLogId());
+                return;
+            }
+            File compactedLogFile = getCompactedLogFile(compactionLogFile, metadata.getEntryLogId());
+            CompactionPhase updateIndex = new UpdateIndexPhase(compactedLogFile);
+            if (!updateIndex.run()) {
+                LOG.info("Compaction for entry log {} end in UpdateIndexPhase.", metadata.getEntryLogId());
+                return;
+            }
+            gcThread.removeEntryLog(metadata.getEntryLogId());
+            LOG.info("Compacted entry log : {}.", metadata.getEntryLogId());
+        }
+
+        private class PartialFlushIndexPhase extends UpdateIndexPhase {
+
+            public PartialFlushIndexPhase(File compactedLogFile) {
+                super(compactedLogFile);
+            }
+
+            @Override
+            void start() throws IOException {
+                if (compactedLogFile != null && compactedLogFile.exists()) {
+                    File dir = compactedLogFile.getParentFile();
+                    String compactedFilename = compactedLogFile.getName();
+                    // create a hard link "x.log" for file "x.log.y.compacted"
+                    this.newEntryLogFile = new File(dir, compactedFilename.substring(0, compactedFilename.indexOf(".log") + 4));
+                    File hardlinkFile = new File(dir, newEntryLogFile.getName());
+                    if (!hardlinkFile.exists()) {
+                        HardLink.createHardLink(compactedLogFile, hardlinkFile);
+                    }
+                    }
+                    assertTrue(offsets.size() > 1);
+                    // only flush index for one entry location
+                    EntryLocation el = offsets.get(0);
+                    ledgerStorage.updateEntriesLocations(offsets);
+                    ledgerStorage.flushEntriesLocationsIndex();
+                    throw new IOException("Flush ledger index encounter exception");
+                }
+            }
+        }
+
+        private class LogFlushFailurePhase extends FlushCompactionLogPhase {
+
+            LogFlushFailurePhase(long compactingLogId) {
+                super(compactingLogId);
+            }
+
+            @Override
+            void start() throws IOException {
+                // flush the current compaction log
+                entryLogger.flushCompactionLog();
+                throw new IOException("Encounter IOException when trying to flush compaction log");
+            }
+        }
+    }
+
 }
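Note: the tests above lean on the compaction file-naming convention visible in
getCompactedLogFile() and UpdateIndexPhase -- the in-flight compaction log (whose name carries
COMPACTING_SUFFIX) is linked to "<newLogId>.log.<oldLogId>.compacted" once flushed, and finally
to a plain "<newLogId>.log" hard link once the index is updated. The toy helper below mirrors
the substring logic in UpdateIndexPhase.complete(); it is hypothetical and not part of this
patch:

    // Hypothetical helper: parse both hex log ids out of a ".compacted" file name,
    // e.g. "3.log.1.compacted" -> compaction log id 3, old entry log id 1.
    final class CompactedLogName {
        static long newLogId(String name) {
            return Long.parseLong(name.substring(0, name.indexOf(".log")), 16);
        }

        static long oldLogId(String name) {
            String rest = name.substring(name.indexOf(".log") + 5); // e.g. "1.compacted"
            return Long.parseLong(rest.substring(0, rest.indexOf('.')), 16);
        }

        public static void main(String[] args) {
            String n = "3.log.1.compacted";
            System.out.println(newLogId(n) + " / " + oldLogId(n)); // prints "3 / 1"
        }
    }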
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index f5a30f3..b5a22f8 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -241,7 +241,12 @@ public abstract class BookKeeperClusterTestCase {
         for (BookieServer server : bs) {
             server.shutdown();
         }
+        bsConfs.clear();
         bs.clear();
+        if (bkc != null) {
+            bkc.close();
+            bkc = null;
+        }
     }
 
     protected void startAllBookies() throws Exception {
@@ -598,6 +603,10 @@ public abstract class BookKeeperClusterTestCase {
         };
         server.start();
 
+        if (bkc == null) {
+            bkc = new BookKeeperTestClient(baseClientConf);
+        }
+
         int port = conf.getBookiePort();
         String host = InetAddress.getLocalHost().getHostAddress();
         if (conf.getUseHostNameAsBookieID()) {

-- 
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <commits@bookkeeper.apache.org>'].
