Repository: bookkeeper Updated Branches: refs/heads/master 2c567d008 -> 96adbf1d6
BOOKKEEPER-851: Configurable LedgerStorage implementation sijie Addressed almost all comments from https://reviews.apache.org/r/33096 Only point still open is how to treat the `SortedLedgerStorage` and the flag `sortedLedgerStorageEnabled=true` in the config file Author: Matteo Merli <[email protected]> Reviewers: Sijie Guo <[email protected]> Closes #6 from merlimat/bk-851-configurable-ledger-storage Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/96adbf1d Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/96adbf1d Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/96adbf1d Branch: refs/heads/master Commit: 96adbf1d613a63602c8b1b4aad1b0a7d17e6eee3 Parents: 2c567d0 Author: Matteo Merli <[email protected]> Authored: Mon Mar 7 21:49:52 2016 -0800 Committer: Sijie Guo <[email protected]> Committed: Mon Mar 7 21:49:52 2016 -0800 ---------------------------------------------------------------------- bookkeeper-server/conf/bk_server.conf | 3 + .../org/apache/bookkeeper/bookie/Bookie.java | 19 ++- .../bookie/CompactableLedgerStorage.java | 61 +++++++++ .../apache/bookkeeper/bookie/EntryLocation.java | 34 +++++ .../org/apache/bookkeeper/bookie/FileInfo.java | 1 + .../bookie/GarbageCollectorThread.java | 77 +++++------ .../bookkeeper/bookie/IndexPersistenceMgr.java | 9 +- .../bookie/InterleavedLedgerStorage.java | 72 ++++++++--- .../apache/bookkeeper/bookie/LedgerStorage.java | 26 +++- .../bookkeeper/bookie/LedgerStorageFactory.java | 35 +++++ .../bookkeeper/bookie/ReadOnlyEntryLogger.java | 2 +- .../bookie/ScanAndCompareGarbageCollector.java | 47 ++++--- .../bookkeeper/bookie/SortedLedgerStorage.java | 21 +-- .../bookkeeper/conf/ServerConfiguration.java | 48 ++++++- .../bookkeeper/bookie/CompactionTest.java | 39 +++--- .../bookkeeper/bookie/LedgerCacheTest.java | 2 +- .../bookkeeper/bookie/TestSyncThread.java | 14 ++ .../apache/bookkeeper/meta/GcLedgersTest.java | 127 
++++++++++++++++++- .../test/ForceReadOnlyBookieTest.java | 7 +- .../bookkeeper/test/LedgerDeleteTest.java | 4 +- .../bookkeeper/test/ReadOnlyBookieTest.java | 4 +- 21 files changed, 502 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/conf/bk_server.conf ---------------------------------------------------------------------- diff --git a/bookkeeper-server/conf/bk_server.conf b/bookkeeper-server/conf/bk_server.conf index 53db095..f73e633 100644 --- a/bookkeeper-server/conf/bk_server.conf +++ b/bookkeeper-server/conf/bk_server.conf @@ -67,6 +67,9 @@ ledgerDirectories=/tmp/bk-data # store all ledgers. # zkLedgersRootPath=/ledgers +# Ledger storage implementation class +# ledgerStorageClass=org.apache.bookkeeper.bookie.SortedLedgerStorage + # Enable/Disable entry logger preallocation # entryLogFilePreallocationEnabled=true http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index 74876ff..6272200 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -503,16 +503,11 @@ public class Bookie extends BookieCriticalThread { // instantiate the journal journal = new Journal(conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE)); - // Check the type of storage. 
- if (conf.getSortedLedgerStorageEnabled()) { - ledgerStorage = new SortedLedgerStorage(conf, ledgerManager, - ledgerDirsManager, indexDirsManager, - journal, statsLogger); - } else { - ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager, - ledgerDirsManager, indexDirsManager, - journal, statsLogger); - } + // Instantiate the ledger storage implementation + String ledgerStorageClass = conf.getLedgerStorageClass(); + LOG.info("Using ledger storage: {}", ledgerStorageClass); + ledgerStorage = LedgerStorageFactory.createLedgerStorage(ledgerStorageClass); + ledgerStorage.initialize(conf, ledgerManager, ledgerDirsManager, indexDirsManager, journal, statsLogger); syncThread = new SyncThread(conf, getLedgerDirsListener(), ledgerStorage, journal); @@ -712,7 +707,9 @@ public class Bookie extends BookieCriticalThread { try { jmxLedgerStorageBean = this.ledgerStorage.getJMXBean(); - BKMBeanRegistry.getInstance().register(jmxLedgerStorageBean, jmxBookieBean); + if (jmxLedgerStorageBean != null) { + BKMBeanRegistry.getInstance().register(jmxLedgerStorageBean, jmxBookieBean); + } } catch (Exception e) { LOG.warn("Failed to register with JMX for ledger cache", e); jmxLedgerStorageBean = null; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java new file mode 100644 index 0000000..c76a485 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.bookkeeper.bookie; + +import java.io.IOException; + +/** + * Interface that identifies LedgerStorage implementations using EntryLogger and running periodic entries compaction + */ +public interface CompactableLedgerStorage extends LedgerStorage { + + /** + * @return the EntryLogger used by the ledger storage + */ + EntryLogger getEntryLogger(); + + /** + * Get an iterator over a range of ledger ids stored in the bookie. 
+ * + * @param firstLedgerId first ledger id in the sequence (included) + * @param lastLedgerId last ledger id in the sequence (not included) + * @return + */ + Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) + throws IOException; + + /** + * Update the location of several entries + * + * @param locations + * the list of locations to update + * @throws IOException + */ + void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException; + + /** + * Flush the entries locations index for the compacted entries + * + * @throws IOException + */ + void flushEntriesLocationsIndex() throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLocation.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLocation.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLocation.java new file mode 100644 index 0000000..dbb85a1 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLocation.java @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.bookkeeper.bookie; + +public class EntryLocation { + public final long ledger; + public final long entry; + public final long location; + + public EntryLocation(long ledger, long entry, long location) { + this.ledger = ledger; + this.entry = entry; + this.location = location; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java index 44f004a..3b7c8dc 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java @@ -265,6 +265,7 @@ class FileInfo { } if (useCount.get() == 0 && fc != null) { fc.close(); + fc = null; } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index 1c9c7e7..4d7da25 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -39,7 +39,6 @@ import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.util.MathUtils; 
-import org.apache.bookkeeper.util.SnapshotMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,9 +79,7 @@ public class GarbageCollectorThread extends BookieThread { // Entry Logger Handle final EntryLogger entryLogger; - // Ledger Cache Handle - final LedgerCache ledgerCache; - final SnapshotMap<Long, Boolean> activeLedgers; + final CompactableLedgerStorage ledgerStorage; // flag to ensure gc thread will not be interrupted during compaction // to reduce the risk getting entry log corrupted @@ -103,35 +100,23 @@ public class GarbageCollectorThread extends BookieThread { final GarbageCollector garbageCollector; final GarbageCleaner garbageCleaner; - private static class Offset { - final long ledger; - final long entry; - final long offset; - - Offset(long ledger, long entry, long offset) { - this.ledger = ledger; - this.entry = entry; - this.offset = offset; - } - } - private static class Throttler { final RateLimiter rateLimiter; final boolean isThrottleByBytes; final int compactionRateByBytes; final int compactionRateByEntries; - Throttler(boolean isThrottleByBytes, - int compactionRateByBytes, + Throttler(boolean isThrottleByBytes, + int compactionRateByBytes, int compactionRateByEntries) { this.isThrottleByBytes = isThrottleByBytes; this.compactionRateByBytes = compactionRateByBytes; this.compactionRateByEntries = compactionRateByEntries; - this.rateLimiter = RateLimiter.create(this.isThrottleByBytes ? - this.compactionRateByBytes : + this.rateLimiter = RateLimiter.create(this.isThrottleByBytes ? + this.compactionRateByBytes : this.compactionRateByEntries); } - + // acquire. if bybytes: bytes of this entry; if byentries: 1. void acquire(int permits) { rateLimiter.acquire(this.isThrottleByBytes ? 
permits : 1); @@ -142,11 +127,11 @@ public class GarbageCollectorThread extends BookieThread { * A scanner wrapper to check whether a ledger is alive in an entry log file */ class CompactionScannerFactory implements EntryLogger.EntryLogListener { - List<Offset> offsets = new ArrayList<Offset>(); + List<EntryLocation> offsets = new ArrayList<EntryLocation>(); EntryLogScanner newScanner(final EntryLogMetadata meta) { final Throttler throttler = new Throttler (isThrottleByBytes, - compactionRateByBytes, + compactionRateByBytes, compactionRateByEntries); return new EntryLogScanner() { @@ -168,7 +153,7 @@ public class GarbageCollectorThread extends BookieThread { entry.rewind(); long newoffset = entryLogger.addEntry(ledgerId, entry); - offsets.add(new Offset(ledgerId, entryId, newoffset)); + offsets.add(new EntryLocation(ledgerId, entryId, newoffset)); } } }; @@ -190,15 +175,15 @@ public class GarbageCollectorThread extends BookieThread { return; } - Offset lastOffset = offsets.get(offsets.size()-1); - long lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.offset); + EntryLocation lastOffset = offsets.get(offsets.size()-1); + long lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.location); while (lastOffsetLogId < entryLogger.getLeastUnflushedLogId() && running) { synchronized (flushLock) { flushLock.wait(1000); } lastOffset = offsets.get(offsets.size()-1); - lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.offset); + lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.location); } if (lastOffsetLogId >= entryLogger.getLeastUnflushedLogId() && !running) { throw new IOException("Shutdown before flushed"); @@ -208,16 +193,14 @@ public class GarbageCollectorThread extends BookieThread { throw new IOException("Interrupted waiting for flush", ie); } - for (Offset o : offsets) { - ledgerCache.putEntryOffset(o.ledger, o.entry, o.offset); - } + ledgerStorage.updateEntriesLocations(offsets); offsets.clear(); } synchronized void flush() throws IOException 
{ waitEntrylogFlushed(); - ledgerCache.flushLedger(true); + ledgerStorage.flushEntriesLocationsIndex(); } } @@ -230,16 +213,13 @@ public class GarbageCollectorThread extends BookieThread { * @throws IOException */ public GarbageCollectorThread(ServerConfiguration conf, - final LedgerCache ledgerCache, - EntryLogger entryLogger, - SnapshotMap<Long, Boolean> activeLedgers, - LedgerManager ledgerManager) + LedgerManager ledgerManager, + final CompactableLedgerStorage ledgerStorage) throws IOException { super("GarbageCollectorThread"); - this.ledgerCache = ledgerCache; - this.entryLogger = entryLogger; - this.activeLedgers = activeLedgers; + this.entryLogger = ledgerStorage.getEntryLogger(); + this.ledgerStorage = ledgerStorage; this.gcWaitTime = conf.getGcWaitTime(); this.isThrottleByBytes = conf.getIsThrottleByBytes(); @@ -256,14 +236,15 @@ public class GarbageCollectorThread extends BookieThread { if (LOG.isDebugEnabled()) { LOG.debug("delete ledger : " + ledgerId); } - ledgerCache.deleteLedger(ledgerId); + + ledgerStorage.deleteLedger(ledgerId); } catch (IOException e) { LOG.error("Exception when deleting the ledger index file on the Bookie: ", e); } } }; - this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, activeLedgers); + this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, ledgerStorage); // compaction parameters minorCompactionThreshold = conf.getMinorCompactionThreshold(); @@ -333,7 +314,7 @@ public class GarbageCollectorThread extends BookieThread { LOG.info("Suspend Major Compaction triggered by thread: {}", Thread.currentThread().getName()); } } - + public void resumeMajorGC() { if (suspendMajorCompaction.compareAndSet(true, false)) { LOG.info("{} Major Compaction back to normal since bookie has enough space now.", Thread.currentThread().getName()); @@ -345,7 +326,7 @@ public class GarbageCollectorThread extends BookieThread { LOG.info("Suspend Minor Compaction triggered by thread: {}", 
Thread.currentThread().getName()); } } - + public void resumeMinorGC() { if (suspendMinorCompaction.compareAndSet(true, false)) { LOG.info("{} Minor Compaction back to normal since bookie has enough space now.", Thread.currentThread().getName()); @@ -389,7 +370,7 @@ public class GarbageCollectorThread extends BookieThread { } long curTime = MathUtils.now(); - if (enableMajorCompaction && (!suspendMajor) && + if (enableMajorCompaction && (!suspendMajor) && (force || curTime - lastMajorCompactionTime > majorCompactionInterval)) { // enter major compaction LOG.info("Enter major compaction, suspendMajor {}", suspendMajor); @@ -400,7 +381,7 @@ public class GarbageCollectorThread extends BookieThread { continue; } - if (enableMinorCompaction && (!suspendMinor) && + if (enableMinorCompaction && (!suspendMinor) && (force || curTime - lastMinorCompactionTime > minorCompactionInterval)) { // enter minor compaction LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor); @@ -428,8 +409,12 @@ public class GarbageCollectorThread extends BookieThread { EntryLogMetadata meta = entryLogMetaMap.get(entryLogId); for (Long entryLogLedger : meta.getLedgersMap().keySet()) { // Remove the entry log ledger from the set if it isn't active. 
- if (!activeLedgers.containsKey(entryLogLedger)) { - meta.removeLedger(entryLogLedger); + try { + if (!ledgerStorage.ledgerExists(entryLogLedger)) { + meta.removeLedger(entryLogLedger); + } + } catch (IOException e) { + LOG.error("Error reading from ledger storage", e); } } if (meta.isEmpty()) { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java index 60b4099..573ad45 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java @@ -267,14 +267,7 @@ public class IndexPersistenceMgr { } boolean ledgerExists(long ledgerId) throws IOException { - FileInfo fi = fileInfoCache.get(ledgerId); - if (fi == null) { - File lf = findIndexFile(ledgerId); - if (lf == null) { - return false; - } - } - return true; + return activeLedgers.containsKey(ledgerId); } int getNumOpenLedgers() { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java index eb27757..e85c8db 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java @@ -25,17 +25,20 @@ import java.io.File; import java.io.IOException; import 
java.nio.ByteBuffer; +import org.apache.bookkeeper.bookie.Bookie.NoLedgerException; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener; import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; import java.util.concurrent.TimeUnit; +import java.util.Map; +import java.util.NavigableMap; + import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.jmx.BKMBeanInfo; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.proto.BookieProtocol; -import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.MathUtils; @@ -51,7 +54,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFF * This ledger storage implementation stores all entries in a single * file and maintains an index file for each ledger. */ -class InterleavedLedgerStorage implements LedgerStorage, EntryLogListener { +public class InterleavedLedgerStorage implements CompactableLedgerStorage, EntryLogListener { private final static Logger LOG = LoggerFactory.getLogger(InterleavedLedgerStorage.class); // Hold the last checkpoint @@ -77,7 +80,7 @@ class InterleavedLedgerStorage implements LedgerStorage, EntryLogListener { EntryLogger entryLogger; LedgerCache ledgerCache; - private final CheckpointSource checkpointSource; + private CheckpointSource checkpointSource; protected final CheckpointHolder checkpointHolder = new CheckpointHolder(); // A sorted map to stored all active ledger ids @@ -86,32 +89,30 @@ class InterleavedLedgerStorage implements LedgerStorage, EntryLogListener { // This is the thread that garbage collects the entry logs that do not // contain any active ledgers in them; and compacts the entry logs that // has lower remaining percentage to reclaim disk space. 
- final GarbageCollectorThread gcThread; + GarbageCollectorThread gcThread; // this indicates that a write has happened since the last flush private volatile boolean somethingWritten = false; // Expose Stats - private final OpStatsLogger getOffsetStats; - private final OpStatsLogger getEntryStats; + private OpStatsLogger getOffsetStats; + private OpStatsLogger getEntryStats; - InterleavedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, - LedgerDirsManager ledgerDirsManager, CheckpointSource checkpointSource) - throws IOException { - this(conf, ledgerManager, ledgerDirsManager, ledgerDirsManager, checkpointSource, NullStatsLogger.INSTANCE); + InterleavedLedgerStorage() { + activeLedgers = new SnapshotMap<Long, Boolean>(); } - InterleavedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, - LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, - CheckpointSource checkpointSource, StatsLogger statsLogger) + @Override + public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, + LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, + CheckpointSource checkpointSource, StatsLogger statsLogger) throws IOException { - activeLedgers = new SnapshotMap<Long, Boolean>(); + this.checkpointSource = checkpointSource; entryLogger = new EntryLogger(conf, ledgerDirsManager, this); ledgerCache = new LedgerCacheImpl(conf, activeLedgers, null == indexDirsManager ? 
ledgerDirsManager : indexDirsManager, statsLogger); - gcThread = new GarbageCollectorThread(conf, ledgerCache, entryLogger, - activeLedgers, ledgerManager); + gcThread = new GarbageCollectorThread(conf, ledgerManager, this); ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener()); // Expose Stats getOffsetStats = statsLogger.getOpStatsLogger(STORAGE_GET_OFFSET); @@ -342,6 +343,45 @@ class InterleavedLedgerStorage implements LedgerStorage, EntryLogListener { } @Override + public void deleteLedger(long ledgerId) throws IOException { + activeLedgers.remove(ledgerId); + ledgerCache.deleteLedger(ledgerId); + } + + @Override + public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) { + NavigableMap<Long, Boolean> bkActiveLedgersSnapshot = activeLedgers.snapshot(); + Map<Long, Boolean> subBkActiveLedgers = bkActiveLedgersSnapshot + .subMap(firstLedgerId, true, lastLedgerId, false); + + return subBkActiveLedgers.keySet(); + } + + @Override + public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException { + for (EntryLocation l : locations) { + try { + ledgerCache.putEntryOffset(l.ledger, l.entry, l.location); + } catch (NoLedgerException e) { + // Ledger was already deleted, we can skip it in the compaction + if (LOG.isDebugEnabled()) { + LOG.debug("Compaction failed for deleted ledger ledger: {} entry: {}", l.ledger, l.entry); + } + } + } + } + + @Override + public void flushEntriesLocationsIndex() throws IOException { + ledgerCache.flushLedger(true); + } + + @Override + public EntryLogger getEntryLogger() { + return entryLogger; + } + + @Override public BKMBeanInfo getJMXBean() { return ledgerCache.getJMXBean(); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java ---------------------------------------------------------------------- diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java index e992d03..f2f00c4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java @@ -25,13 +25,30 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.stats.StatsLogger; + import org.apache.bookkeeper.jmx.BKMBeanInfo; +import org.apache.bookkeeper.meta.LedgerManager; /** * Interface for storing ledger data * on persistant storage. */ -interface LedgerStorage { +public interface LedgerStorage { + + /** + * Initialize the LedgerStorage implementation + * + * @param conf + * @param ledgerManager + * @param ledgerDirsManager + */ + public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, + LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, + CheckpointSource checkpointSource, StatsLogger statsLogger) + throws IOException; + /** * Start any background threads * belonging to the storage system. 
For example, @@ -111,6 +128,13 @@ interface LedgerStorage { */ Checkpoint checkpoint(Checkpoint checkpoint) throws IOException; + /* + * + * @param ledgerId + * @throws IOException + */ + void deleteLedger(long ledgerId) throws IOException; + /** * Get the JMX management bean for this LedgerStorage */ http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorageFactory.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorageFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorageFactory.java new file mode 100644 index 0000000..d2bac64 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorageFactory.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.bookkeeper.bookie; + +import java.io.IOException; + +import org.apache.bookkeeper.util.ReflectionUtils; + +public class LedgerStorageFactory { + public static LedgerStorage createLedgerStorage(String name) throws IOException { + try { + return ReflectionUtils.newInstance(name, LedgerStorage.class); + } catch (Throwable t) { + throw new IOException("Failed to instantiate ledger storage : " + name, t); + } + } +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java index a2ce9e3..c0af6c3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java @@ -52,7 +52,7 @@ public class ReadOnlyEntryLogger extends EntryLogger { } @Override - synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException { + public synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException { throw new IOException("Can't add entry to a readonly entry logger."); } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java index d8a87e4..2b4e3c0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java +++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java @@ -21,17 +21,17 @@ package org.apache.bookkeeper.bookie; -import java.util.Map; -import java.util.NavigableMap; +import java.util.NavigableSet; import java.util.Set; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.meta.LedgerManager.LedgerRange; import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator; -import org.apache.bookkeeper.util.SnapshotMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Sets; + /** * Garbage collector implementation using scan and compare. * @@ -49,50 +49,49 @@ import org.slf4j.LoggerFactory; public class ScanAndCompareGarbageCollector implements GarbageCollector{ static final Logger LOG = LoggerFactory.getLogger(ScanAndCompareGarbageCollector.class); - private SnapshotMap<Long, Boolean> activeLedgers; - private LedgerManager ledgerManager; + private final LedgerManager ledgerManager; + private final CompactableLedgerStorage ledgerStorage; - public ScanAndCompareGarbageCollector(LedgerManager ledgerManager, SnapshotMap<Long, Boolean> activeLedgers) { + public ScanAndCompareGarbageCollector(LedgerManager ledgerManager, CompactableLedgerStorage ledgerStorage) { this.ledgerManager = ledgerManager; - this.activeLedgers = activeLedgers; + this.ledgerStorage = ledgerStorage; } @Override public void gc(GarbageCleaner garbageCleaner) { - // create a snapshot first - NavigableMap<Long, Boolean> bkActiveLedgersSnapshot = - this.activeLedgers.snapshot(); - LedgerRangeIterator ledgerRangeIterator = ledgerManager.getLedgerRanges(); try { - // Empty global active ledgers, need to remove all local active ledgers. 
+ // Get a set of all ledgers on the bookie + NavigableSet<Long> bkActiveLedgers = Sets.newTreeSet(ledgerStorage.getActiveLedgersInRange(0, Long.MAX_VALUE)); + + // Iterate over all the ledger on the metadata store + LedgerRangeIterator ledgerRangeIterator = ledgerManager.getLedgerRanges(); + if (!ledgerRangeIterator.hasNext()) { - for (Long bkLid : bkActiveLedgersSnapshot.keySet()) { - // remove it from current active ledger - bkActiveLedgersSnapshot.remove(bkLid); - garbageCleaner.clean(bkLid); + // Empty global active ledgers, need to remove all local active ledgers. + for (long ledgerId : bkActiveLedgers) { + garbageCleaner.clean(ledgerId); } } + long lastEnd = -1; while(ledgerRangeIterator.hasNext()) { LedgerRange lRange = ledgerRangeIterator.next(); - Map<Long, Boolean> subBkActiveLedgers = null; Long start = lastEnd + 1; Long end = lRange.end(); if (!ledgerRangeIterator.hasNext()) { end = Long.MAX_VALUE; } - subBkActiveLedgers = bkActiveLedgersSnapshot.subMap( - start, true, end, true); + + Iterable<Long> subBkActiveLedgers = bkActiveLedgers.subSet(start, true, end, true); Set<Long> ledgersInMetadata = lRange.getLedgers(); - LOG.debug("Active in metadata {}, Active in bookie {}", - ledgersInMetadata, subBkActiveLedgers.keySet()); - for (Long bkLid : subBkActiveLedgers.keySet()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Active in metadata {}, Active in bookie {}", ledgersInMetadata, subBkActiveLedgers); + } + for (Long bkLid : subBkActiveLedgers) { if (!ledgersInMetadata.contains(bkLid)) { - // remove it from current active ledger - subBkActiveLedgers.remove(bkLid); garbageCleaner.clean(bkLid); } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java index da14885..b67e01e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java @@ -39,14 +39,19 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage implements LedgerStorage, CacheCallback, SkipListFlusher { private final static Logger LOG = LoggerFactory.getLogger(SortedLedgerStorage.class); - private final EntryMemTable memTable; - private final ScheduledExecutorService scheduler; - - public SortedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, - LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, - final CheckpointSource checkpointSource, StatsLogger statsLogger) - throws IOException { - super(conf, ledgerManager, ledgerDirsManager, indexDirsManager, checkpointSource, statsLogger); + private EntryMemTable memTable; + private ScheduledExecutorService scheduler; + + public SortedLedgerStorage() { + super(); + } + + @Override + public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, + LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, + final CheckpointSource checkpointSource, StatsLogger statsLogger) + throws IOException { + super.initialize(conf, ledgerManager, ledgerDirsManager, indexDirsManager, null, statsLogger); this.memTable = new EntryMemTable(conf, checkpointSource, statsLogger); this.scheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder() http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index d4305ca..76e5037 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -26,6 +26,8 @@ import org.apache.bookkeeper.stats.StatsProvider; import org.apache.bookkeeper.util.BookKeeperConstants; import org.apache.bookkeeper.util.ReflectionUtils; import org.apache.commons.configuration.ConfigurationException; +import org.apache.bookkeeper.bookie.InterleavedLedgerStorage; +import org.apache.bookkeeper.bookie.SortedLedgerStorage; import org.apache.commons.lang.StringUtils; /** @@ -118,6 +120,8 @@ public class ServerConfiguration extends AbstractConfiguration { protected final static String ENABLE_STATISTICS = "enableStatistics"; protected final static String STATS_PROVIDER_CLASS = "statsProviderClass"; + protected final static String LEDGER_STORAGE_CLASS = "ledgerStorageClass"; + /** * Construct a default configuration object */ @@ -833,9 +837,9 @@ public class ServerConfiguration extends AbstractConfiguration { * * Force GC may get some space back, but may also fill up disk space more * quickly. This is because new log files are created before GC, while old - * garbage log files deleted after GC. + * garbage log files deleted after GC. * - * @return true - do force GC when disk full, + * @return true - do force GC when disk full, * false - suspend GC when disk full. */ public boolean getIsForceGCAllowWhenNoSpace() { @@ -844,7 +848,7 @@ public class ServerConfiguration extends AbstractConfiguration { /** * Set whether force GC is allowed when disk full or almost full. 
- * + * @param force true to allow force GC; false to suspend GC * * @return ServerConfiguration @@ -993,6 +997,7 @@ public class ServerConfiguration extends AbstractConfiguration { /** * Set sorted-ledger storage enabled or not * + * @deprecated Use {@link #setLedgerStorageClass(String)} to configure the implementation class * @param enabled */ public ServerConfiguration setSortedLedgerStorageEnabled(boolean enabled) { @@ -1320,7 +1325,7 @@ public class ServerConfiguration extends AbstractConfiguration { /** * Get whether use bytes to throttle garbage collector compaction or not * - * @return true - use Bytes, + * @return true - use Bytes, * false - use Entries. */ public boolean getIsThrottleByBytes() { @@ -1369,7 +1374,7 @@ public class ServerConfiguration extends AbstractConfiguration { setProperty(COMPACTION_MAX_OUTSTANDING_REQUESTS, maxOutstandingRequests); return this; } - + /** * Get the rate of compaction adds. Default is 1,000. * @@ -1457,6 +1462,39 @@ public class ServerConfiguration extends AbstractConfiguration { return this; } + /** + * Get the {@link LedgerStorage} implementation class name + * + * @return the class name + */ + public String getLedgerStorageClass() { + String ledgerStorageClass = getString(LEDGER_STORAGE_CLASS, SortedLedgerStorage.class.getName()); + if (ledgerStorageClass.equals(SortedLedgerStorage.class.getName()) + && getSortedLedgerStorageEnabled() == false) { + // This is to retain compatibility with BK-4.3 configuration + // In BK-4.3, the ledger storage is configured through the "sortedLedgerStorageEnabled" flag : + // sortedLedgerStorageEnabled == true (default) ---> use SortedLedgerStorage + // sortedLedgerStorageEnabled == false ---> use InterleavedLedgerStorage + // + // Since BK-4.4, one can specify the implementation class, but if it was using InterleavedLedgerStorage it + // should continue to use that with the same configuration + ledgerStorageClass = InterleavedLedgerStorage.class.getName(); + } + + return 
ledgerStorageClass; + } + + /** + * Set the {@link LedgerStorage} implementation class name + * + * @param ledgerStorageClass the class name + * @return ServerConfiguration + */ + public ServerConfiguration setLedgerStorageClass(String ledgerStorageClass) { + setProperty(LEDGER_STORAGE_CLASS, ledgerStorageClass); + return this; + } + /** * Get whether bookie is using hostname for registration and in ledger * metadata. Defaults to false. http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java index 16bd4da..4f3bb87 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java @@ -40,6 +40,7 @@ import org.apache.bookkeeper.conf.TestBKConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.meta.LedgerManagerFactory; +import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor; @@ -68,8 +69,8 @@ public class CompactionTest extends BookKeeperClusterTestCase { return Arrays.asList(new Object[][] {{true}, {false}}); } - private boolean isThrottleByBytes; - + private boolean isThrottleByBytes; + private final static Logger LOG = LoggerFactory.getLogger(CompactionTest.class); DigestType digestType; @@ -118,7 +119,7 @@ public class CompactionTest extends BookKeeperClusterTestCase { 
baseConf.setMinorCompactionInterval(minorCompactionInterval); baseConf.setMajorCompactionInterval(majorCompactionInterval); baseConf.setEntryLogFilePreAllocationEnabled(false); - baseConf.setSortedLedgerStorageEnabled(false); + baseConf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName()); baseConf.setIsThrottleByBytes(this.isThrottleByBytes); super.setUp(); @@ -219,9 +220,10 @@ public class CompactionTest extends BookKeeperClusterTestCase { for (File dir : dirManager.getAllLedgerDirs()) { Bookie.checkDirectoryStructure(dir); } - InterleavedLedgerStorage storage = new InterleavedLedgerStorage(conf, - LedgerManagerFactory.newLedgerManagerFactory(conf, zkc).newLedgerManager(), - dirManager, cp); + InterleavedLedgerStorage storage = new InterleavedLedgerStorage(); + storage.initialize(conf, + LedgerManagerFactory.newLedgerManagerFactory(conf, zkc).newLedgerManager(), + dirManager, dirManager, cp, NullStatsLogger.INSTANCE); storage.start(); long startTime = MathUtils.now(); Thread.sleep(2000); @@ -424,8 +426,8 @@ public class CompactionTest extends BookKeeperClusterTestCase { File log0 = new File(curDir, "0.log"); LedgerDirsManager dirs = new LedgerDirsManager(conf, conf.getLedgerDirs()); assertFalse("Log shouldnt exist", log0.exists()); - InterleavedLedgerStorage storage = new InterleavedLedgerStorage(conf, manager, - dirs, checkpointSource); + InterleavedLedgerStorage storage = new InterleavedLedgerStorage(); + storage.initialize(conf, manager, dirs, dirs, checkpointSource, NullStatsLogger.INSTANCE); ledgers.add(1l); ledgers.add(2l); ledgers.add(3l); @@ -443,7 +445,8 @@ public class CompactionTest extends BookKeeperClusterTestCase { ledgers.remove(2l); ledgers.remove(3l); - storage = new InterleavedLedgerStorage(conf, manager, dirs, checkpointSource); + storage = new InterleavedLedgerStorage(); + storage.initialize(conf, manager, dirs, dirs, checkpointSource, NullStatsLogger.INSTANCE); storage.start(); for (int i = 0; i < 10; i++) { if (!log0.exists()) { 
@@ -458,7 +461,8 @@ public class CompactionTest extends BookKeeperClusterTestCase { storage.setMasterKey(4, KEY); storage.addEntry(genEntry(4, 1, ENTRY_SIZE)); // force ledger 1 page to flush - storage = new InterleavedLedgerStorage(conf, manager, dirs, checkpointSource); + storage = new InterleavedLedgerStorage(); + storage.initialize(conf, manager, dirs, dirs, checkpointSource, NullStatsLogger.INSTANCE); storage.getEntry(1, 1); // entry should exist } @@ -553,8 +557,8 @@ public class CompactionTest extends BookKeeperClusterTestCase { boolean compact) throws IOException { } }; - InterleavedLedgerStorage storage = new InterleavedLedgerStorage(conf, - manager, dirs, checkpointSource); + InterleavedLedgerStorage storage = new InterleavedLedgerStorage(); + storage.initialize(conf, manager, dirs, dirs, checkpointSource, NullStatsLogger.INSTANCE); double threshold = 0.1; // shouldn't throw exception @@ -599,11 +603,12 @@ public class CompactionTest extends BookKeeperClusterTestCase { for (File dir : dirManager.getAllLedgerDirs()) { Bookie.checkDirectoryStructure(dir); } - InterleavedLedgerStorage storage = new InterleavedLedgerStorage(conf, - LedgerManagerFactory.newLedgerManagerFactory(conf, zkc).newLedgerManager(), - dirManager, cp); + InterleavedLedgerStorage storage = new InterleavedLedgerStorage(); + storage.initialize(conf, + LedgerManagerFactory.newLedgerManagerFactory(conf, zkc).newLedgerManager(), + dirManager, dirManager, cp, NullStatsLogger.INSTANCE); storage.start(); - + // test suspend Major GC. Thread.sleep(conf.getMajorCompactionInterval() * 1000 + conf.getGcWaitTime()); @@ -613,7 +618,7 @@ public class CompactionTest extends BookKeeperClusterTestCase { Thread.sleep(conf.getMajorCompactionInterval() * 1000 + conf.getGcWaitTime()); assertTrue("major compaction triggered while set suspend", - storage.gcThread.lastMajorCompactionTime < startTime); + storage.gcThread.lastMajorCompactionTime < startTime); // test suspend Minor GC. 
storage.gcThread.suspendMinorGC(); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java index 19b6bcb..8812c90 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java @@ -321,7 +321,7 @@ public class LedgerCacheTest { .setLedgerDirNames(new String[] { ledgerDir.getPath() }) .setFlushInterval(1000) .setPageLimit(1) - .setSortedLedgerStorageEnabled(false); + .setLedgerStorageClass(InterleavedLedgerStorage.class.getName()); Bookie b = new Bookie(conf); b.start(); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java index d947dff..49c7e42 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java @@ -35,6 +35,8 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.conf.TestBKConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.meta.LedgerManager; +import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; import 
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; @@ -260,8 +262,20 @@ public class TestSyncThread { throws IOException { } } + private static class DummyLedgerStorage implements LedgerStorage { @Override + public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, + LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, + CheckpointSource checkpointSource, StatsLogger statsLogger) + throws IOException { + } + + @Override + public void deleteLedger(long ledgerId) throws IOException { + } + + @Override public void start() { } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java index de352b5..3875ec4 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java @@ -22,11 +22,14 @@ package org.apache.bookkeeper.meta; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.NavigableMap; import java.util.Queue; import java.util.Random; import java.util.Set; @@ -36,11 +39,21 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; +import org.apache.bookkeeper.bookie.CompactableLedgerStorage; +import org.apache.bookkeeper.bookie.EntryLocation; +import org.apache.bookkeeper.bookie.CheckpointSource; +import org.apache.bookkeeper.bookie.BookieException; 
+import org.apache.bookkeeper.bookie.EntryLogger; import org.apache.bookkeeper.bookie.GarbageCollector; +import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector; +import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerMetadata; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.jmx.BKMBeanInfo; import org.apache.bookkeeper.meta.LedgerManager.LedgerRange; import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; @@ -160,15 +173,24 @@ public class GcLedgersTest extends LedgerManagerTestCase { final CountDownLatch inGcProgress = new CountDownLatch(1); final CountDownLatch createLatch = new CountDownLatch(1); final CountDownLatch endLatch = new CountDownLatch(2); - final GarbageCollector garbageCollector = - new ScanAndCompareGarbageCollector(getLedgerManager(), activeLedgers); + final CompactableLedgerStorage mockLedgerStorage = new MockLedgerStorage(); + final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(), + mockLedgerStorage); Thread gcThread = new Thread() { @Override public void run() { garbageCollector.gc(new GarbageCollector.GarbageCleaner() { boolean paused = false; + @Override public void clean(long ledgerId) { + try { + mockLedgerStorage.deleteLedger(ledgerId); + } catch (IOException e) { + e.printStackTrace(); + return; + } + if (!paused) { inGcProgress.countDown(); try { @@ -223,8 +245,8 @@ public class GcLedgersTest extends LedgerManagerTestCase { createLedgers(numLedgers, createdLedgers); - final GarbageCollector garbageCollector = - new ScanAndCompareGarbageCollector(getLedgerManager(), activeLedgers); + final GarbageCollector garbageCollector = new 
ScanAndCompareGarbageCollector(getLedgerManager(), + new MockLedgerStorage()); GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() { @Override public void clean(long ledgerId) { @@ -259,8 +281,8 @@ public class GcLedgersTest extends LedgerManagerTestCase { createLedgers(numLedgers, createdLedgers); - final GarbageCollector garbageCollector = - new ScanAndCompareGarbageCollector(getLedgerManager(), activeLedgers); + final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(), + new MockLedgerStorage()); GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() { @Override public void clean(long ledgerId) { @@ -287,4 +309,97 @@ public class GcLedgersTest extends LedgerManagerTestCase { assertEquals("Should have cleaned something", 1, cleaned.size()); assertEquals("Should have cleaned first ledger" + first, (long)first, (long)cleaned.get(0)); } + + class MockLedgerStorage implements CompactableLedgerStorage { + + @Override + public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, + LedgerDirsManager ledgerDirsManager, + LedgerDirsManager indexDirsManager, + CheckpointSource checkpointSource, StatsLogger statsLogger) + throws IOException {} + + @Override + public void start() { + } + + @Override + public void shutdown() throws InterruptedException { + } + + @Override + public boolean ledgerExists(long ledgerId) throws IOException { + return false; + } + + @Override + public boolean setFenced(long ledgerId) throws IOException { + return false; + } + + @Override + public boolean isFenced(long ledgerId) throws IOException { + return false; + } + + @Override + public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException { + } + + @Override + public byte[] readMasterKey(long ledgerId) throws IOException, BookieException { + return null; + } + + @Override + public long addEntry(ByteBuffer entry) throws IOException { + return 0; + } + + @Override + 
public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException { + return null; + } + + @Override + public void flush() throws IOException { + } + + @Override + public Checkpoint checkpoint(Checkpoint checkpoint) throws IOException { + return null; + } + + @Override + public void deleteLedger(long ledgerId) throws IOException { + activeLedgers.remove(ledgerId); + } + + @Override + public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) { + NavigableMap<Long, Boolean> bkActiveLedgersSnapshot = activeLedgers.snapshot(); + Map<Long, Boolean> subBkActiveLedgers = bkActiveLedgersSnapshot + .subMap(firstLedgerId, true, lastLedgerId, false); + + return subBkActiveLedgers.keySet(); + } + + @Override + public BKMBeanInfo getJMXBean() { + return null; + } + + @Override + public EntryLogger getEntryLogger() { + return null; + } + + @Override + public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException { + } + + @Override + public void flushEntriesLocationsIndex() throws IOException { + } + } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ForceReadOnlyBookieTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ForceReadOnlyBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ForceReadOnlyBookieTest.java index 914c7e2..161802f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ForceReadOnlyBookieTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ForceReadOnlyBookieTest.java @@ -24,6 +24,7 @@ import java.io.File; import java.util.Enumeration; import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.InterleavedLedgerStorage; import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.client.BookKeeper.DigestType; import 
org.apache.bookkeeper.client.LedgerEntry; @@ -42,7 +43,7 @@ public class ForceReadOnlyBookieTest extends BookKeeperClusterTestCase { private final static Logger LOG = LoggerFactory.getLogger(ForceReadOnlyBookieTest.class); public ForceReadOnlyBookieTest() { super(2); - baseConf.setSortedLedgerStorageEnabled(false); + baseConf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName()); baseConf.setEntryLogFilePreAllocationEnabled(false); } @@ -65,11 +66,11 @@ public class ForceReadOnlyBookieTest extends BookKeeperClusterTestCase { bsConfs.get(1).setForceReadOnlyBookie(true); restartBookies(); Bookie bookie = bs.get(1).getBookie(); - + assertTrue("Bookie should be running and in readonly mode", bookie.isRunning() && bookie.isReadOnly()); LOG.info("successed force start ReadOnlyBookie"); - + // Check new bookie with readonly mode enabled. File[] ledgerDirs = bsConfs.get(1).getLedgerDirs(); assertEquals("Only one ledger dir should be present", 1, ledgerDirs.length); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java index d45a2f3..0c1a187 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java @@ -24,6 +24,8 @@ package org.apache.bookkeeper.test; import java.io.File; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.CountDownLatch; + +import org.apache.bookkeeper.bookie.InterleavedLedgerStorage; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; @@ -60,7 +62,7 @@ public 
class LedgerDeleteTest extends MultiLedgerManagerTestCase { baseConf.setEntryLogSizeLimit(2 * 1024 * 1024L); baseConf.setGcWaitTime(1000); baseConf.setEntryLogFilePreAllocationEnabled(false); - baseConf.setSortedLedgerStorageEnabled(false); + baseConf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName()); super.setUp(); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/96adbf1d/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java index 124a420..8cf5618 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java @@ -24,13 +24,13 @@ import java.io.File; import java.util.Enumeration; import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.InterleavedLedgerStorage; import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.util.IOUtils; import org.junit.Test; import static org.junit.Assert.*; @@ -42,7 +42,7 @@ public class ReadOnlyBookieTest extends BookKeeperClusterTestCase { public ReadOnlyBookieTest() { super(2); - baseConf.setSortedLedgerStorageEnabled(false); + baseConf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName()); baseConf.setEntryLogFilePreAllocationEnabled(false); }
