This is an automated email from the ASF dual-hosted git repository. zhaijia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push: new 3392bee BOOKKEEPER-1040: Use separate log for compaction and add transaction support 3392bee is described below commit 3392beee5c70abe36d36604723e14d97b7764be9 Author: Yiming Zang <yz...@twitter.com> AuthorDate: Wed Nov 8 19:27:20 2017 +0800 BOOKKEEPER-1040: Use separate log for compaction and add transaction support Problem: Compaction might keep generating duplicated data, which can fill up the disk. This is because compaction has no transactional operation, so if compaction keeps failing partway through, the bookie ends up with a lot of garbage data. Change: 1. Introduce abstract class AbstractLogCompactor with an abstract method compact(). 2. Move the existing compaction logic to a separate class called EntryLogCompactor. 3. Introduce transactional compaction, so that we can recover an incomplete compaction or roll back a failed compaction. 4. Add a configuration option to enable transactional compaction. Potential Risk: 1. No risk if we keep using the default compactor. 2. If we enable transactional compaction, we need to be careful: because it uses a separate log file for compaction, a bookie with a lot of old small ledgers will end up with a lot of small entry log files. This can potentially cause "Too many open files" errors in BookKeeper, because GC scans all the entry log files and keeps them open. 
Author: Yiming Zang <yz...@twitter.com> Reviewers: Enrico Olivelli <eolive...@gmail.com>, Jia Zhai <None>, Sijie Guo <si...@apache.org> This closes #704 from yzang/yzang/BOOKKEEPER-1040 --- .../bookkeeper/bookie/AbstractLogCompactor.java | 71 ++++ .../apache/bookkeeper/bookie/EntryLocation.java | 12 + .../bookkeeper/bookie/EntryLogCompactor.java | 123 +++++++ .../org/apache/bookkeeper/bookie/EntryLogger.java | 267 ++++++++++---- .../bookkeeper/bookie/GarbageCollectorThread.java | 192 ++-------- .../bookie/TransactionalEntryLogCompactor.java | 388 +++++++++++++++++++++ .../bookkeeper/conf/ServerConfiguration.java | 20 ++ .../bookie/BookieStorageThresholdTest.java | 32 +- .../apache/bookkeeper/bookie/CompactionTest.java | 308 +++++++++++++--- .../bookkeeper/test/BookKeeperClusterTestCase.java | 9 + 10 files changed, 1140 insertions(+), 282 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java new file mode 100644 index 0000000..8f190a3 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java @@ -0,0 +1,71 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.bookkeeper.bookie; + +import com.google.common.util.concurrent.RateLimiter; + +import org.apache.bookkeeper.conf.ServerConfiguration; + +/** + * Abstract entry log compactor used for compaction. + */ +public abstract class AbstractLogCompactor { + + protected final ServerConfiguration conf; + protected final Throttler throttler; + protected final GarbageCollectorThread gcThread; + + public AbstractLogCompactor(GarbageCollectorThread gcThread) { + this.gcThread = gcThread; + this.conf = gcThread.conf; + this.throttler = new Throttler(conf); + } + + /** + * Compact entry log file. + * @param entryLogMeta log metadata for the entry log to be compacted + * @return true for succeed + */ + public abstract boolean compact(EntryLogMetadata entryLogMeta); + + /** + * Do nothing by default. Intended for subclass to override this method. + */ + public void cleanUpAndRecover() {} + + static class Throttler { + private final RateLimiter rateLimiter; + private final boolean isThrottleByBytes; + + Throttler(ServerConfiguration conf) { + this.isThrottleByBytes = conf.getIsThrottleByBytes(); + this.rateLimiter = RateLimiter.create(this.isThrottleByBytes + ? conf.getCompactionRateByBytes() : conf.getCompactionRateByEntries()); + } + + // acquire. if bybytes: bytes of this entry; if byentries: 1. + void acquire(int permits) { + rateLimiter.acquire(this.isThrottleByBytes ? 
permits : 1); + } + } + +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLocation.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLocation.java index 87005ce..be5eb7f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLocation.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLocation.java @@ -34,4 +34,16 @@ public class EntryLocation { this.entry = entry; this.location = location; } + + public long getLedger() { + return ledger; + } + + public long getEntry() { + return entry; + } + + public long getLocation() { + return location; + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java new file mode 100644 index 0000000..c31b989 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java @@ -0,0 +1,123 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.bookkeeper.bookie; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is the basic entry log compactor to compact entry logs. + * The compaction is done by scanning the old entry log file, copy the active ledgers to the + * current entry logger and remove the old entry log when the scan is over. + */ +public class EntryLogCompactor extends AbstractLogCompactor { + private static final Logger LOG = LoggerFactory.getLogger(EntryLogCompactor.class); + + final CompactionScannerFactory scannerFactory = new CompactionScannerFactory(); + final EntryLogger entryLogger; + final CompactableLedgerStorage ledgerStorage; + private final int maxOutstandingRequests; + + public EntryLogCompactor(GarbageCollectorThread gcThread) { + super(gcThread); + this.maxOutstandingRequests = conf.getCompactionMaxOutstandingRequests(); + this.entryLogger = gcThread.getEntryLogger(); + this.ledgerStorage = gcThread.getLedgerStorage(); + } + + @Override + public boolean compact(EntryLogMetadata entryLogMeta) { + try { + entryLogger.scanEntryLog(entryLogMeta.getEntryLogId(), + scannerFactory.newScanner(entryLogMeta)); + scannerFactory.flush(); + LOG.info("Removing entry log {} after compaction", entryLogMeta.getEntryLogId()); + gcThread.removeEntryLog(entryLogMeta.getEntryLogId()); + } catch (LedgerDirsManager.NoWritableLedgerDirException nwlde) { + LOG.warn("No writable ledger directory available, aborting compaction", nwlde); + return false; + } catch (IOException ioe) { + // if compact entry log throws IOException, we don't want to remove that + // entry log. however, if some entries from that log have been re-added + // to the entry log, and the offset updated, it's ok to flush that + LOG.error("Error compacting entry log. 
Log won't be deleted", ioe); + return false; + } + return true; + } + + /** + * A scanner wrapper to check whether a ledger is alive in an entry log file. + */ + class CompactionScannerFactory { + List<EntryLocation> offsets = new ArrayList<EntryLocation>(); + + EntryLogger.EntryLogScanner newScanner(final EntryLogMetadata meta) { + + return new EntryLogger.EntryLogScanner() { + @Override + public boolean accept(long ledgerId) { + return meta.containsLedger(ledgerId); + } + + @Override + public void process(final long ledgerId, long offset, ByteBuffer entry) throws IOException { + throttler.acquire(entry.remaining()); + + if (offsets.size() > maxOutstandingRequests) { + flush(); + } + entry.getLong(); // discard ledger id, we already have it + long entryId = entry.getLong(); + entry.rewind(); + + long newoffset = entryLogger.addEntry(ledgerId, entry); + offsets.add(new EntryLocation(ledgerId, entryId, newoffset)); + + } + }; + } + + void flush() throws IOException { + if (offsets.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping entry log flushing, as there are no offset!"); + } + return; + } + + // Before updating the index, we want to wait until all the compacted entries are flushed into the + // entryLog + try { + entryLogger.flush(); + ledgerStorage.updateEntriesLocations(offsets); + } finally { + offsets.clear(); + } + } + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java index 717a8ae..280806c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java @@ -22,6 +22,8 @@ package org.apache.bookkeeper.bookie; import static com.google.common.base.Charsets.UTF_8; + +import static org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX; import static 
org.apache.bookkeeper.util.BookKeeperConstants.MAX_LOG_SIZE_LIMIT; import io.netty.buffer.ByteBuf; @@ -50,7 +52,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -79,17 +80,26 @@ public class EntryLogger { private static class BufferedLogChannel extends BufferedChannel { private final long logId; private final EntryLogMetadata entryLogMetadata; + private final File logFile; - public BufferedLogChannel(FileChannel fc, int writeCapacity, - int readCapacity, long logId) throws IOException { + public BufferedLogChannel(FileChannel fc, + int writeCapacity, + int readCapacity, + long logId, + File logFile) throws IOException { super(fc, writeCapacity, readCapacity); this.logId = logId; this.entryLogMetadata = new EntryLogMetadata(logId); + this.logFile = logFile; } public long getLogId() { return logId; } + public File getLogFile() { + return logFile; + } + public void registerWrittenEntry(long ledgerId, long entrySize) { entryLogMetadata.addLedgerSize(ledgerId, entrySize); } @@ -106,11 +116,17 @@ public class EntryLogger { private volatile long leastUnflushedLogId; /** + * locks for compaction log + */ + private final Object compactionLogLock = new Object(); + + /** * The maximum size of a entry logger file. 
*/ final long logSizeLimit; private List<BufferedLogChannel> logChannelsToFlush; private volatile BufferedLogChannel logChannel; + private volatile BufferedLogChannel compactionLogChannel; private final EntryLoggerAllocator entryLoggerAllocator; private final boolean entryLogPreAllocationEnabled; private final CopyOnWriteArrayList<EntryLogListener> listeners = new CopyOnWriteArrayList<EntryLogListener>(); @@ -370,6 +386,18 @@ public class EntryLogger { return logChannel.getLogId(); } + /** + * Get the current log file for compaction + */ + File getCurCompactionLogFile() { + synchronized (compactionLogLock) { + if (compactionLogChannel == null) { + return null; + } + return compactionLogChannel.getLogFile(); + } + } + protected void initialize() throws IOException { // Register listener for disk full notifications. ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener()); @@ -461,6 +489,7 @@ public class EntryLogger { } else { logChannel = entryLoggerAllocator.createNewLog(); } + currentDir = logChannel.getLogFile().getParentFile(); } /** @@ -517,50 +546,59 @@ public class EntryLogger { */ class EntryLoggerAllocator { - long preallocatedLogId; - Future<BufferedLogChannel> preallocation = null; - ExecutorService allocatorExecutor; + private long preallocatedLogId; + private Future<BufferedLogChannel> preallocation = null; + private ExecutorService allocatorExecutor; + private final Object createEntryLogLock = new Object(); + private final Object createCompactionLogLock = new Object(); EntryLoggerAllocator(long logId) { preallocatedLogId = logId; allocatorExecutor = Executors.newSingleThreadExecutor(); } - synchronized BufferedLogChannel createNewLog() throws IOException { - BufferedLogChannel bc; - if (!entryLogPreAllocationEnabled || null == preallocation) { - // initialization time to create a new log - bc = allocateNewLog(); - } else { - // has a preallocated entry log - try { - bc = preallocation.get(); - } catch (ExecutionException ee) { - if 
(ee.getCause() instanceof IOException) { - throw (IOException) (ee.getCause()); - } else { - throw new IOException("Error to execute entry log allocation.", ee); + BufferedLogChannel createNewLog() throws IOException { + synchronized (createEntryLogLock) { + BufferedLogChannel bc; + if (!entryLogPreAllocationEnabled || null == preallocation) { + // initialization time to create a new log + bc = allocateNewLog(); + return bc; + } else { + // has a preallocated entry log + try { + bc = preallocation.get(); + } catch (ExecutionException ee) { + if (ee.getCause() instanceof IOException) { + throw (IOException) (ee.getCause()); + } else { + throw new IOException("Error to execute entry log allocation.", ee); + } + } catch (CancellationException ce) { + throw new IOException("Task to allocate a new entry log is cancelled.", ce); + } catch (InterruptedException ie) { + throw new IOException("Intrrupted when waiting a new entry log to be allocated.", ie); } - } catch (CancellationException ce) { - throw new IOException("Task to allocate a new entry log is cancelled.", ce); - } catch (InterruptedException ie) { - throw new IOException("Intrrupted when waiting a new entry log to be allocated.", ie); } - preallocation = allocatorExecutor.submit(new Callable<BufferedLogChannel>() { - @Override - public BufferedLogChannel call() throws IOException { - return allocateNewLog(); - } - }); + preallocation = allocatorExecutor.submit(() -> allocateNewLog()); + return bc; } - LOG.info("Created new entry logger {}.", bc.getLogId()); - return bc; + } + + BufferedLogChannel createNewLogForCompaction() throws IOException { + synchronized (createCompactionLogLock) { + return allocateNewLog(COMPACTING_SUFFIX); + } + } + + private BufferedLogChannel allocateNewLog() throws IOException { + return allocateNewLog(".log"); } /** * Allocate a new log file. 
*/ - BufferedLogChannel allocateNewLog() throws IOException { + private BufferedLogChannel allocateNewLog(String suffix) throws IOException { List<File> list = ledgerDirsManager.getWritableLedgerDirsForNewLog(); Collections.shuffle(list); // It would better not to overwrite existing entry log files @@ -571,10 +609,9 @@ public class EntryLogger { } else { ++preallocatedLogId; } - String logFileName = Long.toHexString(preallocatedLogId) + ".log"; + String logFileName = Long.toHexString(preallocatedLogId) + suffix; for (File dir : list) { newLogFile = new File(dir, logFileName); - currentDir = dir; if (newLogFile.exists()) { LOG.warn("Found existed entry log " + newLogFile + " when trying to create it as a new log."); @@ -586,13 +623,13 @@ public class EntryLogger { FileChannel channel = new RandomAccessFile(newLogFile, "rw").getChannel(); BufferedLogChannel logChannel = new BufferedLogChannel(channel, - conf.getWriteBufferBytes(), conf.getReadBufferBytes(), preallocatedLogId); + conf.getWriteBufferBytes(), conf.getReadBufferBytes(), preallocatedLogId, newLogFile); logChannel.write((ByteBuffer) logfileHeader.clear()); for (File f : list) { setLastLogId(f, preallocatedLogId); } - LOG.info("Preallocated entry logger {}.", preallocatedLogId); + LOG.info("Created new entry log file {} for logId {}.", newLogFile, preallocatedLogId); return logChannel; } @@ -665,12 +702,8 @@ public class EntryLogger { List<Long> logs = new ArrayList<Long>(); if (logFiles != null) { for (File lf : logFiles) { - String idString = lf.getName().split("\\.")[0]; - try { - long lid = Long.parseLong(idString, 16); - logs.add(lid); - } catch (NumberFormatException nfe) { - } + long logId = fileName2LogId(lf.getName()); + logs.add(logId); } } // no log file found in this directory @@ -776,35 +809,10 @@ public class EntryLogger { return addEntry(ledger, entry, true); } - private static final class RecyclableByteBuffer { - private static final Recycler<RecyclableByteBuffer> RECYCLER = new 
Recycler<RecyclableByteBuffer>() { - @Override - protected RecyclableByteBuffer newObject(Handle<RecyclableByteBuffer> handle) { - return new RecyclableByteBuffer(handle); - } - }; - - private final ByteBuffer buffer; - private final Handle<RecyclableByteBuffer> handle; - public RecyclableByteBuffer(Handle<RecyclableByteBuffer> handle) { - this.buffer = ByteBuffer.allocate(4); - this.handle = handle; - } - - public static RecyclableByteBuffer get() { - return RECYCLER.get(); - } - - public void recycle() { - buffer.rewind(); - handle.recycle(this); - } - } - synchronized long addEntry(long ledger, ByteBuffer entry, boolean rollLog) throws IOException { int entrySize = entry.remaining() + 4; boolean reachEntryLogLimit = - rollLog ? reachEntryLogLimit(entrySize) : readEntryLogHardLimit(entrySize); + rollLog ? reachEntryLogLimit(entrySize) : readEntryLogHardLimit(entrySize); // Create new log if logSizeLimit reached or current disk is full boolean createNewLog = shouldCreateNewEntryLog.get(); if (createNewLog || reachEntryLogLimit) { @@ -834,6 +842,95 @@ public class EntryLogger { return (logChannel.getLogId() << 32L) | pos; } + long addEntryForCompaction(long ledgerId, ByteBuffer entry) throws IOException { + synchronized (compactionLogLock) { + int entrySize = entry.remaining() + 4; + if (compactionLogChannel == null) { + createNewCompactionLog(); + } + ByteBuffer buff = ByteBuffer.allocate(4); + buff.putInt(entry.remaining()); + buff.flip(); + compactionLogChannel.write(buff); + long pos = compactionLogChannel.position(); + compactionLogChannel.write(entry); + compactionLogChannel.registerWrittenEntry(ledgerId, entrySize); + return (compactionLogChannel.getLogId() << 32L) | pos; + } + } + + void flushCompactionLog() throws IOException { + synchronized (compactionLogLock) { + if (compactionLogChannel != null) { + compactionLogChannel.flush(true); + LOG.info("Flushed compaction log file {} with logId.", + compactionLogChannel.getLogFile(), + 
compactionLogChannel.getLogId()); + // since this channel is only used for writing, after flushing the channel, + // we had to close the underlying file channel. Otherwise, we might end up + // leaking fds which cause the disk spaces could not be reclaimed. + closeFileChannel(compactionLogChannel); + } else { + throw new IOException("Failed to flush compaction log which has already been removed."); + } + } + } + + void createNewCompactionLog() throws IOException { + synchronized (compactionLogLock) { + if (compactionLogChannel == null) { + compactionLogChannel = entryLoggerAllocator.createNewLogForCompaction(); + } + } + } + + /** + * Remove the current compaction log, usually invoked when compaction failed and + * we need to do some clean up to remove the compaction log file. + */ + void removeCurCompactionLog() { + synchronized (compactionLogLock) { + if (compactionLogChannel != null) { + if (!compactionLogChannel.getLogFile().delete()) { + LOG.warn("Could not delete compaction log file {}", compactionLogChannel.getLogFile()); + } + try { + closeFileChannel(compactionLogChannel); + } catch (IOException e) { + LOG.error("Failed to close file channel for compaction log {}", compactionLogChannel.getLogId()); + } + compactionLogChannel = null; + } + } + } + + + private static final class RecyclableByteBuffer { + private static final Recycler<RecyclableByteBuffer> RECYCLER = new Recycler<RecyclableByteBuffer>() { + @Override + protected RecyclableByteBuffer newObject(Handle<RecyclableByteBuffer> handle) { + return new RecyclableByteBuffer(handle); + } + }; + + private final ByteBuffer buffer; + private final Handle<RecyclableByteBuffer> handle; + public RecyclableByteBuffer(Handle<RecyclableByteBuffer> handle) { + this.buffer = ByteBuffer.allocate(4); + this.handle = handle; + } + + public static RecyclableByteBuffer get() { + return RECYCLER.get(); + } + + public void recycle() { + buffer.rewind(); + handle.recycle(this); + } + } + + private void 
incrementBytesWrittenAndMaybeFlush(long bytesWritten) throws IOException { if (!doRegularFlushes) { return; @@ -1185,6 +1282,10 @@ public class EntryLogger { logid2FileChannel.clear(); // close current writing log file closeFileChannel(logChannel); + synchronized (compactionLogLock) { + closeFileChannel(compactionLogChannel); + compactionLogChannel = null; + } } catch (IOException ie) { // we have no idea how to avoid io exception during shutting down, so just ignore it LOG.error("Error flush entry log during shutting down, which may cause entry log corrupted.", ie); @@ -1193,6 +1294,9 @@ public class EntryLogger { IOUtils.close(LOG, fc); } forceCloseFileChannel(logChannel); + synchronized (compactionLogLock) { + forceCloseFileChannel(compactionLogChannel); + } } // shutdown the pre-allocation thread entryLoggerAllocator.stop(); @@ -1218,4 +1322,29 @@ public class EntryLogger { } } + protected LedgerDirsManager getLedgerDirsManager() { + return ledgerDirsManager; + } + + /** + * Convert log filename (hex format with suffix) to logId in long. 
+ */ + static long fileName2LogId(String fileName) { + if (fileName != null && fileName.contains(".")) { + fileName = fileName.split("\\.")[0]; + } + try { + return Long.parseLong(fileName, 16); + } catch (Exception nfe) { + LOG.error("Invalid log file name {} found when trying to convert to logId.", fileName, nfe); + } + return INVALID_LID; + } + + /** + * Convert log Id to hex string + */ + static String logId2HexString(long logId) { + return Long.toHexString(logId); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index 5dcd4a0..849d5bb 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -22,10 +22,8 @@ package org.apache.bookkeeper.bookie; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -37,7 +35,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner; + import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; @@ -76,15 +74,9 @@ public class GarbageCollectorThread extends SafeRunnable { final boolean isForceGCAllowWhenNoSpace; - final boolean isThrottleByBytes; - final int maxOutstandingRequests; - final int compactionRateByEntries; - final int compactionRateByBytes; - final CompactionScannerFactory 
scannerFactory; - // Entry Logger Handle final EntryLogger entryLogger; - + final AbstractLogCompactor compactor; final CompactableLedgerStorage ledgerStorage; // flag to ensure gc thread will not be interrupted during compaction @@ -106,82 +98,7 @@ public class GarbageCollectorThread extends SafeRunnable { final GarbageCollector garbageCollector; final GarbageCleaner garbageCleaner; - private static class Throttler { - final RateLimiter rateLimiter; - final boolean isThrottleByBytes; - final int compactionRateByBytes; - final int compactionRateByEntries; - - Throttler(boolean isThrottleByBytes, - int compactionRateByBytes, - int compactionRateByEntries) { - this.isThrottleByBytes = isThrottleByBytes; - this.compactionRateByBytes = compactionRateByBytes; - this.compactionRateByEntries = compactionRateByEntries; - this.rateLimiter = RateLimiter.create(this.isThrottleByBytes - ? this.compactionRateByBytes : this.compactionRateByEntries); - } - - // acquire. if bybytes: bytes of this entry; if byentries: 1. - void acquire(int permits) { - rateLimiter.acquire(this.isThrottleByBytes ? permits : 1); - } - } - - /** - * A scanner wrapper to check whether a ledger is alive in an entry log file. 
- */ - class CompactionScannerFactory { - List<EntryLocation> offsets = new ArrayList<EntryLocation>(); - - EntryLogScanner newScanner(final EntryLogMetadata meta) { - final Throttler throttler = new Throttler (isThrottleByBytes, - compactionRateByBytes, - compactionRateByEntries); - - return new EntryLogScanner() { - @Override - public boolean accept(long ledgerId) { - return meta.containsLedger(ledgerId); - } - - @Override - public void process(final long ledgerId, long offset, ByteBuffer entry) throws IOException { - throttler.acquire(entry.remaining()); - - if (offsets.size() > maxOutstandingRequests) { - flush(); - } - entry.getLong(); // discard ledger id, we already have it - long entryId = entry.getLong(); - entry.rewind(); - - long newoffset = entryLogger.addEntry(ledgerId, entry); - offsets.add(new EntryLocation(ledgerId, entryId, newoffset)); - - } - }; - } - - void flush() throws IOException { - if (offsets.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping entry log flushing, as there are no offset!"); - } - return; - } - - // Before updating the index, we want to wait until all the compacted entries are flushed into the - // entryLog - try { - entryLogger.flush(); - - ledgerStorage.updateEntriesLocations(offsets); - } finally { - offsets.clear(); - } - } - } + final ServerConfiguration conf; /** @@ -198,40 +115,34 @@ public class GarbageCollectorThread extends SafeRunnable { gcExecutor = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("GarbageCollectorThread-%d").build() ); - + this.conf = conf; this.entryLogger = ledgerStorage.getEntryLogger(); this.ledgerStorage = ledgerStorage; - this.gcWaitTime = conf.getGcWaitTime(); - this.isThrottleByBytes = conf.getIsThrottleByBytes(); - this.maxOutstandingRequests = conf.getCompactionMaxOutstandingRequests(); - this.compactionRateByEntries = conf.getCompactionRateByEntries(); - this.compactionRateByBytes = conf.getCompactionRateByBytes(); - 
this.scannerFactory = new CompactionScannerFactory(); - - this.garbageCleaner = new GarbageCollector.GarbageCleaner() { - @Override - public void clean(long ledgerId) { - try { - if (LOG.isDebugEnabled()) { - LOG.debug("delete ledger : " + ledgerId); - } - ledgerStorage.deleteLedger(ledgerId); - } catch (IOException e) { - LOG.error("Exception when deleting the ledger index file on the Bookie: ", e); + this.garbageCleaner = ledgerId -> { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("delete ledger : " + ledgerId); } + ledgerStorage.deleteLedger(ledgerId); + } catch (IOException e) { + LOG.error("Exception when deleting the ledger index file on the Bookie: ", e); } }; this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, ledgerStorage, conf); - // compaction parameters minorCompactionThreshold = conf.getMinorCompactionThreshold(); minorCompactionInterval = conf.getMinorCompactionInterval() * SECOND; majorCompactionThreshold = conf.getMajorCompactionThreshold(); majorCompactionInterval = conf.getMajorCompactionInterval() * SECOND; isForceGCAllowWhenNoSpace = conf.getIsForceGCAllowWhenNoSpace(); + if (conf.getUseTransactionalCompaction()) { + this.compactor = new TransactionalEntryLogCompactor(this); + } else { + this.compactor = new EntryLogCompactor(this); + } if (minorCompactionInterval > 0 && minorCompactionThreshold > 0) { if (minorCompactionThreshold > 1.0f) { @@ -360,6 +271,8 @@ public class GarbageCollectorThread extends SafeRunnable { if (force) { LOG.info("Garbage collector thread forced to perform GC before expiry of wait time."); } + // Recover and clean up previous state if using transactional compaction + compactor.cleanUpAndRecover(); // Extract all of the ledger ID's that comprise all of the entry logs // (except for the current new one which is still being written to). 
@@ -444,51 +357,21 @@ public class GarbageCollectorThread extends SafeRunnable { */ @VisibleForTesting void doCompactEntryLogs(double threshold) { - LOG.info("Do compaction to compact those files lower than " + threshold); - // sort the ledger meta by occupied unused space - Comparator<EntryLogMetadata> sizeComparator = new Comparator<EntryLogMetadata>() { - @Override - public int compare(EntryLogMetadata m1, EntryLogMetadata m2) { - long unusedSize1 = m1.getTotalSize() - m1.getRemainingSize(); - long unusedSize2 = m2.getTotalSize() - m2.getRemainingSize(); - if (unusedSize1 > unusedSize2) { - return -1; - } else if (unusedSize1 < unusedSize2) { - return 1; - } else { - return 0; - } - } - }; + LOG.info("Do compaction to compact those files lower than {}", threshold); + + // sort the ledger meta by usage in ascending order. List<EntryLogMetadata> logsToCompact = new ArrayList<EntryLogMetadata>(); logsToCompact.addAll(entryLogMetaMap.values()); - Collections.sort(logsToCompact, sizeComparator); + logsToCompact.sort(Comparator.comparing(EntryLogMetadata::getUsage)); for (EntryLogMetadata meta : logsToCompact) { if (meta.getUsage() >= threshold) { break; } - if (LOG.isDebugEnabled()) { LOG.debug("Compacting entry log {} below threshold {}", meta.getEntryLogId(), threshold); } - try { - compactEntryLog(scannerFactory, meta); - scannerFactory.flush(); - - LOG.info("Removing entry log {} after compaction", meta.getEntryLogId()); - removeEntryLog(meta.getEntryLogId()); - - } catch (LedgerDirsManager.NoWritableLedgerDirException nwlde) { - LOG.warn("No writable ledger directory available, aborting compaction", nwlde); - break; - } catch (IOException ioe) { - // if compact entry log throws IOException, we don't want to remove that - // entry log. however, if some entries from that log have been readded - // to the entry log, and the offset updated, it's ok to flush that - LOG.error("Error compacting entry log. 
Log won't be deleted", ioe); - } - + compactEntryLog(meta); if (!running) { // if gc thread is not running, stop compaction return; } @@ -519,9 +402,10 @@ public class GarbageCollectorThread extends SafeRunnable { * @param entryLogId * Entry Log File Id */ - private void removeEntryLog(long entryLogId) { + protected void removeEntryLog(long entryLogId) { // remove entry log file successfully if (entryLogger.removeEntryLog(entryLogId)) { + LOG.info("Removing entry log metadata for {}", entryLogId); entryLogMetaMap.remove(entryLogId); } } @@ -529,11 +413,9 @@ public class GarbageCollectorThread extends SafeRunnable { /** * Compact an entry log. * - * @param scannerFactory * @param entryLogMeta */ - protected void compactEntryLog(CompactionScannerFactory scannerFactory, - EntryLogMetadata entryLogMeta) throws IOException { + protected void compactEntryLog(EntryLogMetadata entryLogMeta) { // Similar with Sync Thread // try to mark compacting flag to make sure it would not be interrupted // by shutdown during compaction. otherwise it will receive @@ -544,16 +426,10 @@ public class GarbageCollectorThread extends SafeRunnable { // indicates that compaction is in progress for this EntryLogId. 
return; } - - LOG.info("Compacting entry log : {} - Usage: {} %", entryLogMeta.getEntryLogId(), entryLogMeta.getUsage()); - - try { - entryLogger.scanEntryLog(entryLogMeta.getEntryLogId(), - scannerFactory.newScanner(entryLogMeta)); - } finally { - // clear compacting flag - compacting.set(false); - } + // Do the actual compaction + compactor.compact(entryLogMeta); + // Mark compaction done + compacting.set(false); } /** @@ -603,4 +479,12 @@ public class GarbageCollectorThread extends SafeRunnable { } return entryLogMetaMap; } + + EntryLogger getEntryLogger() { + return entryLogger; + } + + CompactableLedgerStorage getLedgerStorage() { + return ledgerStorage; + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java new file mode 100644 index 0000000..b263453 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java @@ -0,0 +1,388 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.bookkeeper.bookie; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import com.google.common.util.concurrent.RateLimiter; + +import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner; +import org.apache.bookkeeper.util.HardLink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is used for compaction. Compaction is done in several transactional phases. + * Phase 1: Scan old entry log and compact entries to a new .compacting log file. + * Phase 2: Flush .compacting log to disk and it becomes .compacted log file when this completes. + * Phase 3: Flush ledger cache and .compacted file becomes .log file when this completes. Remove old + * entry log file afterwards. + */ +public class TransactionalEntryLogCompactor extends AbstractLogCompactor { + + private static final Logger LOG = LoggerFactory.getLogger(TransactionalEntryLogCompactor.class); + + final EntryLogger entryLogger; + final CompactableLedgerStorage ledgerStorage; + final List<EntryLocation> offsets = new ArrayList<>(); + + // compaction log file suffix + static final String COMPACTING_SUFFIX = ".log.compacting"; + // flushed compaction log file suffix + static final String COMPACTED_SUFFIX = ".compacted"; + + public TransactionalEntryLogCompactor(GarbageCollectorThread gcThread) { + super(gcThread); + this.entryLogger = gcThread.getEntryLogger(); + this.ledgerStorage = gcThread.getLedgerStorage(); + } + + /** + * Delete all previously incomplete compacting logs and recover the index for compacted logs. 
+ */ + @Override + public void cleanUpAndRecover() { + // clean up compacting logs and recover index for already compacted logs + List<File> ledgerDirs = entryLogger.getLedgerDirsManager().getAllLedgerDirs(); + for (File dir : ledgerDirs) { + File[] compactingPhaseFiles = dir.listFiles(file -> file.getName().endsWith(COMPACTING_SUFFIX)); + if (compactingPhaseFiles != null) { + for (File file : compactingPhaseFiles) { + if (file.delete()) { + LOG.info("Deleted failed compaction file {}", file); + } + } + } + File[] compactedPhaseFiles = dir.listFiles(file -> file.getName().endsWith(COMPACTED_SUFFIX)); + if (compactedPhaseFiles != null) { + for (File compactedFile : compactedPhaseFiles) { + LOG.info("Found compacted log file {} has partially flushed index, recovering index.", compactedFile); + CompactionPhase updateIndex = new UpdateIndexPhase(compactedFile, true); + updateIndex.run(); + } + } + } + } + + @Override + public boolean compact(EntryLogMetadata metadata) { + if (metadata != null) { + LOG.info("Compacting entry log {} with usage {}.", + new Object[]{metadata.getEntryLogId(), metadata.getUsage()}); + CompactionPhase scanEntryLog = new ScanEntryLogPhase(metadata); + if (!scanEntryLog.run()) { + LOG.info("Compaction for entry log {} end in ScanEntryLogPhase.", metadata.getEntryLogId()); + return false; + } + File compactionLogFile = entryLogger.getCurCompactionLogFile(); + CompactionPhase flushCompactionLog = new FlushCompactionLogPhase(metadata.getEntryLogId()); + if (!flushCompactionLog.run()) { + LOG.info("Compaction for entry log {} end in FlushCompactionLogPhase.", metadata.getEntryLogId()); + return false; + } + File compactedLogFile = getCompactedLogFile(compactionLogFile, metadata.getEntryLogId()); + CompactionPhase updateIndex = new UpdateIndexPhase(compactedLogFile); + if (!updateIndex.run()) { + LOG.info("Compaction for entry log {} end in UpdateIndexPhase.", metadata.getEntryLogId()); + return false; + } + LOG.info("Compacted entry log : {}.", 
metadata.getEntryLogId()); + return true; + } + return false; + } + + /** + * An abstract base class that each transactional compaction phase extends. + */ + abstract static class CompactionPhase { + private String phaseName = ""; + + CompactionPhase(String phaseName) { + this.phaseName = phaseName; + } + + boolean run() { + try { + start(); + return complete(); + } catch (IOException e) { + LOG.error("Encounter exception in compaction phase {}. Abort current compaction.", phaseName, e); + abort(); + } + return false; + } + + abstract void start() throws IOException; + + abstract boolean complete() throws IOException; + + abstract void abort(); + + } + + /** + * Assume we're compacting entry log 1 to entry log 3. + * The first phase is to scan entries in 1.log and copy them to compaction log file "3.log.compacting". + * We'll try to allocate a new compaction log before scanning to make sure we have a log file to write. + * If no data has been written after scanning, it means there are no valid entries to be compacted, + * so we can remove 1.log directly, clear the offsets and end the compaction. + * Otherwise, we should move on to the next phase. + * <p> + * If anything fails in this phase, we should delete the compaction log and clean the offsets.
+ */ + class ScanEntryLogPhase extends CompactionPhase { + private final EntryLogMetadata metadata; + + ScanEntryLogPhase(EntryLogMetadata metadata) { + super("ScanEntryLogPhase"); + this.metadata = metadata; + } + + @Override + void start() throws IOException { + // scan entry log into compaction log and offset list + entryLogger.createNewCompactionLog(); + entryLogger.scanEntryLog(metadata.getEntryLogId(), new EntryLogScanner() { + @Override + public boolean accept(long ledgerId) { + return metadata.containsLedger(ledgerId); + } + + @Override + public void process(long ledgerId, long offset, ByteBuffer entry) throws IOException { + throttler.acquire(entry.remaining()); + synchronized (TransactionalEntryLogCompactor.this) { + long lid = entry.getLong(); + long entryId = entry.getLong(); + if (lid != ledgerId || entryId < -1) { + LOG.warn("Scanning expected ledgerId {}, but found invalid entry " + + "with ledgerId {} entryId {} at offset {}", + new Object[]{ledgerId, lid, entryId, offset}); + throw new IOException("Invalid entry found @ offset " + offset); + } + entry.rewind(); + long newOffset = entryLogger.addEntryForCompaction(ledgerId, entry); + offsets.add(new EntryLocation(ledgerId, entryId, newOffset)); + + if (LOG.isDebugEnabled()) { + LOG.debug("Compact add entry : lid = {}, eid = {}, offset = {}", + new Object[]{ledgerId, entryId, newOffset}); + } + } + } + }); + } + + @Override + boolean complete() { + if (offsets.isEmpty()) { + // no valid entries is compacted, delete entry log file + LOG.info("No valid entry is found in entry log after scan, removing entry log now."); + gcThread.removeEntryLog(metadata.getEntryLogId()); + entryLogger.removeCurCompactionLog(); + return false; + } + return true; + } + + @Override + void abort() { + offsets.clear(); + // since we haven't flushed yet, we only need to delete the unflushed compaction file. + entryLogger.removeCurCompactionLog(); + } + + } + + /** + * Assume we're compacting log 1 to log 3. 
+ * This phase is to flush the compaction log. + * When this phase starts, there should be a compaction log file like "3.log.compacting" + * When compaction log is flushed, in order to indicate this phase is completed, + * a hardlink file "3.log.1.compacted" should be created, and "3.log.compacting" should be deleted. + */ + class FlushCompactionLogPhase extends CompactionPhase { + private final long compactingLogId; + private File compactedLogFile; + + FlushCompactionLogPhase(long compactingLogId) { + super("FlushCompactionLogPhase"); + this.compactingLogId = compactingLogId; + } + + @Override + void start() throws IOException { + // flush the current compaction log. + File compactionLogFile = entryLogger.getCurCompactionLogFile(); + if (compactionLogFile == null || !compactionLogFile.exists()) { + throw new IOException("Compaction log doesn't exist during flushing"); + } + entryLogger.flushCompactionLog(); + } + + @Override + boolean complete() throws IOException { + // create a hard link file named "x.log.y.compacted" for file "x.log.compacting". + // where x is compactionLogId and y is compactingLogId. + File compactionLogFile = entryLogger.getCurCompactionLogFile(); + if (compactionLogFile == null || !compactionLogFile.exists()) { + LOG.warn("Compaction log doesn't exist any more after flush"); + return false; + } + compactedLogFile = getCompactedLogFile(compactionLogFile, compactingLogId); + if (compactedLogFile != null && !compactedLogFile.exists()) { + HardLink.createHardLink(compactionLogFile, compactedLogFile); + } + entryLogger.removeCurCompactionLog(); + return true; + } + + @Override + void abort() { + offsets.clear(); + // remove compaction log file and its hardlink + entryLogger.removeCurCompactionLog(); + if (compactedLogFile != null && compactedLogFile.exists()) { + if (!compactedLogFile.delete()) { + LOG.warn("Could not delete compacted log file {}", compactedLogFile); + } + } + } + } + + /** + * Assume we're compacting log 1 to log 3. 
+ * This phase is to update the entry locations and flush the index. + * When the phase starts, there should be a compacted file like "3.log.1.compacted", + * where 3 is the new compaction logId and 1 is the old entry logId. + * After the index is flushed successfully, a hardlink "3.log" file should be created, + * and 3.log.1.compacted file should be deleted to indicate the phase has succeeded. + * <p> + * This phase can also be used to recover a partially flushed index when we pass isInRecovery=true + */ + class UpdateIndexPhase extends CompactionPhase { + File compactedLogFile; + File newEntryLogFile; + private final boolean isInRecovery; + + public UpdateIndexPhase(File compactedLogFile) { + this(compactedLogFile, false); + } + + public UpdateIndexPhase(File compactedLogFile, boolean isInRecovery) { + super("UpdateIndexPhase"); + this.compactedLogFile = compactedLogFile; + this.isInRecovery = isInRecovery; + } + + @Override + void start() throws IOException { + if (compactedLogFile != null && compactedLogFile.exists()) { + File dir = compactedLogFile.getParentFile(); + String compactedFilename = compactedLogFile.getName(); + // create a hard link "x.log" for file "x.log.y.compacted" + this.newEntryLogFile = new File(dir, compactedFilename.substring(0, compactedFilename.indexOf(".log") + 4)); + if (!newEntryLogFile.exists()) { + HardLink.createHardLink(compactedLogFile, newEntryLogFile); + } + if (isInRecovery) { + recoverEntryLocations(EntryLogger.fileName2LogId(newEntryLogFile.getName())); + } + if (!offsets.isEmpty()) { + // update entry locations and flush index + ledgerStorage.updateEntriesLocations(offsets); + ledgerStorage.flushEntriesLocationsIndex(); + } + } else { + throw new IOException("Failed to find compacted log file in UpdateIndexPhase"); + } + } + + @Override + boolean complete() { + // When index is flushed, and entry log is removed, + // delete the ".compacted" file to indicate this phase is completed.
+ offsets.clear(); + if (compactedLogFile != null) { + if (!compactedLogFile.delete()) { + LOG.warn("Could not delete compacted log file {}", compactedLogFile); + } + // Now delete the old entry log file since it's compacted + String compactedFilename = compactedLogFile.getName(); + String oldEntryLogFilename = compactedFilename.substring(compactedFilename.indexOf(".log") + 5); + long entryLogId = EntryLogger.fileName2LogId(oldEntryLogFilename); + gcThread.removeEntryLog(entryLogId); + } + return true; + } + + @Override + void abort() { + offsets.clear(); + } + + /** + * Scan entry log to recover entry locations. + */ + private void recoverEntryLocations(long compactedLogId) throws IOException { + entryLogger.scanEntryLog(compactedLogId, new EntryLogScanner() { + @Override + public boolean accept(long ledgerId) { + return true; + } + + @Override + public void process(long ledgerId, long offset, ByteBuffer entry) throws IOException { + long lid = entry.getLong(); + long entryId = entry.getLong(); + if (lid != ledgerId || entryId < -1) { + LOG.warn("Scanning expected ledgerId {}, but found invalid entry " + + "with ledgerId {} entryId {} at offset {}", + new Object[]{ledgerId, lid, entryId, offset}); + throw new IOException("Invalid entry found @ offset " + offset); + } + entry.rewind(); + long location = (compactedLogId << 32L) | (offset + 4); + offsets.add(new EntryLocation(lid, entryId, location)); + } + }); + LOG.info("Recovered {} entry locations from compacted log {}", offsets.size(), compactedLogId); + } + } + + File getCompactedLogFile(File compactionLogFile, long compactingLogId) { + if (compactionLogFile == null) { + return null; + } + File dir = compactionLogFile.getParentFile(); + String filename = compactionLogFile.getName(); + String newSuffix = ".log." 
+ EntryLogger.logId2HexString(compactingLogId) + COMPACTED_SUFFIX; + String hardLinkFilename = filename.replace(COMPACTING_SUFFIX, newSuffix); + return new File(dir, hardLinkFilename); + } + +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index fea051c..8dcb1b3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -56,6 +56,7 @@ public class ServerConfiguration extends AbstractConfiguration { protected final static String GC_WAIT_TIME = "gcWaitTime"; protected final static String IS_FORCE_GC_ALLOW_WHEN_NO_SPACE = "isForceGCAllowWhenNoSpace"; protected final static String GC_OVERREPLICATED_LEDGER_WAIT_TIME = "gcOverreplicatedLedgerWaitTime"; + protected final static String USE_TRANSACTIONAL_COMPACTION = "useTransactionalCompaction"; // Sync Parameters protected final static String FLUSH_INTERVAL = "flushInterval"; protected final static String FLUSH_ENTRYLOG_INTERVAL_BYTES = "flushEntrylogBytes"; @@ -292,6 +293,25 @@ public class ServerConfiguration extends AbstractConfiguration { } /** + * Get whether to use transactional compaction and using a separate log for compaction or not. + * + * @return use transactional compaction + */ + public boolean getUseTransactionalCompaction() { + return this.getBoolean(USE_TRANSACTIONAL_COMPACTION, false); + } + + /** + * Set whether to use transactional compaction and using a separate log for compaction or not. + * @param useTransactionalCompaction + * @return server configuration + */ + public ServerConfiguration setUseTransactionalCompaction(boolean useTransactionalCompaction) { + this.setProperty(USE_TRANSACTIONAL_COMPACTION, useTransactionalCompaction); + return this; + } + + /** * Get flush interval. Default value is 10 second. 
It isn't useful to decrease * this value, since ledger storage only checkpoints when an entry logger file * is rolled. diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java index af8f1a3..cea3b52 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java @@ -32,6 +32,8 @@ import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.proto.BookieServer; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.bookkeeper.util.DiskChecker; import org.apache.bookkeeper.util.TestUtils; @@ -132,9 +134,21 @@ public class BookieStorageThresholdTest extends BookKeeperClusterTestCase { @Test public void testStorageThresholdCompaction() throws Exception { - - // We are having BKCluster with just one bookie (NUM_BOOKIES = 1). 
- Bookie bookie = bs.get(0).getBookie(); + stopAllBookies(); + ServerConfiguration conf = newServerConfiguration(); + File ledgerDir1 = createTempDir("ledger", "test1"); + File ledgerDir2 = createTempDir("ledger","test2"); + File journalDir = createTempDir("journal","test"); + String[] ledgerDirNames = new String[]{ + ledgerDir1.getPath(), + ledgerDir2.getPath() + }; + conf.setLedgerDirNames(ledgerDirNames); + conf.setJournalDirName(journalDir.getPath()); + BookieServer server = startBookie(conf); + bs.add(server); + bsConfs.add(conf); + Bookie bookie = server.getBookie(); // since we are going to set dependency injected ledgermonitor, so we need to shutdown // the ledgermonitor which was created as part of the initialization of Bookie bookie.ledgerMonitor.shutdown(); @@ -197,14 +211,9 @@ public class BookieStorageThresholdTest extends BookKeeperClusterTestCase { bkc.deleteLedger(lhs[1].getId()); bkc.deleteLedger(lhs[2].getId()); - // since compaction intervals are too long, there is no possibility for compaction to get kicked in - // so all the entrylogs (0,1,2) should be available in the ledgerdirectory - assertTrue("All the entry log files ([0,1,2].log are not available, which is not expected" - + tmpDirs.get(0).getAbsolutePath(), TestUtils.hasLogFiles(tmpDirs.get(0), false, 0, 1, 2)); // validating that LedgerDirsListener are not triggered yet assertTrue("Disk Full shouldn't have been triggered yet", diskFull.getCount() == 1); assertTrue("Disk writable shouldn't have been triggered yet", diskWritable.getCount() == 1); - // set exception injection to true, so that next time when checkDir of DiskChecker (ThresholdTestDiskChecker) is // called it will throw DiskOutOfSpaceException thresholdTestDiskChecker.setInjectDiskOutOfSpaceException(true); @@ -226,9 +235,10 @@ public class BookieStorageThresholdTest extends BookKeeperClusterTestCase { // force GC. 
// Because of getWritableLedgerDirsForNewLog, compaction would be able to create newlog and compact even though // there are no writableLedgerDirs - assertFalse( - "Found entry log file ([0,1,2].log. They should have been compacted" + tmpDirs.get(0).getAbsolutePath(), - TestUtils.hasLogFiles(tmpDirs.get(0), true, 0, 1, 2)); + for(File ledgerDir : bookie.getLedgerDirsManager().getAllLedgerDirs()) { + assertFalse("Found entry log file ([0,1,2].log. They should have been compacted" + ledgerDir, + TestUtils.hasLogFiles(ledgerDir.getParentFile(), true, 0, 1, 2)); + } try { ledgerDirsManager.getWritableLedgerDirs(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java index 57b9404..17cc04f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java @@ -25,6 +25,8 @@ import io.netty.buffer.Unpooled; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -34,8 +36,6 @@ import java.util.Enumeration; import java.util.List; import org.apache.bookkeeper.client.BookKeeper.DigestType; -import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner; -import org.apache.bookkeeper.bookie.GarbageCollectorThread.CompactionScannerFactory; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; @@ -51,17 +51,18 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataLis import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import 
org.apache.bookkeeper.util.DiskChecker; +import org.apache.bookkeeper.util.HardLink; import org.apache.bookkeeper.util.MathUtils; import org.apache.bookkeeper.util.TestUtils; import org.apache.bookkeeper.versioning.Version; import org.apache.zookeeper.AsyncCallback; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTED_SUFFIX; import static org.junit.Assert.*; /** * This class tests the entry log compaction functionality. @@ -356,6 +357,31 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase { @Test public void testMinorCompactionWithNoWritableLedgerDirsButIsForceGCAllowWhenNoSpaceIsSet() throws Exception { + stopAllBookies(); + ServerConfiguration conf = newServerConfiguration(); + // disable major compaction + conf.setMajorCompactionThreshold(0.0f); + // here we are setting isForceGCAllowWhenNoSpace to true, so Major and Minor compaction wont be disabled in case + // when discs are full + conf.setIsForceGCAllowWhenNoSpace(true); + conf.setGcWaitTime(600000); + conf.setMinorCompactionInterval(120000); + conf.setMajorCompactionInterval(240000); + // We need at least 2 ledger dirs because compaction will flush ledger cache, and will + // trigger relocateIndexFileAndFlushHeader. If we only have one ledger dir, compaction will always fail + // when there's no writeable ledger dir. 
+ File ledgerDir1 = createTempDir("ledger", "test1"); + File ledgerDir2 = createTempDir("ledger","test2"); + File journalDir = createTempDir("journal","test"); + String[] ledgerDirNames = new String[]{ + ledgerDir1.getPath(), + ledgerDir2.getPath() + }; + conf.setLedgerDirNames(ledgerDirNames); + conf.setJournalDirName(journalDir.getPath()); + BookieServer server = startBookie(conf); + bs.add(server); + bsConfs.add(conf); // prepare data LedgerHandle[] lhs = prepareData(3, false); @@ -363,19 +389,6 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase { lh.close(); } - // disable major compaction - baseConf.setMajorCompactionThreshold(0.0f); - - // here we are setting isForceGCAllowWhenNoSpace to true, so Major and Minor compaction wont be disabled in case - // when discs are full - baseConf.setIsForceGCAllowWhenNoSpace(true); - baseConf.setGcWaitTime(60000); - baseConf.setMinorCompactionInterval(120000); - baseConf.setMajorCompactionInterval(240000); - - // restart bookies - restartBookies(baseConf); - long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime; long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime; assertFalse(getGCThread().enableMajorCompaction); @@ -383,6 +396,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase { for (BookieServer bookieServer : bs) { Bookie bookie = bookieServer.getBookie(); + bookie.ledgerStorage.flush(); + bookie.ledgerMonitor.shutdown(); LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager(); List<File> ledgerDirs = ledgerDirsManager.getAllLedgerDirs(); // Major and Minor compaction are not disabled even though discs are full. Check LedgerDirsListener of @@ -407,9 +422,9 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase { // allocating newlog // we get getWritableLedgerDirsForNewLog() of ledgerDirsManager instead of getWritableLedgerDirs() // entry logs ([0,1,2].log) should be compacted. 
- for (File ledgerDirectory : tmpDirs) { + for (File ledgerDirectory : server.getBookie().getLedgerDirsManager().getAllLedgerDirs()) { assertFalse("Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: " - + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2)); + + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory.getParentFile(), true, 0, 1, 2)); } // even entry log files are removed, we still can access entries for ledger1 @@ -817,13 +832,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase { } @Test - public void testCompactionWithEntryLogRollover() throws Exception { - // Disable bookie gc during this test - baseConf.setGcWaitTime(60000); - baseConf.setMinorCompactionInterval(0); - baseConf.setMajorCompactionInterval(0); - restartBookies(baseConf); - + public void testRecoverIndexWhenIndexIsPartiallyFlush() throws Exception { // prepare data LedgerHandle[] lhs = prepareData(3, false); @@ -831,46 +840,249 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase { lh.close(); } + // disable compaction + baseConf.setMinorCompactionThreshold(0.0f); + baseConf.setMajorCompactionThreshold(0.0f); + baseConf.setGcWaitTime(600000); + + // restart bookies + restartBookies(baseConf); + + Bookie bookie = bs.get(0).getBookie(); + InterleavedLedgerStorage storage = (InterleavedLedgerStorage) bookie.ledgerStorage; + // remove ledger2 and ledger3 bkc.deleteLedger(lhs[1].getId()); bkc.deleteLedger(lhs[2].getId()); + LOG.info("Finished deleting the ledgers contains most entries."); - InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage) bs.get(0).getBookie().ledgerStorage; - GarbageCollectorThread garbageCollectorThread = ledgerStorage.gcThread; - CompactionScannerFactory compactionScannerFactory = garbageCollectorThread.scannerFactory; - long entryLogId = 0; - EntryLogger entryLogger = ledgerStorage.entryLogger; + MockTransactionalEntryLogCompactor 
partialCompactionWorker = new MockTransactionalEntryLogCompactor( + ((InterleavedLedgerStorage) bookie.ledgerStorage).gcThread); + + for (long logId = 0; logId < 3; logId++) { + EntryLogMetadata meta = storage.entryLogger.getEntryLogMetadata(logId); + partialCompactionWorker.compactWithIndexFlushFailure(meta); + } + + // entry logs ([0,1,2].log) should not be compacted because of partial flush throw IOException + for (File ledgerDirectory : tmpDirs) { + assertTrue("Entry log file ([0,1,2].log should not be compacted in ledgerDirectory: " + + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2)); + } - LOG.info("Before compaction -- Least unflushed log id: {}", entryLogger.getLeastUnflushedLogId()); + // entries should be available + verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed()); - // Compact entryLog 0 - EntryLogScanner scanner = compactionScannerFactory.newScanner(entryLogger.getEntryLogMetadata(entryLogId)); + // But we should see .compacted file with index flush failure + assertEquals(findCompactedEntryLogFiles().size(), 3); - entryLogger.scanEntryLog(entryLogId, scanner); + // Now try to recover those flush failed index files + partialCompactionWorker.cleanUpAndRecover(); - long entryLogIdAfterCompaction = entryLogger.getLeastUnflushedLogId(); - LOG.info("After compaction -- Least unflushed log id: {}", entryLogIdAfterCompaction); + // There should be no .compacted files after recovery + assertEquals(findCompactedEntryLogFiles().size(), 0); + + // compaction worker should recover partial flushed index and delete [0,1,2].log + for (File ledgerDirectory : tmpDirs) { + assertFalse("Entry log file ([0,1,2].log should have been compacted in ledgerDirectory: " + + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2)); + } + + // even entry log files are removed, we still can access entries for ledger1 + // since those entries has been compacted to new entry log + verifyLedger(lhs[0].getId(), 0, 
lhs[0].getLastAddConfirmed()); + } - // Add more entries to trigger entrylog roll over - LedgerHandle[] lhs2 = prepareData(3, false); + @Test + public void testCompactionFailureShouldNotResultInDuplicatedData() throws Exception { + // prepare data + LedgerHandle[] lhs = prepareData(5, false); - for (LedgerHandle lh : lhs2) { + for (LedgerHandle lh : lhs) { lh.close(); } - // Wait for entry logger to move forward - while (entryLogger.getLeastUnflushedLogId() <= entryLogIdAfterCompaction) { - Thread.sleep(100); + // disable compaction + baseConf.setMinorCompactionThreshold(0.0f); + baseConf.setMajorCompactionThreshold(0.0f); + baseConf.setUseTransactionalCompaction(true); + + // restart bookies + restartBookies(baseConf); + + // remove ledger2 and ledger3 + bkc.deleteLedger(lhs[1].getId()); + bkc.deleteLedger(lhs[2].getId()); + + LOG.info("Finished deleting the ledgers contains most entries."); + Thread.sleep(baseConf.getMajorCompactionInterval() * 1000 + + baseConf.getGcWaitTime()); + Bookie bookie = bs.get(0).getBookie(); + InterleavedLedgerStorage storage = (InterleavedLedgerStorage) bookie.ledgerStorage; + + List<File> ledgerDirs = bookie.getLedgerDirsManager().getAllLedgerDirs(); + List<Long> usageBeforeCompaction = new ArrayList<>(); + ledgerDirs.forEach(file -> usageBeforeCompaction.add(getDirectorySpaceUsage(file))); + + MockTransactionalEntryLogCompactor partialCompactionWorker = new MockTransactionalEntryLogCompactor( + ((InterleavedLedgerStorage) bookie.ledgerStorage).gcThread); + + for (long logId = 0; logId < 5; logId++) { + EntryLogMetadata meta = storage.entryLogger.getEntryLogMetadata(logId); + partialCompactionWorker.compactWithLogFlushFailure(meta); + } + + // entry logs ([0-4].log) should not be compacted because of failure in flush compaction log + for (File ledgerDirectory : tmpDirs) { + assertTrue("Entry log file ([0,1,2].log should not be compacted in ledgerDirectory: " + + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2, 
3, 4)); + } + // even entry log files are removed, we still can access entries for ledger1 + // since those entries has been compacted to new entry log + verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed()); + + List<Long> freeSpaceAfterCompactionFailed = new ArrayList<>(); + ledgerDirs.forEach(file -> freeSpaceAfterCompactionFailed.add(getDirectorySpaceUsage(file))); + + // No extra data is generated after compaction fail + for (int i = 0; i < usageBeforeCompaction.size(); i++) { + assertEquals(usageBeforeCompaction.get(i), freeSpaceAfterCompactionFailed.get(i)); } - long entryLogIdBeforeFlushing = entryLogger.getLeastUnflushedLogId(); - LOG.info("Added more data -- Least unflushed log id: {}", entryLogIdBeforeFlushing); + // now enable normal compaction + baseConf.setMajorCompactionThreshold(0.5f); - Assert.assertTrue(entryLogIdAfterCompaction < entryLogIdBeforeFlushing); + // restart bookies + restartBookies(baseConf); - // Wait for entries to be flushed on entry logs and update index - // This operation should succeed even if the entry log rolls over after the last entry was compacted - compactionScannerFactory.flush(); + Thread.sleep(baseConf.getMajorCompactionInterval() * 1000 + + baseConf.getGcWaitTime()); + // compaction worker should compact [0-4].log + for (File ledgerDirectory : tmpDirs) { + assertFalse("Entry log file ([0,1,2].log should have been compacted in ledgerDirectory: " + + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2, 3, 4)); + } + + // even entry log files are removed, we still can access entries for ledger1 + // since those entries has been compacted to new entry log + verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed()); + } + + private long getDirectorySpaceUsage(File dir) { + long size = 0; + for (File file : dir.listFiles()) { + size += file.length(); + } + return size; } + + private Set<File> findCompactedEntryLogFiles() { + Set<File> compactedLogFiles = new HashSet<>(); + for (File 
ledgerDirectory : tmpDirs) { + File[] files = Bookie.getCurrentDirectory(ledgerDirectory).listFiles( + file -> file.getName().endsWith(COMPACTED_SUFFIX)); + if (files != null) { + for (File file : files) { + compactedLogFiles.add(file); + } + } + } + return compactedLogFiles; + } + + private static class MockTransactionalEntryLogCompactor extends TransactionalEntryLogCompactor { + + public MockTransactionalEntryLogCompactor(GarbageCollectorThread gcThread) { + super(gcThread); + } + + synchronized void compactWithIndexFlushFailure(EntryLogMetadata metadata) { + LOG.info("Compacting entry log {}.", metadata.getEntryLogId()); + CompactionPhase scanEntryLog = new ScanEntryLogPhase(metadata); + if (!scanEntryLog.run()) { + LOG.info("Compaction for {} end in ScanEntryLogPhase.", metadata.getEntryLogId()); + return; + } + File compactionLogFile = entryLogger.getCurCompactionLogFile(); + CompactionPhase flushCompactionLog = new FlushCompactionLogPhase(metadata.getEntryLogId()); + if (!flushCompactionLog.run()) { + LOG.info("Compaction for {} end in FlushCompactionLogPhase.", metadata.getEntryLogId()); + return; + } + File compactedLogFile = getCompactedLogFile(compactionLogFile, metadata.getEntryLogId()); + CompactionPhase partialFlushIndexPhase = new PartialFlushIndexPhase(compactedLogFile); + if (!partialFlushIndexPhase.run()) { + LOG.info("Compaction for {} end in PartialFlushIndexPhase.", metadata.getEntryLogId()); + return; + } + gcThread.removeEntryLog(metadata.getEntryLogId()); + LOG.info("Compacted entry log : {}.", metadata.getEntryLogId()); + } + + synchronized void compactWithLogFlushFailure(EntryLogMetadata metadata) { + LOG.info("Compacting entry log {}", metadata.getEntryLogId()); + CompactionPhase scanEntryLog = new ScanEntryLogPhase(metadata); + if (!scanEntryLog.run()) { + LOG.info("Compaction for {} end in ScanEntryLogPhase.", metadata.getEntryLogId()); + return; + } + File compactionLogFile = entryLogger.getCurCompactionLogFile(); + CompactionPhase 
logFlushFailurePhase = new LogFlushFailurePhase(metadata.getEntryLogId()); + if (!logFlushFailurePhase.run()) { + LOG.info("Compaction for {} end in FlushCompactionLogPhase.", metadata.getEntryLogId()); + return; + } + File compactedLogFile = getCompactedLogFile(compactionLogFile, metadata.getEntryLogId()); + CompactionPhase updateIndex = new UpdateIndexPhase(compactedLogFile); + if (!updateIndex.run()) { + LOG.info("Compaction for entry log {} end in UpdateIndexPhase.", metadata.getEntryLogId()); + return; + } + gcThread.removeEntryLog(metadata.getEntryLogId()); + LOG.info("Compacted entry log : {}.", metadata.getEntryLogId()); + } + + private class PartialFlushIndexPhase extends UpdateIndexPhase { + + public PartialFlushIndexPhase(File compactedLogFile) { + super(compactedLogFile); + } + + @Override + void start() throws IOException { + if (compactedLogFile != null && compactedLogFile.exists()) { + File dir = compactedLogFile.getParentFile(); + String compactedFilename = compactedLogFile.getName(); + // create a hard link "x.log" for file "x.log.y.compacted" + this.newEntryLogFile = new File(dir, compactedFilename.substring(0, compactedFilename.indexOf(".log") + 4)); + File hardlinkFile = new File(dir, newEntryLogFile.getName()); + if (!hardlinkFile.exists()) { + HardLink.createHardLink(compactedLogFile, hardlinkFile); + } + assertTrue(offsets.size() > 1); + // only flush index for one entry location + EntryLocation el = offsets.get(0); + ledgerStorage.updateEntriesLocations(offsets); + ledgerStorage.flushEntriesLocationsIndex(); + throw new IOException("Flush ledger index encounter exception"); + } + } + } + + private class LogFlushFailurePhase extends FlushCompactionLogPhase { + + LogFlushFailurePhase(long compactingLogId) { + super(compactingLogId); + } + + @Override + void start() throws IOException { + // flush the current compaction log + entryLogger.flushCompactionLog(); + throw new IOException("Encounter IOException when trying to flush compaction log"); 
+ } + } + } + } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index f5a30f3..b5a22f8 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -241,7 +241,12 @@ public abstract class BookKeeperClusterTestCase { for (BookieServer server : bs) { server.shutdown(); } + bsConfs.clear(); bs.clear(); + if (bkc != null) { + bkc.close(); + bkc = null; + } } protected void startAllBookies() throws Exception { @@ -598,6 +603,10 @@ public abstract class BookKeeperClusterTestCase { }; server.start(); + if (bkc == null) { + bkc = new BookKeeperTestClient(baseClientConf); + } + int port = conf.getBookiePort(); String host = InetAddress.getLocalHost().getHostAddress(); if (conf.getUseHostNameAsBookieID()) { -- To stop receiving notification emails like this one, please contact ['"commits@bookkeeper.apache.org" <commits@bookkeeper.apache.org>'].