Author: sijie
Date: Tue Oct 22 05:44:13 2013
New Revision: 1534503

URL: http://svn.apache.org/r1534503
Log:
BOOKKEEPER-664: Compaction increases latency on journal writes (ivank via sijie)
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Oct 22 05:44:13 2013
@@ -176,6 +176,8 @@ Trunk (unreleased changes)

         BOOKKEEPER-657: Journal Improvement (Robin Dhamankar via sijie)

+        BOOKKEEPER-664: Compaction increases latency on journal writes (ivank via sijie)
+
       NEW FEATURE:

         BOOKKEEPER-562: Ability to tell if a ledger is closed or not (fpj)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf Tue Oct 22 05:44:13 2013
@@ -91,6 +91,19 @@ ledgerDirectories=/tmp/bk-data
 # If it is set to less than zero, the major compaction is disabled.
 # majorCompactionInterval=86400

+# Set the maximum number of entries which can be compacted without flushing.
+# When compacting, the entries are written to the entrylog and the new offsets
+# are cached in memory. Once the entrylog is flushed the index is updated with
+# the new offsets. This parameter controls the number of entries added to the
+# entrylog before a flush is forced. A higher value for this parameter means
+# more memory will be used for offsets. Each offset consists of 3 longs.
+# This parameter should _not_ be modified unless you know what you're doing.
+# The default is 100,000.
+#compactionMaxOutstandingRequests=100000
+
+# Set the rate at which compaction will re-add entries. The unit is adds per second.
+#compactionRate=1000
+
 # Max file size of journal file, in mega bytes
 # A new journal file will be created when the old one reaches the file size limitation
 #
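Taken together, compactionRate caps how quickly the compactor re-adds surviving entries, so compaction can no longer monopolize the disk bandwidth that journal writes depend on, while compactionMaxOutstandingRequests caps how many rewritten offsets may be held in memory before a flush is forced. A minimal standalone sketch of this throttling pattern, using Guava's RateLimiter just as the patch does (the class and method names below are illustrative, not BookKeeper API):

    import java.io.IOException;

    import com.google.common.util.concurrent.RateLimiter;

    public class CompactionThrottleSketch {
        private final RateLimiter rateLimiter = RateLimiter.create(1000); // compactionRate
        private final int maxOutstanding = 100000; // compactionMaxOutstandingRequests
        private int outstanding = 0;

        // Called once per entry that survives compaction.
        void reAddEntry() throws IOException {
            rateLimiter.acquire(); // blocks so re-adds stay at or below 1000 per second
            if (++outstanding > maxOutstanding) {
                flushAndPublishOffsets(); // bound the memory held by cached offsets
                outstanding = 0;
            }
            // ... append the entry to the new entrylog here ...
        }

        // Stand-in for "flush the entrylog, then update the index with the new offsets".
        void flushAndPublishOffsets() throws IOException {
        }
    }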
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Tue Oct 22 05:44:13 2013
@@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;

-import org.apache.bookkeeper.bookie.GarbageCollectorThread.SafeEntryAdder;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -420,8 +419,7 @@ public class Bookie extends BookieThread
         // instantiate the journal
         journal = new Journal(conf, ledgerDirsManager);
         ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager,
-                ledgerDirsManager, journal,
-                new BookieSafeEntryAdder());
+                ledgerDirsManager, journal);
         syncThread = new SyncThread(conf, getLedgerDirsListener(),
                                     ledgerStorage, journal);

@@ -1087,37 +1085,6 @@ public class Bookie extends BookieThread
         return true;
     }

-    private class BookieSafeEntryAdder implements SafeEntryAdder {
-        @Override
-        public void safeAddEntry(final long ledgerId, final ByteBuffer buffer,
-                                 final GenericCallback<Void> cb) {
-            journal.logAddEntry(buffer, new WriteCallback() {
-                    @Override
-                    public void writeComplete(int rc, long ledgerId2, long entryId,
-                                              InetSocketAddress addr, Object ctx) {
-                        if (rc != BookieException.Code.OK) {
-                            LOG.error("Error rewriting to journal (ledger {}, entry {})", ledgerId2, entryId);
-                            cb.operationComplete(rc, null);
-                            return;
-                        }
-                        try {
-                            addEntryByLedgerId(ledgerId, buffer);
-                            cb.operationComplete(rc, null);
-                        } catch (IOException ioe) {
-                            LOG.error("Error adding to ledger storage (ledger " + ledgerId2
-                                      + ", entry " + entryId + ")", ioe);
-                            // couldn't add to ledger storage
-                            cb.operationComplete(BookieException.Code.IllegalOpException, null);
-                        } catch (BookieException bke) {
-                            LOG.error("Bookie error adding to ledger storage (ledger " + ledgerId2
-                                      + ", entry " + entryId + ")", bke);
-                            // couldn't add to ledger storage
-                            cb.operationComplete(bke.getCode(), null);
-                        }
-                    }
-                }, null);
-        }
-    }

     /**
      * @param args
      * @throws IOException

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Tue Oct 22 05:44:13 2013
@@ -43,6 +43,7 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CopyOnWriteArrayList;

 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -73,7 +74,8 @@ public class EntryLogger {
     final long logSizeLimit;
     private List<BufferedChannel> logChannelsToFlush;
     private volatile BufferedChannel logChannel;
-    private final EntryLogListener listener;
+    private final CopyOnWriteArrayList<EntryLogListener> listeners
+        = new CopyOnWriteArrayList<EntryLogListener>();

     /**
      * The 1K block at the head of the entry logger file
@@ -136,7 +138,9 @@ public class EntryLogger {
                LedgerDirsManager ledgerDirsManager, EntryLogListener listener)
            throws IOException {
         this.ledgerDirsManager = ledgerDirsManager;
-        this.listener = listener;
+        if (listener != null) {
+            addListener(listener);
+        }

         // log size limit
         this.logSizeLimit = conf.getEntryLogSizeLimit();
@@ -163,6 +167,12 @@ public class EntryLogger {
         initialize();
     }

+    void addListener(EntryLogListener listener) {
+        if (null != listener) {
+            listeners.add(listener);
+        }
+    }
+
     /**
      * Maps entry log files to open channels.
      */
@@ -236,7 +246,7 @@ public class EntryLogger {
             // so the readers could access the data from filesystem.
             logChannel.flush(false);
             logChannelsToFlush.add(logChannel);
-            if (null != listener) {
+            for (EntryLogListener listener : listeners) {
                 listener.onRotateEntryLog();
             }
         }
@@ -432,12 +442,16 @@ public class EntryLogger {
         return (logId << 32L) | pos;
     }

+    static long logIdForOffset(long offset) {
+        return offset >> 32L;
+    }
+
     synchronized boolean reachEntryLogLimit(long size) {
         return logChannel.position() + size > logSizeLimit;
     }

     byte[] readEntry(long ledgerId, long entryId, long location)
            throws IOException, Bookie.NoEntryException {
-        long entryLogId = location >> 32L;
+        long entryLogId = logIdForOffset(location);
         long pos = location & 0xffffffffL;
         ByteBuffer sizeBuff = ByteBuffer.allocate(4);
         pos -= 4; // we want to get the ledgerId and length to check
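EntryLogger encodes an entry's location as a single long: the entry log id in the high 32 bits and the position within that log in the low 32 bits, which is why the new logIdForOffset helper is a plain shift. A self-contained demonstration of the encoding (pack and posForOffset are illustrative helpers added here; only logIdForOffset comes from the patch):

    public class OffsetPackingDemo {
        // Mirrors how EntryLogger composes a location from log id and position.
        static long pack(long logId, long pos) {
            return (logId << 32L) | pos;
        }

        // Mirrors EntryLogger.logIdForOffset: recover the entry log id.
        static long logIdForOffset(long offset) {
            return offset >> 32L;
        }

        // Mirrors the mask readEntry uses to recover the position in the log.
        static long posForOffset(long offset) {
            return offset & 0xffffffffL;
        }

        public static void main(String[] args) {
            long location = pack(7L, 1024L);
            System.out.println(logIdForOffset(location)); // 7
            System.out.println(posForOffset(location));   // 1024
        }
    }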
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java Tue Oct 22 05:44:13 2013
@@ -30,9 +30,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;

-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import com.google.common.util.concurrent.RateLimiter;
+
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
 import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -48,7 +48,6 @@ import org.slf4j.LoggerFactory;
  */
 public class GarbageCollectorThread extends Thread {
     private static final Logger LOG = LoggerFactory.getLogger(GarbageCollectorThread.class);
-    private static final int COMPACTION_MAX_OUTSTANDING_REQUESTS = 1000;
     private static final int SECOND = 1000;

     // Maps entry log files to the set of ledgers that comprise the file and the size usage per ledger
@@ -69,9 +68,12 @@ public class GarbageCollectorThread exte
     long lastMinorCompactionTime;
     long lastMajorCompactionTime;

+    final int maxOutstandingRequests;
+    final int compactionRate;
+    final CompactionScannerFactory scannerFactory;
+
     // Entry Logger Handle
     final EntryLogger entryLogger;
-    final SafeEntryAdder safeEntryAdder;

     // Ledger Cache Handle
     final LedgerCache ledgerCache;
@@ -89,77 +91,90 @@ public class GarbageCollectorThread exte
     final GarbageCollector garbageCollector;
     final GarbageCleaner garbageCleaner;

-
-    /**
-     * Interface for adding entries. When the write callback is triggered, the
-     * entry must be guaranteed to be presisted.
-     */
-    interface SafeEntryAdder {
-        public void safeAddEntry(long ledgerId, ByteBuffer buffer, GenericCallback<Void> cb);
+    private static class Offset {
+        final long ledger;
+        final long entry;
+        final long offset;
+
+        Offset(long ledger, long entry, long offset) {
+            this.ledger = ledger;
+            this.entry = entry;
+            this.offset = offset;
+        }
     }

     /**
      * A scanner wrapper to check whether a ledger is alive in an entry log file
      */
-    class CompactionScanner implements EntryLogScanner {
-        EntryLogMetadata meta;
-        Object completionLock = new Object();
-        AtomicInteger outstandingRequests = new AtomicInteger(0);
-        AtomicBoolean allSuccessful = new AtomicBoolean(true);
+    class CompactionScannerFactory implements EntryLogger.EntryLogListener {
+        List<Offset> offsets = new ArrayList<Offset>();

-        public CompactionScanner(EntryLogMetadata meta) {
-            this.meta = meta;
-        }
+        EntryLogScanner newScanner(final EntryLogMetadata meta) {
+            final RateLimiter rateLimiter = RateLimiter.create(compactionRate);
+            return new EntryLogScanner() {
+                @Override
+                public boolean accept(long ledgerId) {
+                    return meta.containsLedger(ledgerId);
+                }

-        @Override
-        public boolean accept(long ledgerId) {
-            return meta.containsLedger(ledgerId);
+                @Override
+                public void process(final long ledgerId, long offset, ByteBuffer entry)
+                        throws IOException {
+                    rateLimiter.acquire();
+                    synchronized (CompactionScannerFactory.this) {
+                        if (offsets.size() > maxOutstandingRequests) {
+                            waitEntrylogFlushed();
+                        }
+                        entry.getLong(); // discard ledger id, we already have it
+                        long entryId = entry.getLong();
+                        entry.rewind();
+
+                        long newoffset = entryLogger.addEntry(ledgerId, entry);
+                        offsets.add(new Offset(ledgerId, entryId, newoffset));
+                    }
+                }
+            };
         }

+        Object flushLock = new Object();
+
         @Override
-        public void process(final long ledgerId, long offset, ByteBuffer entry)
-                throws IOException {
-            if (!allSuccessful.get()) {
-                return;
+        public void onRotateEntryLog() {
+            synchronized (flushLock) {
+                flushLock.notifyAll();
             }
+        }

-            outstandingRequests.incrementAndGet();
-            synchronized (completionLock) {
-                while (outstandingRequests.get() >= COMPACTION_MAX_OUTSTANDING_REQUESTS) {
-                    try {
-                        completionLock.wait();
-                    } catch (InterruptedException ie) {
-                        LOG.error("Interrupted while waiting to re-add entry", ie);
-                        Thread.currentThread().interrupt();
-                        throw new IOException("Interrupted while waiting to re-add entry", ie);
+        synchronized private void waitEntrylogFlushed() throws IOException {
+            try {
+                synchronized (flushLock) {
+                    Offset lastOffset = offsets.get(offsets.size()-1);
+                    long lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.offset);
+                    while (lastOffsetLogId < entryLogger.getLeastUnflushedLogId() && running) {
+                        flushLock.wait(1000);
+
+                        lastOffset = offsets.get(offsets.size()-1);
+                        lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.offset);
+                    }
+                    if (lastOffsetLogId >= entryLogger.getLeastUnflushedLogId() && !running) {
+                        throw new IOException("Shutdown before flushed");
                     }
                 }
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted waiting for flush", ie);
             }
-            safeEntryAdder.safeAddEntry(ledgerId, entry, new GenericCallback<Void>() {
-                    @Override
-                    public void operationComplete(int rc, Void result) {
-                        if (rc != BookieException.Code.OK) {
-                            LOG.error("Error {} re-adding entry for ledger {})",
-                                      rc, ledgerId);
-                            allSuccessful.set(false);
-                        }
-                        synchronized(completionLock) {
-                            outstandingRequests.decrementAndGet();
-                            completionLock.notifyAll();
-                        }
-                    }
-                });
-        }

-        void awaitComplete() throws InterruptedException, IOException {
-            synchronized(completionLock) {
-                while (outstandingRequests.get() > 0) {
-                    completionLock.wait();
-                }
-                if (allSuccessful.get() == false) {
-                    throw new IOException("Couldn't re-add all entries");
-                }
+            for (Offset o : offsets) {
+                ledgerCache.putEntryOffset(o.ledger, o.entry, o.offset);
             }
+            offsets.clear();
+        }
+
+        synchronized void flush() throws IOException {
+            waitEntrylogFlushed();
+
+            ledgerCache.flushLedger(true);
         }
     }

@@ -175,7 +190,6 @@ public class GarbageCollectorThread exte
                                   final LedgerCache ledgerCache,
                                   EntryLogger entryLogger,
                                   SnapshotMap<Long, Boolean> activeLedgers,
-                                  SafeEntryAdder safeEntryAdder,
                                   LedgerManager ledgerManager)
             throws IOException {
         super("GarbageCollectorThread");
@@ -183,9 +197,12 @@ public class GarbageCollectorThread exte
         this.ledgerCache = ledgerCache;
         this.entryLogger = entryLogger;
         this.activeLedgers = activeLedgers;
-        this.safeEntryAdder = safeEntryAdder;
         this.gcWaitTime = conf.getGcWaitTime();

+        this.maxOutstandingRequests = conf.getCompactionMaxOutstandingRequests();
+        this.compactionRate = conf.getCompactionRate();
+        this.scannerFactory = new CompactionScannerFactory();
+        entryLogger.addListener(this.scannerFactory);

         this.garbageCleaner = new GarbageCollector.GarbageCleaner() {
             @Override
@@ -354,16 +371,42 @@ public class GarbageCollectorThread exte
         List<EntryLogMetadata> logsToCompact = new ArrayList<EntryLogMetadata>();
         logsToCompact.addAll(entryLogMetaMap.values());
         Collections.sort(logsToCompact, sizeComparator);
+        List<Long> toRemove = new ArrayList<Long>();
+
         for (EntryLogMetadata meta : logsToCompact) {
             if (meta.getUsage() >= threshold) {
                 break;
             }
             LOG.debug("Compacting entry log {} below threshold {}.", meta.entryLogId, threshold);
-            compactEntryLog(meta.entryLogId);
+            try {
+                compactEntryLog(scannerFactory, meta);
+                toRemove.add(meta.entryLogId);
+            } catch (LedgerDirsManager.NoWritableLedgerDirException nwlde) {
+                LOG.warn("No writable ledger directory available, aborting compaction", nwlde);
+                break;
+            } catch (IOException ioe) {
+                // if compact entry log throws IOException, we don't want to remove that
+                // entry log. however, if some entries from that log have been re-added
+                // to the entry log, and the offset updated, it's ok to flush that
+                LOG.error("Error compacting entry log. Log won't be deleted", ioe);
+            }
+
             if (!running) { // if gc thread is not running, stop compaction
                 return;
             }
         }
+
+        try {
+            // compaction finished, flush any outstanding offsets
+            scannerFactory.flush();
+        } catch (IOException ioe) {
+            LOG.error("Cannot flush compacted entries, skip removal", ioe);
+            return;
+        }
+
+        // offsets have been flushed, it's now safe to remove the old entrylogs
+        for (Long l : toRemove) {
+            removeEntryLog(l);
+        }
     }

     /**
@@ -401,13 +444,8 @@ public class GarbageCollectorThread exte
      * @param entryLogId
      *          Entry Log File Id
      */
-    protected void compactEntryLog(long entryLogId) {
-        EntryLogMetadata entryLogMeta = entryLogMetaMap.get(entryLogId);
-        if (null == entryLogMeta) {
-            LOG.warn("Can't get entry log meta when compacting entry log " + entryLogId + ".");
-            return;
-        }
-
+    protected void compactEntryLog(CompactionScannerFactory scannerFactory,
+                                   EntryLogMetadata entryLogMeta) throws IOException {
         // Similar with Sync Thread
         // try to mark compacting flag to make sure it would not be interrupted
         // by shutdown during compaction. otherwise it will receive
@@ -419,19 +457,11 @@ public class GarbageCollectorThread exte
             return;
         }

-        LOG.info("Compacting entry log : " + entryLogId);
+        LOG.info("Compacting entry log : {}", entryLogMeta.entryLogId);

         try {
-            CompactionScanner scanner = new CompactionScanner(entryLogMeta);
-            entryLogger.scanEntryLog(entryLogId, scanner);
-            scanner.awaitComplete();
-            // after moving entries to new entry log, remove this old one
-            removeEntryLog(entryLogId);
-        } catch (IOException e) {
-            LOG.info("Premature exception when compacting " + entryLogId, e);
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-            LOG.warn("Interrupted while compacting", ie);
+            entryLogger.scanEntryLog(entryLogMeta.entryLogId,
+                                     scannerFactory.newScanner(entryLogMeta));
         } finally {
             // clear compacting flag
             compacting.set(false);
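The heart of the change is the ordering the rewritten thread enforces: a surviving entry is first appended to the current entry log, its new offset is cached in memory, the index is updated only once that entry log has been flushed, and the old entry log is deleted only after the index update itself is flushed; the journal is no longer involved at all. A compressed sketch of that ordering against a hypothetical Store interface (none of these method names are real BookKeeper API):

    import java.io.IOException;
    import java.nio.ByteBuffer;

    public class CompactionOrderingSketch {
        // Hypothetical storage operations, named for the role they play here.
        interface Store {
            long appendToEntryLog(long ledgerId, ByteBuffer entry) throws IOException;
            void awaitEntryLogFlushed(long offset) throws IOException; // blocks until durable
            void putEntryOffset(long ledgerId, long entryId, long offset) throws IOException;
            void removeOldEntryLog(long logId);
        }

        static void compactOne(Store store, long ledgerId, long entryId,
                               ByteBuffer entry, long oldLogId) throws IOException {
            long newOffset = store.appendToEntryLog(ledgerId, entry); // 1. rewrite the entry
            store.awaitEntryLogFlushed(newOffset);                    // 2. wait for durability
            store.putEntryOffset(ledgerId, entryId, newOffset);       // 3. update the index
            store.removeOldEntryLog(oldLogId);                        // 4. now safe to delete
        }
    }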
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Tue Oct 22 05:44:13 2013
@@ -80,14 +80,14 @@ class InterleavedLedgerStorage implement
     private volatile boolean somethingWritten = false;

     InterleavedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
-                             LedgerDirsManager ledgerDirsManager, CheckpointSource checkpointSource,
-                             GarbageCollectorThread.SafeEntryAdder safeEntryAdder) throws IOException {
+                             LedgerDirsManager ledgerDirsManager, CheckpointSource checkpointSource)
+            throws IOException {
         activeLedgers = new SnapshotMap<Long, Boolean>();
         this.checkpointSource = checkpointSource;
         entryLogger = new EntryLogger(conf, ledgerDirsManager, this);
         ledgerCache = new LedgerCacheImpl(conf, activeLedgers, ledgerDirsManager);
         gcThread = new GarbageCollectorThread(conf, ledgerCache, entryLogger,
-                activeLedgers, safeEntryAdder, ledgerManager);
+                activeLedgers, ledgerManager);
     }

     @Override
@@ -207,6 +207,7 @@ class InterleavedLedgerStorage implement
         // current entry logger file isn't flushed yet.
         flushOrCheckpoint(true);
         // after the ledger storage finished checkpointing, try to clear the done checkpoint
+        checkpointHolder.clearLastCheckpoint(lastCheckpoint);
         return lastCheckpoint;
     }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Tue Oct 22 05:44:13 2013
@@ -36,6 +36,9 @@ public class ServerConfiguration extends
     protected final static String MINOR_COMPACTION_THRESHOLD = "minorCompactionThreshold";
     protected final static String MAJOR_COMPACTION_INTERVAL = "majorCompactionInterval";
     protected final static String MAJOR_COMPACTION_THRESHOLD = "majorCompactionThreshold";
+    protected final static String COMPACTION_MAX_OUTSTANDING_REQUESTS
+        = "compactionMaxOutstandingRequests";
+    protected final static String COMPACTION_RATE = "compactionRate";

     // Gc Parameters
     protected final static String GC_WAIT_TIME = "gcWaitTime";
@@ -789,16 +792,6 @@ public class ServerConfiguration extends
     }

     /**
-     * Should we remove pages from page cache after force write
-     *
-     * @return remove pages from cache
-     */
-    @Beta
-    public boolean getJournalRemovePagesFromCache() {
-        return getBoolean(JOURNAL_REMOVE_FROM_PAGE_CACHE, false);
-    }
-
-    /**
      * Set whether the bookie is able to go into read-only mode.
      * If this is set to false, the bookie will shutdown on encountering
      * an error condition.
@@ -908,4 +901,77 @@ public class ServerConfiguration extends
         return getBoolean(AUTO_RECOVERY_DAEMON_ENABLED, false);
     }

+    /**
+     * Get the maximum number of entries which can be compacted without flushing.
+     * Default is 100,000.
+     *
+     * @return the maximum number of unflushed entries
+     */
+    public int getCompactionMaxOutstandingRequests() {
+        return getInt(COMPACTION_MAX_OUTSTANDING_REQUESTS, 100000);
+    }
+
+    /**
+     * Set the maximum number of entries which can be compacted without flushing.
+     *
+     * When compacting, the entries are written to the entrylog and the new offsets
+     * are cached in memory. Once the entrylog is flushed the index is updated with
+     * the new offsets. This parameter controls the number of entries added to the
+     * entrylog before a flush is forced. A higher value for this parameter means
+     * more memory will be used for offsets. Each offset consists of 3 longs.
+     *
+     * This parameter should _not_ be modified unless you know what you're doing.
+     * The default is 100,000.
+     *
+     * @param maxOutstandingRequests number of entries to compact before flushing
+     *
+     * @return ServerConfiguration
+     */
+    public ServerConfiguration setCompactionMaxOutstandingRequests(int maxOutstandingRequests) {
+        setProperty(COMPACTION_MAX_OUTSTANDING_REQUESTS, maxOutstandingRequests);
+        return this;
+    }
+
+    /**
+     * Get the rate of compaction adds. Default is 1,000.
+     *
+     * @return rate of compaction (adds per second)
+     */
+    public int getCompactionRate() {
+        return getInt(COMPACTION_RATE, 1000);
+    }
+
+    /**
+     * Set the rate of compaction adds.
+     *
+     * @param rate rate of compaction adds (adds per second)
+     *
+     * @return ServerConfiguration
+     */
+    public ServerConfiguration setCompactionRate(int rate) {
+        setProperty(COMPACTION_RATE, rate);
+        return this;
+    }
+
+    /**
+     * Should we remove pages from page cache after force write
+     *
+     * @return remove pages from cache
+     */
+    @Beta
+    public boolean getJournalRemovePagesFromCache() {
+        return getBoolean(JOURNAL_REMOVE_FROM_PAGE_CACHE, false);
+    }
+
+    /**
+     * Set whether we should remove pages from page cache after force write.
+     *
+     * @param enabled
+     *          - true if we need to remove pages from page cache. otherwise, false
+     * @return ServerConfiguration
+     */
+    public ServerConfiguration setJournalRemovePagesFromCache(boolean enabled) {
+        setProperty(JOURNAL_REMOVE_FROM_PAGE_CACHE, enabled);
+        return this;
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java?rev=1534503&r1=1534502&r2=1534503&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java Tue Oct 22 05:44:13 2013
@@ -21,15 +21,30 @@ package org.apache.bookkeeper.bookie;
  *
  */
 import java.io.File;
-import java.util.Arrays;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Collections;
 import java.util.Enumeration;

+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.TestUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.versioning.Version;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -294,4 +309,157 @@ public class CompactionTest extends Book
         // since those entries has been compacted to new entry log
         verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed());
     }
+
+    /**
+     * Test that compaction doesn't add to the index without having persisted the
+     * entrylog first. This is needed because compaction doesn't go through the journal.
+     * {@see https://issues.apache.org/jira/browse/BOOKKEEPER-530}
+     * {@see https://issues.apache.org/jira/browse/BOOKKEEPER-664}
+     */
+    @Test(timeout=60000)
+    public void testCompactionSafety() throws Exception {
+        tearDown(); // I don't want the test infrastructure
+        ServerConfiguration conf = new ServerConfiguration();
+        final Set<Long> ledgers = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
+        LedgerManager manager = new LedgerManager() {
+            @Override
+            public void createLedger(LedgerMetadata metadata, GenericCallback<Long> cb) {
+                unsupported();
+            }
+            @Override
+            public void removeLedgerMetadata(long ledgerId, Version version,
+                                             GenericCallback<Void> vb) {
+                unsupported();
+            }
+            @Override
+            public void readLedgerMetadata(long ledgerId, GenericCallback<LedgerMetadata> readCb) {
+                unsupported();
+            }
+            @Override
+            public void writeLedgerMetadata(long ledgerId, LedgerMetadata metadata,
+                                            GenericCallback<Void> cb) {
+                unsupported();
+            }
+            @Override
+            public void asyncProcessLedgers(Processor<Long> processor,
+                                            AsyncCallback.VoidCallback finalCb,
+                                            Object context, int successRc, int failureRc) {
+                unsupported();
+            }
+            @Override
+            public void close() throws IOException {}
+
+            void unsupported() {
+                LOG.error("Unsupported operation called", new Exception());
+                throw new RuntimeException("Unsupported op");
+            }
+            @Override
+            public LedgerRangeIterator getLedgerRanges() {
+                final AtomicBoolean hasnext = new AtomicBoolean(true);
+                return new LedgerManager.LedgerRangeIterator() {
+                    @Override
+                    public boolean hasNext() throws IOException {
+                        return hasnext.get();
+                    }
+                    @Override
+                    public LedgerManager.LedgerRange next() throws IOException {
+                        hasnext.set(false);
+                        return new LedgerManager.LedgerRange(ledgers);
+                    }
+                };
+            }
+        };

+        File tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+        conf.setLedgerDirNames(new String[] {tmpDir.toString()});
+
+        conf.setEntryLogSizeLimit(EntryLogger.LOGFILE_HEADER_SIZE + 3 * (4+ENTRY_SIZE));
+        conf.setGcWaitTime(100);
+        conf.setMinorCompactionThreshold(0.7f);
+        conf.setMajorCompactionThreshold(0.0f);
+        conf.setMinorCompactionInterval(1);
+        conf.setMajorCompactionInterval(10);
+        conf.setPageLimit(1);
+
+        CheckpointSource checkpointSource = new CheckpointSource() {
+                AtomicInteger idGen = new AtomicInteger(0);
+                class MyCheckpoint implements CheckpointSource.Checkpoint {
+                    int id = idGen.incrementAndGet();
+                    @Override
+                    public int compareTo(CheckpointSource.Checkpoint o) {
+                        if (o == CheckpointSource.Checkpoint.MAX) {
+                            return -1;
+                        } else if (o == CheckpointSource.Checkpoint.MIN) {
+                            return 1;
+                        }
+                        return id - ((MyCheckpoint)o).id;
+                    }
+                }
+
+                @Override
+                public CheckpointSource.Checkpoint newCheckpoint() {
+                    return new MyCheckpoint();
+                }
+
+                public void checkpointComplete(CheckpointSource.Checkpoint checkpoint, boolean compact)
+                        throws IOException {
+                }
+            };
+        final byte[] KEY = "foobar".getBytes();
+        File log0 = new File(curDir, "0.log");
+        LedgerDirsManager dirs = new LedgerDirsManager(conf);
+        assertFalse("Log shouldn't exist", log0.exists());
+        InterleavedLedgerStorage storage = new InterleavedLedgerStorage(conf, manager,
+                                                                        dirs, checkpointSource);
+        ledgers.add(1l);
+        ledgers.add(2l);
+        ledgers.add(3l);
+        storage.setMasterKey(1, KEY);
+        storage.setMasterKey(2, KEY);
+        storage.setMasterKey(3, KEY);
+        storage.addEntry(genEntry(1, 1, ENTRY_SIZE));
+        storage.addEntry(genEntry(2, 1, ENTRY_SIZE));
+        storage.addEntry(genEntry(2, 2, ENTRY_SIZE));
+        storage.addEntry(genEntry(3, 2, ENTRY_SIZE));
+        storage.flush();
+        storage.shutdown();
+
+        assertTrue("Log should exist", log0.exists());
+        ledgers.remove(2l);
+        ledgers.remove(3l);
+
+        storage = new InterleavedLedgerStorage(conf, manager, dirs, checkpointSource);
+        storage.start();
+        for (int i = 0; i < 10; i++) {
+            if (!log0.exists()) {
+                break;
+            }
+            Thread.sleep(1000);
+            storage.entryLogger.flush(); // simulate sync thread
+        }
+        assertFalse("Log shouldn't exist", log0.exists());
+
+        ledgers.add(4l);
+        storage.setMasterKey(4, KEY);
+        storage.addEntry(genEntry(4, 1, ENTRY_SIZE)); // force ledger 1 page to flush
+
+        storage = new InterleavedLedgerStorage(conf, manager, dirs, checkpointSource);
+        storage.getEntry(1, 1); // entry should exist
+    }
+
+    private ByteBuffer genEntry(long ledger, long entry, int size) {
+        ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
+        bb.putLong(ledger);
+        bb.putLong(entry);
+        while (bb.hasRemaining()) {
+            bb.put((byte)0xFF);
+        }
+        bb.flip();
+        return bb;
+    }
 }
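Finally, the two knobs introduced by this patch can also be set programmatically through the new ServerConfiguration setters; a brief usage sketch, simply restating the defaults:

    import org.apache.bookkeeper.conf.ServerConfiguration;

    public class CompactionConfigExample {
        public static void main(String[] args) {
            // Both setters return the configuration object, so calls chain.
            ServerConfiguration conf = new ServerConfiguration()
                .setCompactionRate(1000)                       // re-adds per second
                .setCompactionMaxOutstandingRequests(100000);  // entries compacted between forced flushes
            System.out.println(conf.getCompactionRate());                   // 1000
            System.out.println(conf.getCompactionMaxOutstandingRequests()); // 100000
        }
    }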