Repository: bookkeeper Updated Branches: refs/heads/master 9bd9e061b -> 2c567d008
BOOKKEEPER-841: Bookie should calculate ledgers map writing a new entry log file sijie I've addressed all comments from https://reviews.apache.org/r/33061 Author: Matteo Merli <[email protected]> Reviewers: Sijie Guo <[email protected]> Closes #5 from merlimat/bk-841 Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/2c567d00 Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/2c567d00 Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/2c567d00 Branch: refs/heads/master Commit: 2c567d008c644df5db8c441fb9aaf135ed36db95 Parents: 9bd9e06 Author: Matteo Merli <[email protected]> Authored: Tue Feb 23 15:44:31 2016 -0800 Committer: Matteo Merli <[email protected]> Committed: Tue Feb 23 15:44:31 2016 -0800 ---------------------------------------------------------------------- .../bookkeeper/bookie/EntryLogMetadata.java | 101 ++++++++ .../apache/bookkeeper/bookie/EntryLogger.java | 256 ++++++++++++++++++- .../bookie/GarbageCollectorThread.java | 110 +------- .../apache/bookkeeper/bookie/EntryLogTest.java | 100 +++++++- 4 files changed, 456 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/2c567d00/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java new file mode 100644 index 0000000..461736c --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadata.java @@ -0,0 +1,101 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.bookkeeper.bookie; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Records the total size, remaining size and the set of ledgers that comprise a entry log. + */ +public class EntryLogMetadata { + private final long entryLogId; + private long totalSize; + private long remainingSize; + private ConcurrentHashMap<Long, Long> ledgersMap; + + public EntryLogMetadata(long logId) { + this.entryLogId = logId; + + totalSize = remainingSize = 0; + ledgersMap = new ConcurrentHashMap<Long, Long>(); + } + + public void addLedgerSize(long ledgerId, long size) { + totalSize += size; + remainingSize += size; + Long ledgerSize = ledgersMap.get(ledgerId); + if (null == ledgerSize) { + ledgerSize = 0L; + } + ledgerSize += size; + ledgersMap.put(ledgerId, ledgerSize); + } + + public void removeLedger(long ledgerId) { + Long size = ledgersMap.remove(ledgerId); + if (null == size) { + return; + } + remainingSize -= size; + } + + public boolean containsLedger(long ledgerId) { + return ledgersMap.containsKey(ledgerId); + } + + public double getUsage() { + if (totalSize == 0L) { + return 0.0f; + } + return (double) remainingSize / totalSize; + } + + public boolean isEmpty() { + return ledgersMap.isEmpty(); + } + + public long getEntryLogId() { + return entryLogId; + } + + public long getTotalSize() { + return totalSize; + } + + public long getRemainingSize() { + return remainingSize; + } + + Map<Long, Long> getLedgersMap() { + return ledgersMap; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{ totalSize = ").append(totalSize).append(", remainingSize = ").append(remainingSize) + .append(", ledgersMap = ").append(ledgersMap).append(" }"); + return sb.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/2c567d00/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java index 1030c80..a970a96 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java @@ -39,9 +39,11 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; @@ -74,14 +76,25 @@ public class EntryLogger { private static class BufferedLogChannel extends BufferedChannel { final private long logId; + private final EntryLogMetadata entryLogMetada; + public BufferedLogChannel(FileChannel fc, int writeCapacity, int readCapacity, long logId) throws IOException { super(fc, writeCapacity, readCapacity); this.logId = logId; + this.entryLogMetada = new EntryLogMetadata(logId); } public long getLogId() { return logId; } + + public void registerWrittenEntry(long ledgerId, long entrySize) { + entryLogMetada.addLedgerSize(ledgerId, entrySize); + } + + public Map<Long, Long> getLedgersMap() { + return entryLogMetada.getLedgersMap(); + } } volatile File currentDir; @@ -101,14 +114,60 @@ public class EntryLogger { private final CopyOnWriteArrayList<EntryLogListener> listeners = new CopyOnWriteArrayList<EntryLogListener>(); + private static final int HEADER_V0 = 0; // Old log file format (no ledgers map index) + private static final int HEADER_V1 = 1; // Introduced ledger map index + private static final int HEADER_CURRENT_VERSION = HEADER_V1; + + private static class Header { + final int version; + final long ledgersMapOffset; + final int ledgersCount; + + Header(int version, long ledgersMapOffset, int ledgersCount) { + this.version = version; + this.ledgersMapOffset = ledgersMapOffset; + this.ledgersCount = ledgersCount; + } + } + /** * The 1K block at the head of the entry logger file - * that contains the fingerprint and (future) meta-data + * that contains the fingerprint and meta-data. + * + * Header is composed of: + * Fingerprint: 4 bytes "BKLO" + * Log file HeaderVersion enum: 4 bytes + * Ledger map offset: 8 bytes + * Ledgers Count: 4 bytes */ final static int LOGFILE_HEADER_SIZE = 1024; final ByteBuffer LOGFILE_HEADER = ByteBuffer.allocate(LOGFILE_HEADER_SIZE); + + final static int HEADER_VERSION_POSITION = 4; + final static int LEDGERS_MAP_OFFSET_POSITION = HEADER_VERSION_POSITION + 4; + + /** + * Ledgers map is composed of multiple parts that can be split into separated entries. Each of them is composed of: + * + * <pre> + * length: (4 bytes) [0-3] + * ledger id (-1): (8 bytes) [4 - 11] + * entry id: (8 bytes) [12-19] + * num ledgers stored in current metadata entry: (4 bytes) [20 - 23] + * ledger entries: sequence of (ledgerid, size) (8 + 8 bytes each) [24..] + * </pre> + */ + final static int LEDGERS_MAP_HEADER_SIZE = 4 + 8 + 8 + 4; + final static int LEDGERS_MAP_ENTRY_SIZE = 8 + 8; + + // Break the ledgers map into multiple batches, each of which can contain up to 10K ledgers + final static int LEDGERS_MAP_MAX_BATCH_SIZE = 10000; + final static long INVALID_LID = -1L; + // EntryId used to mark an entry (belonging to INVALID_ID) as a component of the serialized ledgers map + final static long LEDGERS_MAP_ENTRY_ID = -2L; + final static int MIN_SANE_ENTRY_SIZE = 8 + 8; final static long MB = 1024 * 1024; @@ -177,6 +236,7 @@ public class EntryLogger { // so there can be race conditions when entry logs are rolled over and // this header buffer is cleared before writing it into the new logChannel. LOGFILE_HEADER.put("BKLO".getBytes(UTF_8)); + LOGFILE_HEADER.putInt(HEADER_CURRENT_VERSION); // Find the largest logId long logId = INVALID_LID; @@ -370,9 +430,14 @@ public class EntryLogger { if (null == logChannelsToFlush) { logChannelsToFlush = new LinkedList<BufferedLogChannel>(); } + // flush the internal buffer back to filesystem but not sync disk // so the readers could access the data from filesystem. logChannel.flush(false); + + // Append ledgers map at the end of entry log + appendLedgersMap(logChannel); + BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog(); logChannelsToFlush.add(logChannel); LOG.info("Flushing entry logger {} back to filesystem, pending for syncing entry loggers : {}.", @@ -387,6 +452,55 @@ public class EntryLogger { } /** + * Append the ledger map at the end of the entry log. + * Updates the entry log file header with the offset and size of the map. + */ + private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws IOException { + long ledgerMapOffset = entryLogChannel.position(); + + Map<Long, Long> ledgersMap = entryLogChannel.getLedgersMap(); + + Iterator<Entry<Long, Long>> iterator = ledgersMap.entrySet().iterator(); + int numberOfLedgers = ledgersMap.size(); + int remainingLedgers = numberOfLedgers; + + // Write the ledgers map into several batches + while (iterator.hasNext()) { + // Start new batch + int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE); + int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize; + ByteBuffer serializedMap = ByteBuffer.allocate(ledgerMapSize); + + serializedMap.putInt(ledgerMapSize - 4); + serializedMap.putLong(INVALID_LID); + serializedMap.putLong(LEDGERS_MAP_ENTRY_ID); + serializedMap.putInt(batchSize); + + // Dump all ledgers for this batch + for (int i = 0; i < batchSize; i++) { + Entry<Long, Long> entry = iterator.next(); + long ledgerId = entry.getKey(); + long size = entry.getValue(); + + serializedMap.putLong(ledgerId); + serializedMap.putLong(size); + --remainingLedgers; + } + + // Close current batch + serializedMap.flip(); + entryLogChannel.fileChannel.write(serializedMap); + } + + // Update the headers with the map offset and count of ledgers + ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4); + mapInfo.putLong(ledgerMapOffset); + mapInfo.putInt(numberOfLedgers); + mapInfo.flip(); + entryLogChannel.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION); + } + + /** * An allocator pre-allocates entry log files. */ class EntryLoggerAllocator { @@ -642,12 +756,14 @@ public class EntryLogger { shouldCreateNewEntryLog.set(false); } } + ByteBuffer buff = ByteBuffer.allocate(4); buff.putInt(entry.remaining()); buff.flip(); logChannel.write(buff); long pos = logChannel.position(); logChannel.write(entry); + logChannel.registerWrittenEntry(ledger, entrySize); return (logChannel.getLogId() << 32L) | pos; } @@ -721,6 +837,30 @@ public class EntryLogger { return data; } + /** + * Read the header of an entry log + */ + private Header getHeaderForLogId(long entryLogId) throws IOException { + BufferedReadChannel bc = getChannelForLogId(entryLogId); + + // Allocate buffer to read (version, ledgersMapOffset, ledgerCount) + ByteBuffer headers = ByteBuffer.allocate(LOGFILE_HEADER_SIZE); + bc.read(headers, 0); + headers.flip(); + + // Skip marker string "BKLO" + headers.getInt(); + + int headerVersion = headers.getInt(); + if (headerVersion < HEADER_V0 || headerVersion > HEADER_CURRENT_VERSION) { + LOG.info("Unknown entry log header version for log {}: {}", entryLogId, headerVersion); + } + + long ledgersMapOffset = headers.getLong(); + int ledgersCount = headers.getInt(); + return new Header(headerVersion, ledgersMapOffset, ledgersCount); + } + private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOException { BufferedReadChannel fc = getFromChannels(entryLogId); if (fc != null) { @@ -788,6 +928,7 @@ public class EntryLogger { // Start the read position in the current entry log file to be after // the header where all of the ledger entries are. long pos = LOGFILE_HEADER_SIZE; + // Read through the entry log file and extract the ledger ID's. while (true) { // Check if we've finished reading the entry log file. @@ -812,7 +953,7 @@ public class EntryLogger { lidBuff.flip(); long lid = lidBuff.getLong(); lidBuff.clear(); - if (!scanner.accept(lid)) { + if (lid == INVALID_LID || !scanner.accept(lid)) { // skip this entry pos += entrySize; continue; @@ -834,6 +975,117 @@ public class EntryLogger { } } + public EntryLogMetadata getEntryLogMetadata(long entryLogId) throws IOException { + // First try to extract the EntryLogMetada from the index, if there's no index then fallback to scanning the + // entry log + try { + return extractEntryLogMetadataFromIndex(entryLogId); + } catch (Exception e) { + LOG.info("Failed to get ledgers map index from: {}.log : {}", entryLogId, e.getMessage()); + + // Fall-back to scanning + return extractEntryLogMetadataByScanning(entryLogId); + } + } + + EntryLogMetadata extractEntryLogMetadataFromIndex(long entryLogId) throws IOException { + Header header = getHeaderForLogId(entryLogId); + + if (header.version < HEADER_V1) { + throw new IOException("Old log file header without ledgers map on entryLogId " + entryLogId); + } + + if (header.ledgersMapOffset == 0L) { + // The index was not stored in the log file (possibly because the bookie crashed before flushing it) + throw new IOException("No ledgers map index found on entryLogId" + entryLogId); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Recovering ledgers maps for log {} at offset: {}", entryLogId, header.ledgersMapOffset); + } + + BufferedReadChannel bc = getChannelForLogId(entryLogId); + + // There can be multiple entries containing the various components of the serialized ledgers map + long offset = header.ledgersMapOffset; + EntryLogMetadata meta = new EntryLogMetadata(entryLogId); + + while (offset < bc.size()) { + // Read ledgers map size + ByteBuffer sizeBuf = ByteBuffer.allocate(4); + bc.read(sizeBuf, offset); + sizeBuf.flip(); + + int ledgersMapSize = sizeBuf.getInt(); + + // Read the index into a buffer + ByteBuffer ledgersMapBuffer = ByteBuffer.allocate(ledgersMapSize); + bc.read(ledgersMapBuffer, offset + 4); + ledgersMapBuffer.flip(); + + // Discard ledgerId and entryId + long lid = ledgersMapBuffer.getLong(); + if (lid != INVALID_LID) { + throw new IOException("Cannot deserialize ledgers map from ledger " + lid + " -- entryLogId: " + entryLogId); + } + + long entryId = ledgersMapBuffer.getLong(); + if (entryId != LEDGERS_MAP_ENTRY_ID) { + throw new IOException("Cannot deserialize ledgers map from ledger " + lid + ":" + entryId + " -- entryLogId: " + entryLogId); + } + + // Read the number of ledgers in the current entry batch + int ledgersCount = ledgersMapBuffer.getInt(); + + // Extract all (ledger,size) tuples from buffer + for (int i = 0; i < ledgersCount; i++) { + long ledgerId = ledgersMapBuffer.getLong(); + long size = ledgersMapBuffer.getLong(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Recovering ledgers maps for log {} -- Found ledger: {} with size: {}", + new Object[] { entryLogId, ledgerId, size }); + } + meta.addLedgerSize(ledgerId, size); + } + + if (ledgersMapBuffer.hasRemaining()) { + throw new IOException("Invalid entry size when reading ledgers map on entryLogId: " + entryLogId); + } + + // Move to next entry, if any + offset += ledgersMapSize + 4; + } + + if (meta.getLedgersMap().size() != header.ledgersCount) { + throw new IOException("Not all ledgers were found in ledgers map index. expected: " + header.ledgersCount + + " -- found: " + meta.getLedgersMap().size() + " -- entryLogId: " + entryLogId); + } + + return meta; + } + + private EntryLogMetadata extractEntryLogMetadataByScanning(long entryLogId) throws IOException { + final EntryLogMetadata meta = new EntryLogMetadata(entryLogId); + + // Read through the entry log file and extract the entry log meta + scanEntryLog(entryLogId, new EntryLogScanner() { + @Override + public void process(long ledgerId, long offset, ByteBuffer entry) throws IOException { + // add new entry size of a ledger to entry log meta + meta.addLedgerSize(ledgerId, entry.limit() + 4); + } + + @Override + public boolean accept(long ledgerId) { + return true; + } + }); + + LOG.debug("Retrieved entry log meta data entryLogId: {}, meta: {}", entryLogId, meta); + return meta; + } + /** * Shutdown method to gracefully stop entry logger. */ http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/2c567d00/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index 1ca43e0..1c9c7e7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -426,7 +426,7 @@ public class GarbageCollectorThread extends BookieThread { // Loop through all of the entry logs and remove the non-active ledgers. for (Long entryLogId : entryLogMetaMap.keySet()) { EntryLogMetadata meta = entryLogMetaMap.get(entryLogId); - for (Long entryLogLedger : meta.ledgersMap.keySet()) { + for (Long entryLogLedger : meta.getLedgersMap().keySet()) { // Remove the entry log ledger from the set if it isn't active. if (!activeLedgers.containsKey(entryLogLedger)) { meta.removeLedger(entryLogLedger); @@ -457,8 +457,8 @@ public class GarbageCollectorThread extends BookieThread { Comparator<EntryLogMetadata> sizeComparator = new Comparator<EntryLogMetadata>() { @Override public int compare(EntryLogMetadata m1, EntryLogMetadata m2) { - long unusedSize1 = m1.totalSize - m1.remainingSize; - long unusedSize2 = m2.totalSize - m2.remainingSize; + long unusedSize1 = m1.getTotalSize() - m1.getRemainingSize(); + long unusedSize2 = m2.getTotalSize() - m2.getRemainingSize(); if (unusedSize1 > unusedSize2) { return -1; } else if (unusedSize1 < unusedSize2) { @@ -477,10 +477,13 @@ public class GarbageCollectorThread extends BookieThread { if (meta.getUsage() >= threshold) { break; } - LOG.debug("Compacting entry log {} below threshold {}.", meta.entryLogId, threshold); + + if (LOG.isDebugEnabled()) { + LOG.debug("Compacting entry log {} below threshold {}.", meta.getEntryLogId(), threshold); + } try { compactEntryLog(scannerFactory, meta); - toRemove.add(meta.entryLogId); + toRemove.add(meta.getEntryLogId()); } catch (LedgerDirsManager.NoWritableLedgerDirException nwlde) { LOG.warn("No writable ledger directory available, aborting compaction", nwlde); break; @@ -557,10 +560,10 @@ public class GarbageCollectorThread extends BookieThread { return; } - LOG.info("Compacting entry log : {}", entryLogMeta.entryLogId); + LOG.info("Compacting entry log : {}", entryLogMeta.getEntryLogId()); try { - entryLogger.scanEntryLog(entryLogMeta.entryLogId, + entryLogger.scanEntryLog(entryLogMeta.getEntryLogId(), scannerFactory.newScanner(entryLogMeta)); } finally { // clear compacting flag @@ -569,86 +572,6 @@ public class GarbageCollectorThread extends BookieThread { } /** - * Records the total size, remaining size and the set of ledgers that comprise a entry log. - */ - static class EntryLogMetadata { - long entryLogId; - long totalSize; - long remainingSize; - ConcurrentHashMap<Long, Long> ledgersMap; - - public EntryLogMetadata(long logId) { - this.entryLogId = logId; - - totalSize = remainingSize = 0; - ledgersMap = new ConcurrentHashMap<Long, Long>(); - } - - public void addLedgerSize(long ledgerId, long size) { - totalSize += size; - remainingSize += size; - Long ledgerSize = ledgersMap.get(ledgerId); - if (null == ledgerSize) { - ledgerSize = 0L; - } - ledgerSize += size; - ledgersMap.put(ledgerId, ledgerSize); - } - - public void removeLedger(long ledgerId) { - Long size = ledgersMap.remove(ledgerId); - if (null == size) { - return; - } - remainingSize -= size; - } - - public boolean containsLedger(long ledgerId) { - return ledgersMap.containsKey(ledgerId); - } - - public double getUsage() { - if (totalSize == 0L) { - return 0.0f; - } - return (double)remainingSize / totalSize; - } - - public boolean isEmpty() { - return ledgersMap.isEmpty(); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("{ totalSize = ").append(totalSize).append(", remainingSize = ") - .append(remainingSize).append(", ledgersMap = ").append(ledgersMap).append(" }"); - return sb.toString(); - } - } - - /** - * A scanner used to extract entry log meta from entry log files. - */ - static class ExtractionScanner implements EntryLogScanner { - EntryLogMetadata meta; - - public ExtractionScanner(EntryLogMetadata meta) { - this.meta = meta; - } - - @Override - public boolean accept(long ledgerId) { - return true; - } - @Override - public void process(long ledgerId, long offset, ByteBuffer entry) { - // add new entry size of a ledger to entry log meta - meta.addLedgerSize(ledgerId, entry.limit() + 4); - } - } - - /** * Method to read in all of the entry logs (those that we haven't done so yet), * and find the set of ledger ID's that make up each entry log file. * @@ -678,7 +601,7 @@ public class GarbageCollectorThread extends BookieThread { try { // Read through the entry log file and extract the entry log meta - EntryLogMetadata entryLogMeta = extractMetaFromEntryLog(entryLogger, entryLogId); + EntryLogMetadata entryLogMeta = entryLogger.getEntryLogMetadata(entryLogId); entryLogMetaMap.put(entryLogId, entryLogMeta); } catch (IOException e) { hasExceptionWhenScan = true; @@ -695,15 +618,4 @@ public class GarbageCollectorThread extends BookieThread { } return entryLogMetaMap; } - - static EntryLogMetadata extractMetaFromEntryLog(EntryLogger entryLogger, long entryLogId) - throws IOException { - EntryLogMetadata entryLogMeta = new EntryLogMetadata(entryLogId); - ExtractionScanner scanner = new ExtractionScanner(entryLogMeta); - // Read through the entry log file and extract the entry log meta - entryLogger.scanEntryLog(entryLogId, scanner); - LOG.debug("Retrieved entry log meta data entryLogId: {}, meta: {}", - entryLogId, entryLogMeta); - return entryLogMeta; - } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/2c567d00/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java index 6b0ecd8..4e1004c 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java @@ -28,8 +28,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import org.apache.bookkeeper.bookie.GarbageCollectorThread.EntryLogMetadata; -import org.apache.bookkeeper.bookie.GarbageCollectorThread.ExtractionScanner; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.conf.TestBKConfiguration; import org.apache.bookkeeper.util.IOUtils; @@ -86,15 +84,11 @@ public class EntryLogTest { // now see which ledgers are in the log logger = new EntryLogger(conf, bookie.getLedgerDirsManager()); - EntryLogMetadata meta = new EntryLogMetadata(0L); - ExtractionScanner scanner = new ExtractionScanner(meta); - - logger.scanEntryLog(0L, scanner); - + EntryLogMetadata meta = logger.getEntryLogMetadata(0L); LOG.info("Extracted Meta From Entry Log {}", meta); - assertNotNull(meta.ledgersMap.get(1L)); - assertNull(meta.ledgersMap.get(2L)); - assertNotNull(meta.ledgersMap.get(3L)); + assertNotNull(meta.getLedgersMap().get(1L)); + assertNull(meta.getLedgersMap().get(2L)); + assertNotNull(meta.getLedgersMap().get(3L)); } private ByteBuffer generateEntry(long ledger, long entry) { @@ -220,4 +214,90 @@ public class EntryLogTest { Assert.assertTrue(0 == generateEntry(3, 1).compareTo(ledgerStorage.getEntry(3, 1))); } + /** + * Explicitely try to recover using the ledgers map index at the end of the entry log + */ + @Test(timeout=60000) + public void testRecoverFromLedgersMap() throws Exception { + File tmpDir = createTempDir("bkTest", ".dir"); + File curDir = Bookie.getCurrentDirectory(tmpDir); + Bookie.checkDirectoryStructure(curDir); + + int gcWaitTime = 1000; + ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); + conf.setGcWaitTime(gcWaitTime); + conf.setLedgerDirNames(new String[] {tmpDir.toString()}); + Bookie bookie = new Bookie(conf); + + // create some entries + EntryLogger logger = ((InterleavedLedgerStorage)bookie.ledgerStorage).entryLogger; + logger.addEntry(1, generateEntry(1, 1)); + logger.addEntry(3, generateEntry(3, 1)); + logger.addEntry(2, generateEntry(2, 1)); + logger.addEntry(1, generateEntry(1, 2)); + logger.rollLog(); + logger.flushRotatedLogs(); + + EntryLogMetadata meta = logger.extractEntryLogMetadataFromIndex(0L); + LOG.info("Extracted Meta From Entry Log {}", meta); + assertEquals(60, meta.getLedgersMap().get(1L).longValue()); + assertEquals(30, meta.getLedgersMap().get(2L).longValue()); + assertEquals(30, meta.getLedgersMap().get(3L).longValue()); + assertNull(meta.getLedgersMap().get(4L)); + assertEquals(120, meta.getTotalSize()); + assertEquals(120, meta.getRemainingSize()); + } + + /** + * Explicitely try to recover using the ledgers map index at the end of the entry log + */ + @Test(timeout = 60000) + public void testRecoverFromLedgersMapOnV0EntryLog() throws Exception { + File tmpDir = createTempDir("bkTest", ".dir"); + File curDir = Bookie.getCurrentDirectory(tmpDir); + Bookie.checkDirectoryStructure(curDir); + + int gcWaitTime = 1000; + ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); + conf.setGcWaitTime(gcWaitTime); + conf.setLedgerDirNames(new String[] { tmpDir.toString() }); + Bookie bookie = new Bookie(conf); + + // create some entries + EntryLogger logger = ((InterleavedLedgerStorage) bookie.ledgerStorage).entryLogger; + logger.addEntry(1, generateEntry(1, 1)); + logger.addEntry(3, generateEntry(3, 1)); + logger.addEntry(2, generateEntry(2, 1)); + logger.addEntry(1, generateEntry(1, 2)); + logger.rollLog(); + + // Rewrite the entry log header to be on V0 format + File f = new File(curDir, "0.log"); + RandomAccessFile raf = new RandomAccessFile(f, "rw"); + raf.seek(EntryLogger.HEADER_VERSION_POSITION); + // Write zeros to indicate V0 + no ledgers map info + raf.write(new byte[4 + 8]); + raf.close(); + + // now see which ledgers are in the log + logger = new EntryLogger(conf, bookie.getLedgerDirsManager()); + + try { + logger.extractEntryLogMetadataFromIndex(0L); + fail("Should not be possible to recover from ledgers map index"); + } catch (IOException e) { + // Ok + } + + // Public method should succeed by falling back to scanning the file + EntryLogMetadata meta = logger.getEntryLogMetadata(0L); + LOG.info("Extracted Meta From Entry Log {}", meta); + assertEquals(60, meta.getLedgersMap().get(1L).longValue()); + assertEquals(30, meta.getLedgersMap().get(2L).longValue()); + assertEquals(30, meta.getLedgersMap().get(3L).longValue()); + assertNull(meta.getLedgersMap().get(4L)); + assertEquals(120, meta.getTotalSize()); + assertEquals(120, meta.getRemainingSize()); + } + }
