Repository: bookkeeper Updated Branches: refs/heads/master 91db1254c -> 123eccd43
BOOKKEEPER-1009: Use multiple journals in bookie. Mostly https://github.com/apache/bookkeeper/pull/71 with comments addressed. Will shortly add tests. merlimat, would it be ok to close 71? Author: Govind Menon <[email protected]> Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo <[email protected]> Closes #115 from govind-menon/BOOKKEEPER-1009_B Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/123eccd4 Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/123eccd4 Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/123eccd4 Branch: refs/heads/master Commit: 123eccd435a4a96a9147ed4a24efbe9025fe79ba Parents: 91db125 Author: Govind Menon <[email protected]> Authored: Wed Mar 22 14:54:46 2017 -0700 Committer: Sijie Guo <[email protected]> Committed: Wed Mar 22 14:54:46 2017 -0700 ---------------------------------------------------------------------- .../org/apache/bookkeeper/bookie/Bookie.java | 289 ++++++++++++------- .../apache/bookkeeper/bookie/BookieBean.java | 7 +- .../apache/bookkeeper/bookie/BookieShell.java | 83 ++++-- .../bookkeeper/bookie/CheckpointSourceList.java | 97 +++++++ .../org/apache/bookkeeper/bookie/Cookie.java | 33 ++- .../bookkeeper/bookie/FileSystemUpgrade.java | 3 +- .../org/apache/bookkeeper/bookie/Journal.java | 13 +- .../bookkeeper/conf/ServerConfiguration.java | 46 ++- .../apache/bookkeeper/proto/BookieServer.java | 2 +- .../bookkeeper/bookie/BookieAccessor.java | 5 +- .../bookkeeper/bookie/CompactionTest.java | 9 +- .../bookkeeper/bookie/UpdateCookieCmdTest.java | 8 +- 12 files changed, 433 insertions(+), 162 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/123eccd4/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java ---------------------------------------------------------------------- diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index bbbfa51..b3e0ed3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -30,10 +30,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -44,7 +41,9 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.bookkeeper.bookie.Journal.JournalScanner; import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; @@ -107,14 +106,14 @@ public class Bookie extends BookieCriticalThread { private final static Logger LOG = LoggerFactory.getLogger(Bookie.class); - final File journalDirectory; + final List<File> journalDirectories; final ServerConfiguration conf; final SyncThread syncThread; final LedgerManagerFactory ledgerManagerFactory; final LedgerManager ledgerManager; final LedgerStorage ledgerStorage; - final Journal journal; + final List<Journal> journals; final HandleFactory handles; @@ -323,51 +322,70 @@ public class Bookie extends BookieCriticalThread { allLedgerDirs.addAll(indexDirsManager.getAllLedgerDirs()); } if (zk == null) { // exists only for testing, just make sure directories are correct - checkDirectoryStructure(journalDirectory); + + for (File journalDirectory : journalDirectories) 
{ + checkDirectoryStructure(journalDirectory); + } + for (File dir : allLedgerDirs) { checkDirectoryStructure(dir); } return; } + if (conf.getAllowStorageExpansion()) { - checkEnvironmentWithStorageExpansion(conf, zk, journalDirectory, allLedgerDirs); + checkEnvironmentWithStorageExpansion(conf, zk, journalDirectories, allLedgerDirs); return; } + try { boolean newEnv = false; List<File> missedCookieDirs = new ArrayList<File>(); - Cookie journalCookie = null; + List<Cookie> journalCookies = Lists.newArrayList(); // try to read cookie from journal directory. - try { - journalCookie = Cookie.readFromDirectory(journalDirectory); - if (journalCookie.isBookieHostCreatedFromIp()) { - conf.setUseHostNameAsBookieID(false); - } else { - conf.setUseHostNameAsBookieID(true); + for (File journalDirectory : journalDirectories) { + try { + Cookie journalCookie = Cookie.readFromDirectory(journalDirectory); + journalCookies.add(journalCookie); + if (journalCookie.isBookieHostCreatedFromIp()) { + conf.setUseHostNameAsBookieID(false); + } else { + conf.setUseHostNameAsBookieID(true); + } + } catch (FileNotFoundException fnf) { + newEnv = true; + missedCookieDirs.add(journalDirectory); } - } catch (FileNotFoundException fnf) { - newEnv = true; - missedCookieDirs.add(journalDirectory); } + String instanceId = getInstanceId(conf, zk); Cookie.Builder builder = Cookie.generateCookie(conf); if (null != instanceId) { builder.setInstanceId(instanceId); } Cookie masterCookie = builder.build(); + Versioned<Cookie> zkCookie = null; try { - Versioned<Cookie> zkCookie = Cookie.readFromZooKeeper(zk, conf); - masterCookie.verify(zkCookie.getValue()); + zkCookie = Cookie.readFromZooKeeper(zk, conf); + // If allowStorageExpansion option is set, we should + // make sure that the new set of ledger/index dirs + // is a super set of the old; else, we fail the cookie check + masterCookie.verifyIsSuperSet(zkCookie.getValue()); } catch (KeeperException.NoNodeException nne) { // can occur in cases: // 1) new 
environment or // 2) done only metadata format and started bookie server. } - checkDirectoryStructure(journalDirectory); - + for (File journalDirectory : journalDirectories) { + checkDirectoryStructure(journalDirectory); + } if(!newEnv){ - masterCookie.verify(journalCookie); + for(Cookie journalCookie: journalCookies) { + masterCookie.verify(journalCookie); + } } + + for (File dir : allLedgerDirs) { checkDirectoryStructure(dir); try { @@ -379,20 +397,46 @@ public class Bookie extends BookieCriticalThread { } if (!newEnv && missedCookieDirs.size() > 0) { - LOG.error("Cookie exists in zookeeper, but not in all local directories. " - + " Directories missing cookie file are " + missedCookieDirs); - throw new BookieException.InvalidCookieException(); + // If we find that any of the dirs in missedCookieDirs, existed + // previously, we stop because we could be missing data + // Also, if a new ledger dir is being added, we make sure that + // that dir is empty. Else, we reject the request + Set<String> existingLedgerDirs = Sets.newHashSet(); + for(Cookie journalCookie : journalCookies) { + Collections.addAll(existingLedgerDirs, journalCookie.getLedgerDirPathsFromCookie()); + } + List<File> dirsMissingData = new ArrayList<File>(); + List<File> nonEmptyDirs = new ArrayList<File>(); + for (File dir : missedCookieDirs) { + if (existingLedgerDirs.contains(dir.getParent())) { + // if one of the existing ledger dirs doesn't have cookie, + // let us not proceed further + dirsMissingData.add(dir); + continue; + } + String[] content = dir.list(); + if (content != null && content.length != 0) { + nonEmptyDirs.add(dir); + } + } + if (dirsMissingData.size() > 0 || nonEmptyDirs.size() > 0) { + LOG.error("Either not all local directories have cookies or directories being added " + + " newly are not empty. 
" + + "Directories missing cookie file are: " + dirsMissingData + + " New directories that are not empty are: " + nonEmptyDirs); + throw new BookieException.InvalidCookieException(); + } } - if (newEnv) { - if (missedCookieDirs.size() > 0) { - LOG.info("Directories missing cookie file are {}", missedCookieDirs); + if (missedCookieDirs.size() > 0) { + LOG.info("Stamping new cookies on all dirs {}", missedCookieDirs); + for (File journalDirectory : journalDirectories) { masterCookie.writeToDirectory(journalDirectory); - for (File dir : allLedgerDirs) { - masterCookie.writeToDirectory(dir); - } } - masterCookie.writeToZooKeeper(zk, conf, Version.NEW); + for (File dir : allLedgerDirs) { + masterCookie.writeToDirectory(dir); + } + masterCookie.writeToZooKeeper(zk, conf, zkCookie != null ? zkCookie.getVersion() : Version.NEW); } } catch (KeeperException ke) { LOG.error("Couldn't access cookie in zookeeper", ke); @@ -410,23 +454,27 @@ public class Bookie extends BookieCriticalThread { } public static void checkEnvironmentWithStorageExpansion(ServerConfiguration conf, - ZooKeeper zk, File journalDirectory, List<File> allLedgerDirs) throws BookieException, IOException { + ZooKeeper zk, List<File> journalDirectories, List<File> allLedgerDirs) throws BookieException, IOException { try { boolean newEnv = false; List<File> missedCookieDirs = new ArrayList<File>(); - Cookie journalCookie = null; + List<Cookie> journalCookies = Lists.newArrayList(); // try to read cookie from journal directory. 
- try { - journalCookie = Cookie.readFromDirectory(journalDirectory); - if (journalCookie.isBookieHostCreatedFromIp()) { - conf.setUseHostNameAsBookieID(false); - } else { - conf.setUseHostNameAsBookieID(true); + for (File journalDirectory : journalDirectories) { + try { + Cookie journalCookie = Cookie.readFromDirectory(journalDirectory); + journalCookies.add(journalCookie); + if (journalCookie.isBookieHostCreatedFromIp()) { + conf.setUseHostNameAsBookieID(false); + } else { + conf.setUseHostNameAsBookieID(true); + } + } catch (FileNotFoundException fnf) { + newEnv = true; + missedCookieDirs.add(journalDirectory); } - } catch (FileNotFoundException fnf) { - newEnv = true; - missedCookieDirs.add(journalDirectory); } + String instanceId = getInstanceId(conf, zk); Cookie.Builder builder = Cookie.generateCookie(conf); if (null != instanceId) { @@ -445,12 +493,16 @@ public class Bookie extends BookieCriticalThread { // 1) new environment or // 2) done only metadata format and started bookie server. } - checkDirectoryStructure(journalDirectory); - + for (File journalDirectory : journalDirectories) { + checkDirectoryStructure(journalDirectory); + } if(!newEnv){ - masterCookie.verifyIsSuperSet(journalCookie); + for(Cookie journalCookie: journalCookies) { + masterCookie.verifyIsSuperSet(journalCookie); + } } + for (File dir : allLedgerDirs) { checkDirectoryStructure(dir); try { @@ -466,7 +518,10 @@ public class Bookie extends BookieCriticalThread { // previously, we stop because we could be missing data // Also, if a new ledger dir is being added, we make sure that // that dir is empty. 
Else, we reject the request - Set<String> existingLedgerDirs = Sets.newHashSet(journalCookie.getLedgerDirPathsFromCookie()); + Set<String> existingLedgerDirs = Sets.newHashSet(); + for(Cookie journalCookie : journalCookies) { + Collections.addAll(existingLedgerDirs, journalCookie.getLedgerDirPathsFromCookie()); + } List<File> dirsMissingData = new ArrayList<File>(); List<File> nonEmptyDirs = new ArrayList<File>(); for (File dir : missedCookieDirs) { @@ -492,7 +547,9 @@ public class Bookie extends BookieCriticalThread { if (missedCookieDirs.size() > 0) { LOG.info("Stamping new cookies on all dirs {}", missedCookieDirs); - masterCookie.writeToDirectory(journalDirectory); + for (File journalDirectory : journalDirectories) { + masterCookie.writeToDirectory(journalDirectory); + } for (File dir : allLedgerDirs) { masterCookie.writeToDirectory(dir); } @@ -589,7 +646,11 @@ public class Bookie extends BookieCriticalThread { this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + BookKeeperConstants.READONLY; this.conf = conf; - this.journalDirectory = getCurrentDirectory(conf.getJournalDir()); + this.journalDirectories = Lists.newArrayList(); + for (File journalDirectory : conf.getJournalDirs()) { + this.journalDirectories.add(getCurrentDirectory(journalDirectory)); + } + this.ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), statsLogger.scope(LD_LEDGER_SCOPE)); File[] idxDirs = conf.getIndexDirs(); @@ -611,16 +672,22 @@ public class Bookie extends BookieCriticalThread { // configured directories. When disk errors or all the ledger // directories are full, would throws exception and fail bookie startup. 
this.ledgerDirsManager.init(); - // instantiate the journal - journal = new Journal(conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE)); + // instantiate the journals + journals = Lists.newArrayList(); + for(int i=0 ;i<journalDirectories.size();i++) { + journals.add(new Journal(journalDirectories.get(i), + conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE + "_" + i))); + } + + CheckpointSource checkpointSource = new CheckpointSourceList(journals); // Instantiate the ledger storage implementation String ledgerStorageClass = conf.getLedgerStorageClass(); LOG.info("Using ledger storage: {}", ledgerStorageClass); ledgerStorage = LedgerStorageFactory.createLedgerStorage(ledgerStorageClass); - ledgerStorage.initialize(conf, ledgerManager, ledgerDirsManager, indexDirsManager, journal, statsLogger); + ledgerStorage.initialize(conf, ledgerManager, ledgerDirsManager, indexDirsManager, checkpointSource, statsLogger); syncThread = new SyncThread(conf, getLedgerDirsListener(), - ledgerStorage, journal); + ledgerStorage, checkpointSource); handles = new HandleFactoryImpl(ledgerStorage); @@ -657,7 +724,7 @@ public class Bookie extends BookieCriticalThread { void readJournal() throws IOException, BookieException { long startTs = MathUtils.now(); - journal.replay(new JournalScanner() { + JournalScanner scanner = new JournalScanner() { @Override public void process(int journalVersion, long offset, ByteBuffer recBuff) throws IOException { long ledgerId = recBuff.getLong(); @@ -705,7 +772,11 @@ public class Bookie extends BookieCriticalThread { throw new IOException(be); } } - }); + }; + + for (Journal journal : journals) { + journal.replay(scanner); + } long elapsedTs = MathUtils.now() - startTs; LOG.info("Finished replaying journal in {} ms.", elapsedTs); } @@ -713,7 +784,8 @@ public class Bookie extends BookieCriticalThread { @Override synchronized public void start() { setDaemon(true); - LOG.debug("I'm starting a bookie with journal directory {}", 
journalDirectory.getName()); + LOG.debug("I'm starting a bookie with journal directories {}", + journalDirectories.stream().map(File::getName).collect(Collectors.joining(", "))); //Start DiskChecker thread ledgerDirsManager.start(); if (indexDirsManager != ledgerDirsManager) { @@ -1155,11 +1227,17 @@ public class Bookie extends BookieCriticalThread { public void run() { // bookie thread wait for journal thread try { - // start journal - journal.start(); + // start journals + for (Journal journal: journals) { + journal.start(); + } + // wait until journal quits - journal.join(); - LOG.info("Journal thread quits."); + for (Journal journal: journals) { + + journal.join(); + } + LOG.info("Journal thread(s) quit."); } catch (InterruptedException ie) { LOG.warn("Interrupted on running journal thread : ", ie); } @@ -1215,8 +1293,10 @@ public class Bookie extends BookieCriticalThread { // Shutdown the state service stateService.shutdown(); - // Shutdown journal - journal.shutdown(); + // Shutdown journals + for (Journal journal : journals) { + journal.shutdown(); + } this.join(); syncThread.shutdown(); @@ -1271,12 +1351,16 @@ public class Bookie extends BookieCriticalThread { bb.flip(); if (null == masterKeyCache.putIfAbsent(ledgerId, masterKey)) { - journal.logAddEntry(bb, new NopWriteCallback(), null); + getJournal(ledgerId).logAddEntry(bb, new NopWriteCallback(), null); } } return l; } + private Journal getJournal(long ledgerId) { + return journals.get(MathUtils.signSafeMod(ledgerId, journals.size())); + } + /** * Add an entry to a ledger as specified by handle. 
*/ @@ -1290,7 +1374,7 @@ public class Bookie extends BookieCriticalThread { writeBytes.add(entry.remaining()); LOG.trace("Adding {}@{}", entryId, ledgerId); - journal.logAddEntry(entry, cb, ctx); + getJournal(ledgerId).logAddEntry(entry, cb, ctx); } /** @@ -1407,7 +1491,7 @@ public class Bookie extends BookieCriticalThread { FutureWriteCallback fwc = new FutureWriteCallback(); LOG.debug("record fenced state for ledger {} in journal.", ledgerId); - journal.logAddEntry(bb, fwc, null); + getJournal(ledgerId).logAddEntry(bb, fwc, null); return fwc.getResult(); } else { // already fenced @@ -1483,56 +1567,57 @@ public class Bookie extends BookieCriticalThread { */ public static boolean format(ServerConfiguration conf, boolean isInteractive, boolean force) { - File journalDir = conf.getJournalDir(); - String[] journalDirFiles = - journalDir.exists() && journalDir.isDirectory() ? journalDir.list() : null; - if (journalDirFiles != null && journalDirFiles.length != 0) { - try { - boolean confirm = false; - if (!isInteractive) { - // If non interactive and force is set, then delete old - // data. - if (force) { - confirm = true; + for (File journalDir : conf.getJournalDirs()) { + String[] journalDirFiles = + journalDir.exists() && journalDir.isDirectory() ? journalDir.list() : null; + if (journalDirFiles != null && journalDirFiles.length != 0) { + try { + boolean confirm = false; + if (!isInteractive) { + // If non interactive and force is set, then delete old + // data. 
+ if (force) { + confirm = true; + } else { + confirm = false; + } } else { - confirm = false; + confirm = IOUtils + .confirmPrompt("Are you sure to format Bookie data..?"); } - } else { - confirm = IOUtils - .confirmPrompt("Are you sure to format Bookie data..?"); - } - if (!confirm) { - LOG.error("Bookie format aborted!!"); + if (!confirm) { + LOG.error("Bookie format aborted!!"); + return false; + } + } catch (IOException e) { + LOG.error("Error during bookie format", e); return false; } - } catch (IOException e) { - LOG.error("Error during bookie format", e); - return false; } - } - if (!cleanDir(journalDir)) { - LOG.error("Formatting journal directory failed"); - return false; - } - - File[] ledgerDirs = conf.getLedgerDirs(); - for (File dir : ledgerDirs) { - if (!cleanDir(dir)) { - LOG.error("Formatting ledger directory " + dir + " failed"); + if (!cleanDir(journalDir)) { + LOG.error("Formatting journal directory failed"); return false; } - } - // Clean up index directories if they are separate from the ledger dirs - File[] indexDirs = conf.getIndexDirs(); - if (null != indexDirs) { - for (File dir : indexDirs) { + File[] ledgerDirs = conf.getLedgerDirs(); + for (File dir : ledgerDirs) { if (!cleanDir(dir)) { LOG.error("Formatting ledger directory " + dir + " failed"); return false; } } + + // Clean up index directories if they are separate from the ledger dirs + File[] indexDirs = conf.getIndexDirs(); + if (null != indexDirs) { + for (File dir : indexDirs) { + if (!cleanDir(dir)) { + LOG.error("Formatting ledger directory " + dir + " failed"); + return false; + } + } + } } LOG.info("Bookie format completed successfully"); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/123eccd4/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieBean.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieBean.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieBean.java index 9ad7c9c..67d0129 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieBean.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieBean.java @@ -45,7 +45,12 @@ public class BookieBean implements BookieMXBean, BKMBeanInfo { @Override public int getQueueLength() { - return bk.journal.getJournalQueueLength(); + + int totalLength = 0; + for (Journal journal : bk.journals) { + totalLength += journal.getJournalQueueLength(); + } + return totalLength; } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/123eccd4/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java index b4030f1..9afc5d7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java @@ -46,6 +46,7 @@ import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException; import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner; import org.apache.bookkeeper.bookie.Journal.JournalScanner; +import org.apache.bookkeeper.bookie.Journal; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeper.DigestType; @@ -93,6 +94,8 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.google.common.util.concurrent.AbstractFuture; +import javax.sql.rowset.serial.SerialRef; + /** * Bookie Shell is to provide utilities for users to administer a bookkeeper cluster. 
*/ @@ -128,10 +131,10 @@ public class BookieShell implements Tool { final ServerConfiguration bkConf = new ServerConfiguration(); File[] indexDirectories; File[] ledgerDirectories; - File journalDirectory; + File[] journalDirectories; EntryLogger entryLogger = null; - Journal journal = null; + List<Journal> journals = null; EntryFormatter formatter; int pageSize; @@ -958,6 +961,7 @@ public class BookieShell implements Tool { ReadJournalCmd() { super(CMD_READJOURNAL); + rjOpts.addOption("dir", false, "Journal directory (needed if more than one journal configured)"); rjOpts.addOption("m", "msg", false, "Print message body"); } @@ -974,6 +978,32 @@ public class BookieShell implements Tool { if (cmdLine.hasOption("m")) { printMsg = true; } + + Journal journal = null; + if (getJournals().size() > 1) { + if (!cmdLine.hasOption("dir")) { + System.err.println("ERROR: invalid or missing journal directory"); + printUsage(); + return -1; + } + + File journalDirectory = new File(cmdLine.getOptionValue("dir")); + for (Journal j : getJournals()) { + if (j.getJournalDirectory().equals(journalDirectory)) { + journal = j; + break; + } + } + + if (journal == null) { + System.err.println("ERROR: journal directory not found"); + printUsage(); + return -1; + } + } else { + journal = getJournals().get(0); + } + long journalId; try { journalId = Long.parseLong(leftArgs[0]); @@ -991,7 +1021,7 @@ public class BookieShell implements Tool { journalId = Long.parseLong(idString, 16); } // scan journal - scanJournal(journalId, printMsg); + scanJournal(journal, journalId, printMsg); return 0; } @@ -1002,7 +1032,7 @@ public class BookieShell implements Tool { @Override String getUsage() { - return "readjournal [-msg] <journal_id | journal_file_name>"; + return "readjournal [-dir] [-msg] <journal_id | journal_file_name>"; } @Override @@ -1134,8 +1164,8 @@ public class BookieShell implements Tool { } if (all || journal) { - File journalDir = bkConf.getJournalDir(); - List<File> journalFiles = 
listFilesAndSort(new File[] { journalDir }, "txn"); + File[] journalDirs = bkConf.getJournalDirs(); + List<File> journalFiles = listFilesAndSort(journalDirs, "txn"); System.out.println("--------- Printing the list of Journal Files ---------"); for (File journalFile : journalFiles) { System.out.println(journalFile.getName()); @@ -1418,7 +1448,7 @@ public class BookieShell implements Tool { return -1; } Cookie newCookie = Cookie.newBuilder(oldCookie.getValue()).setBookieHost(newBookieId).build(); - boolean hasCookieUpdatedInDirs = verifyCookie(newCookie, journalDirectory); + boolean hasCookieUpdatedInDirs = verifyCookie(newCookie, journalDirectories[0]); for (File dir : ledgerDirectories) { hasCookieUpdatedInDirs &= verifyCookie(newCookie, dir); } @@ -1441,8 +1471,10 @@ public class BookieShell implements Tool { } } else { // writes newcookie to local dirs - newCookie.writeToDirectory(journalDirectory); - LOG.info("Updated cookie file present in journalDirectory {}", journalDirectory); + for (File journalDirectory : journalDirectories) { + newCookie.writeToDirectory(journalDirectory); + LOG.info("Updated cookie file present in journalDirectory {}", journalDirectory); + } for (File dir : ledgerDirectories) { newCookie.writeToDirectory(dir); } @@ -1533,7 +1565,7 @@ public class BookieShell implements Tool { try { Bookie.checkEnvironmentWithStorageExpansion(conf, zk, - journalDirectory, allLedgerDirs); + Lists.newArrayList(journalDirectories), allLedgerDirs); } catch (BookieException | IOException e) { LOG.error( "Exception while updating cookie for storage expansion", e); @@ -1756,7 +1788,7 @@ public class BookieShell implements Tool { @Override public void setConf(Configuration conf) throws Exception { bkConf.loadConf(conf); - journalDirectory = Bookie.getCurrentDirectory(bkConf.getJournalDir()); + journalDirectories = Bookie.getCurrentDirectories(bkConf.getJournalDirs()); ledgerDirectories = Bookie.getCurrentDirectories(bkConf.getLedgerDirs()); if (null == 
bkConf.getIndexDirs()) { indexDirectories = ledgerDirectories; @@ -1942,11 +1974,14 @@ public class BookieShell implements Tool { entryLogger.scanEntryLog(logId, scanner); } - private synchronized Journal getJournal() throws IOException { - if (null == journal) { - journal = new Journal(bkConf, new LedgerDirsManager(bkConf, bkConf.getLedgerDirs())); + private synchronized List<Journal> getJournals() throws IOException { + if (null == journals) { + journals = Lists.newArrayListWithCapacity(bkConf.getJournalDirs().length); + for (File journalDir : bkConf.getJournalDirs()) { + journals.add(new Journal(journalDir, bkConf, new LedgerDirsManager(bkConf, bkConf.getLedgerDirs()))); + } } - return journal; + return journals; } /** @@ -1957,8 +1992,8 @@ public class BookieShell implements Tool { * @param scanner * Journal File Scanner */ - protected void scanJournal(long journalId, JournalScanner scanner) throws IOException { - getJournal().scanJournal(journalId, 0L, scanner); + protected void scanJournal(Journal journal, long journalId, JournalScanner scanner) throws IOException { + journal.scanJournal(journalId, 0L, scanner); } /// @@ -2166,9 +2201,9 @@ public class BookieShell implements Tool { * @param printMsg * Whether printing the entry data. 
*/ - protected void scanJournal(long journalId, final boolean printMsg) throws Exception { + protected void scanJournal(Journal journal, long journalId, final boolean printMsg) throws Exception { System.out.println("Scan journal " + journalId + " (" + Long.toHexString(journalId) + ".txn)"); - scanJournal(journalId, new JournalScanner() { + scanJournal(journal, journalId, new JournalScanner() { boolean printJournalVersion = false; @Override public void process(int journalVersion, long offset, ByteBuffer entry) throws IOException { @@ -2185,10 +2220,12 @@ public class BookieShell implements Tool { * Print last log mark */ protected void printLastLogMark() throws IOException { - LogMark lastLogMark = getJournal().getLastLogMark().getCurMark(); - System.out.println("LastLogMark: Journal Id - " + lastLogMark.getLogFileId() + "(" - + Long.toHexString(lastLogMark.getLogFileId()) + ".txn), Pos - " - + lastLogMark.getLogFileOffset()); + for (Journal journal : journals) { + LogMark lastLogMark = journal.getLastLogMark().getCurMark(); + System.out.println("LastLogMark: Journal Id - " + lastLogMark.getLogFileId() + "(" + + Long.toHexString(lastLogMark.getLogFileId()) + ".txn), Pos - " + + lastLogMark.getLogFileOffset()); + } } /** http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/123eccd4/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CheckpointSourceList.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CheckpointSourceList.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CheckpointSourceList.java new file mode 100644 index 0000000..3715ed2 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CheckpointSourceList.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.bookie; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.IOException; +import java.util.List; + +import com.google.common.collect.Lists; + +public class CheckpointSourceList implements CheckpointSource { + + private final List<? extends CheckpointSource> checkpointSourcesList; + + public CheckpointSourceList(List<? 
extends CheckpointSource> checkpointSourcesList) { + this.checkpointSourcesList = checkpointSourcesList; + } + + @Override + public Checkpoint newCheckpoint() { + return new CheckpointList(this); + } + + @Override + public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IOException { + if (checkpoint == Checkpoint.MAX || checkpoint == Checkpoint.MIN) { + return; + } + + checkArgument(checkpoint instanceof CheckpointList); + CheckpointList checkpointList = (CheckpointList) checkpoint; + + checkArgument(checkpointList.source == this); + checkpointList.checkpointComplete(compact); + } + + private static class CheckpointList implements Checkpoint { + private final CheckpointSourceList source; + private final List<Checkpoint> checkpoints; + + public CheckpointList(CheckpointSourceList source) { + this.source = source; + this.checkpoints = Lists.newArrayListWithCapacity(source.checkpointSourcesList.size()); + for (CheckpointSource checkpointSource : source.checkpointSourcesList) { + checkpoints.add(checkpointSource.newCheckpoint()); + } + } + + private void checkpointComplete(boolean compact) throws IOException { + for (int i = 0; i < source.checkpointSourcesList.size(); i++) { + source.checkpointSourcesList.get(i).checkpointComplete(checkpoints.get(i), compact); + } + } + + @Override + public int compareTo(Checkpoint o) { + if (o == Checkpoint.MAX) { + return -1; + } else if (o == Checkpoint.MIN) { + return 1; + } + + checkArgument(o instanceof CheckpointList); + CheckpointList other = (CheckpointList) o; + if (checkpoints.size() != other.checkpoints.size()) { + return Integer.compare(checkpoints.size(), other.checkpoints.size()); + } + + for (int i = 0; i < checkpoints.size(); i++) { + int res = checkpoints.get(i).compareTo(other.checkpoints.get(i)); + if (res != 0) { + return res; + } + } + + return 0; + } + + } + +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/123eccd4/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java index 1730cd9..691dc1d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java @@ -36,6 +36,7 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.Set; +import com.google.common.base.Joiner; import org.apache.bookkeeper.conf.AbstractConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.ZkVersion; @@ -74,15 +75,15 @@ class Cookie { static final int CURRENT_COOKIE_LAYOUT_VERSION = 4; private final int layoutVersion; private final String bookieHost; - private final String journalDir; + private final String journalDirs; private final String ledgerDirs; private final String instanceId; private static final String SEPARATOR = "\t"; - private Cookie(int layoutVersion, String bookieHost, String journalDir, String ledgerDirs, String instanceId) { + private Cookie(int layoutVersion, String bookieHost, String journalDirs, String ledgerDirs, String instanceId) { this.layoutVersion = layoutVersion; this.bookieHost = bookieHost; - this.journalDir = journalDir; + this.journalDirs = journalDirs; this.ledgerDirs = ledgerDirs; this.instanceId = instanceId; } @@ -136,7 +137,7 @@ class Cookie { LOG.error(errMsg); throw new BookieException.InvalidCookieException(errMsg); } else if (!(c.layoutVersion >= 3 && c.bookieHost.equals(bookieHost) - && c.journalDir.equals(journalDir) && verifyLedgerDirs(c, checkIfSuperSet))) { + && c.journalDirs.equals(journalDirs) && verifyLedgerDirs(c, checkIfSuperSet))) { errMsg = "Cookie [" + this + 
"] is not matching with [" + c + "]"; throw new BookieException.InvalidCookieException(errMsg); } else if ((instanceId == null && c.instanceId != null) @@ -162,7 +163,7 @@ class Cookie { } CookieFormat.Builder builder = CookieFormat.newBuilder(); builder.setBookieHost(bookieHost); - builder.setJournalDir(journalDir); + builder.setJournalDir(journalDirs); builder.setLedgerDirs(ledgerDirs); if (null != instanceId) { builder.setInstanceId(instanceId); @@ -177,7 +178,7 @@ class Cookie { StringBuilder b = new StringBuilder(); b.append(CURRENT_COOKIE_LAYOUT_VERSION).append("\n") .append(bookieHost).append("\n") - .append(journalDir).append("\n") + .append(journalDirs).append("\n") .append(ledgerDirs).append("\n"); return b.toString(); } @@ -198,14 +199,14 @@ class Cookie { } if (layoutVersion == 3) { cBuilder.setBookieHost(reader.readLine()); - cBuilder.setJournalDir(reader.readLine()); + cBuilder.setJournalDirs(reader.readLine()); cBuilder.setLedgerDirs(reader.readLine()); } else if (layoutVersion >= 4) { CookieFormat.Builder cfBuilder = CookieFormat.newBuilder(); TextFormat.merge(reader, cfBuilder); CookieFormat data = cfBuilder.build(); cBuilder.setBookieHost(data.getBookieHost()); - cBuilder.setJournalDir(data.getJournalDir()); + cBuilder.setJournalDirs(data.getJournalDir()); cBuilder.setLedgerDirs(data.getLedgerDirs()); // Since InstanceId is optional if (null != data.getInstanceId() && !data.getInstanceId().isEmpty()) { @@ -330,7 +331,7 @@ class Cookie { Builder builder = Cookie.newBuilder(); builder.setLayoutVersion(CURRENT_COOKIE_LAYOUT_VERSION); builder.setBookieHost(Bookie.getBookieAddress(conf).toString()); - builder.setJournalDir(conf.getJournalDirName()); + builder.setJournalDirs(Joiner.on(',').join(conf.getJournalDirNames())); builder.setLedgerDirs(encodeDirPaths(conf.getLedgerDirNames())); return builder; } @@ -466,17 +467,17 @@ class Cookie { public static class Builder { private int layoutVersion = 0; private String bookieHost = null; - private String 
journalDir = null; + private String journalDirs = null; private String ledgerDirs = null; private String instanceId = null; private Builder() { } - private Builder(int layoutVersion, String bookieHost, String journalDir, String ledgerDirs, String instanceId) { + private Builder(int layoutVersion, String bookieHost, String journalDirs, String ledgerDirs, String instanceId) { this.layoutVersion = layoutVersion; this.bookieHost = bookieHost; - this.journalDir = journalDir; + this.journalDirs = journalDirs; this.ledgerDirs = ledgerDirs; this.instanceId = instanceId; } @@ -491,8 +492,8 @@ class Cookie { return this; } - public Builder setJournalDir(String journalDir) { - this.journalDir = journalDir; + public Builder setJournalDirs(String journalDirs) { + this.journalDirs = journalDirs; return this; } @@ -507,7 +508,7 @@ class Cookie { } public Cookie build() { - return new Cookie(layoutVersion, bookieHost, journalDir, ledgerDirs, instanceId); + return new Cookie(layoutVersion, bookieHost, journalDirs, ledgerDirs, instanceId); } } @@ -528,7 +529,7 @@ class Cookie { * @return cookie builder */ static Builder newBuilder(Cookie oldCookie) { - return new Builder(oldCookie.layoutVersion, oldCookie.bookieHost, oldCookie.journalDir, oldCookie.ledgerDirs, + return new Builder(oldCookie.layoutVersion, oldCookie.bookieHost, oldCookie.journalDirs, oldCookie.ledgerDirs, oldCookie.instanceId); } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/123eccd4/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java index 177270e..2b8d325 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java +++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java @@ -21,6 +21,7 @@ package org.apache.bookkeeper.bookie; +import com.google.common.collect.Lists; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.util.BookKeeperConstants; import org.apache.bookkeeper.util.HardLink; @@ -98,7 +99,7 @@ public class FileSystemUpgrade { private static List<File> getAllDirectories(ServerConfiguration conf) { List<File> dirs = new ArrayList<File>(); - dirs.add(conf.getJournalDir()); + dirs.addAll(Lists.newArrayList(conf.getJournalDirs())); for (File d: conf.getLedgerDirs()) { dirs.add(d); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/123eccd4/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 7be0984..1483e36 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -535,15 +535,15 @@ class Journal extends BookieCriticalThread implements CheckpointSource { private final Counter flushEmptyQueueCounter; private final Counter journalWriteBytes; - public Journal(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager) { - this(conf, ledgerDirsManager, NullStatsLogger.INSTANCE); + public Journal(File journalDirectory, ServerConfiguration conf, LedgerDirsManager ledgerDirsManager) { + this(journalDirectory, conf, ledgerDirsManager, NullStatsLogger.INSTANCE); } - public Journal(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger) { + public Journal(File journalDirectory, ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger) { super("BookieJournal-" + conf.getBookiePort()); 
this.ledgerDirsManager = ledgerDirsManager; this.conf = conf; - this.journalDirectory = Bookie.getCurrentDirectory(conf.getJournalDir()); + this.journalDirectory = journalDirectory; this.maxJournalSize = conf.getMaxJournalSizeMB() * MB; this.journalPreAllocSize = conf.getJournalPreAllocSizeMB() * MB; this.journalWriteBufferSize = conf.getJournalWriteBufferSizeKB() * KB; @@ -582,6 +582,10 @@ class Journal extends BookieCriticalThread implements CheckpointSource { journalWriteBytes = statsLogger.getCounter(JOURNAL_WRITE_BYTES); } + public File getJournalDirectory() { + return journalDirectory; + } + LastLogMark getLastLogMark() { return lastLogMark; } @@ -783,6 +787,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource { */ @Override public void run() { + LOG.info("Starting journal on {}", journalDirectory); LinkedList<QueueEntry> toFlush = new LinkedList<QueueEntry>(); ByteBuffer lenBuff = ByteBuffer.allocate(4); ByteBuffer paddingBuff = ByteBuffer.allocate(2 * conf.getJournalAlignmentSize()); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/123eccd4/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index 8ce6908..b590e38 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import com.google.common.annotations.Beta; +import com.google.common.collect.Lists; import org.apache.bookkeeper.stats.NullStatsProvider; import org.apache.bookkeeper.stats.StatsProvider; import org.apache.bookkeeper.util.BookKeeperConstants; @@ -83,6 +84,7 @@ public class ServerConfiguration extends 
AbstractConfiguration { protected final static String ALLOW_LOOPBACK = "allowLoopback"; protected final static String JOURNAL_DIR = "journalDirectory"; + protected final static String JOURNAL_DIRS = "journalDirectories"; protected final static String LEDGER_DIRS = "ledgerDirectories"; protected final static String INDEX_DIRS = "indexDirectories"; protected final static String ALLOW_STORAGE_EXPANSION = "allowStorageExpansion"; @@ -572,10 +574,24 @@ public class ServerConfiguration extends AbstractConfiguration { } /** + * Get dir names to store journal files + * + * @return journal dir name + */ + public String[] getJournalDirNames() { + String[] journalDirs = this.getStringArray(JOURNAL_DIRS); + if (journalDirs == null || journalDirs.length == 0) { + return new String[] {getJournalDirName()}; + } + return journalDirs; + } + + /** * Get dir name to store journal files * * @return journal dir name */ + @Deprecated public String getJournalDirName() { return this.getString(JOURNAL_DIR, "/tmp/bk-txn"); } @@ -588,21 +604,37 @@ public class ServerConfiguration extends AbstractConfiguration { * @return server configuration */ public ServerConfiguration setJournalDirName(String journalDir) { - this.setProperty(JOURNAL_DIR, journalDir); + this.setProperty(JOURNAL_DIRS, new String[] {journalDir}); return this; } /** - * Get dir to store journal files + * Set dir names to store journal files * - * @return journal dir, if no journal dir provided return null + * @param journalDirs + * Dir to store journal files + * @return server configuration + */ + public ServerConfiguration setJournalDirsName(String[] journalDirs) { + this.setProperty(JOURNAL_DIRS, journalDirs); + return this; + } + + /** + * Get dirs to store journal files + * + * @return journal dirs, if no journal dir provided return null */ - public File getJournalDir() { - String journalDirName = getJournalDirName(); - if (null == journalDirName) { + public File[] getJournalDirs() { + String[] journalDirNames = 
getJournalDirNames(); + if (null == journalDirNames) { return null; } - return new File(journalDirName); + File[] journalDirs = new File[journalDirNames.length]; + for(int i=0 ;i<journalDirNames.length; i++) { + journalDirs[i] = new File(journalDirNames[i]); + } + return journalDirs; } /** http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/123eccd4/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java index 14350e6..b38c22a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java @@ -420,7 +420,7 @@ public class BookieServer { String hello = String.format( "Hello, I'm your bookie, listening on port %1$s. ZKServers are on %2$s. Journals are in %3$s. 
Ledgers are stored in %4$s.", conf.getBookiePort(), conf.getZkServers(), - conf.getJournalDirName(), sb); + conf.getJournalDirNames(), sb); LOG.info(hello); try { // Initialize Stats Provider http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/123eccd4/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java index 2e0f18f..f49f8ae 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java @@ -32,8 +32,9 @@ public class BookieAccessor { * Force a bookie to flush its ledger storage */ public static void forceFlush(Bookie b) throws IOException { - Checkpoint cp = b.journal.newCheckpoint(); + CheckpointSourceList source = new CheckpointSourceList(b.journals); + Checkpoint cp = source.newCheckpoint(); b.ledgerStorage.flush(); - b.journal.checkpointComplete(cp, true); + source.checkpointComplete(cp, true); } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/123eccd4/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java index 5d384ba..ed881f1 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java @@ -219,7 +219,10 @@ public class CompactionTest extends BookKeeperClusterTestCase { // Do nothing. 
} }; - Bookie.checkDirectoryStructure(conf.getJournalDir()); + + for (File journalDir : conf.getJournalDirs()) { + Bookie.checkDirectoryStructure(journalDir); + } for (File dir : dirManager.getAllLedgerDirs()) { Bookie.checkDirectoryStructure(dir); } @@ -602,7 +605,9 @@ public class CompactionTest extends BookKeeperClusterTestCase { // Do nothing. } }; - Bookie.checkDirectoryStructure(conf.getJournalDir()); + for (File journalDir : conf.getJournalDirs()) { + Bookie.checkDirectoryStructure(journalDir); + } for (File dir : dirManager.getAllLedgerDirs()) { Bookie.checkDirectoryStructure(dir); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/123eccd4/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpdateCookieCmdTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpdateCookieCmdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpdateCookieCmdTest.java index f3a44a3..f5b646e 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpdateCookieCmdTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpdateCookieCmdTest.java @@ -192,9 +192,11 @@ public class UpdateCookieCmdTest extends BookKeeperClusterTestCase { Assert.assertEquals("Wrongly updated cookie!", useHostNameAsBookieID, !cookie.isBookieHostCreatedFromIp()); verifyCookieInZooKeeper(newconf, 1); - File journalDir = Bookie.getCurrentDirectory(conf.getJournalDir()); - Cookie jCookie = Cookie.readFromDirectory(journalDir); - jCookie.verify(cookie); + for (File journalDir : conf.getJournalDirs()) { + journalDir = Bookie.getCurrentDirectory(journalDir); + Cookie jCookie = Cookie.readFromDirectory(journalDir); + jCookie.verify(cookie); + } File[] ledgerDir = Bookie.getCurrentDirectories(conf.getLedgerDirs()); for (File dir : ledgerDir) { Cookie lCookie = Cookie.readFromDirectory(dir);
