This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new adc906f Fixes for multiple journals recovery
adc906f is described below
commit adc906f60ff74cce33b150f53a3e2a203c0920d4
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Mar 16 09:41:14 2018 -0700
Fixes for multiple journals recovery
Currently, the implementation of multiple journals in the bookie is broken
because all journal instances use the same `lastMark` file to store the
journal marker, so they overwrite each other's markers.
This change adds a per-journal index suffix to the `lastMark` file name so
that each journal gets its own file: `lastMark.0`, `lastMark.1`, etc.
If there is only a single journal, we keep using the plain `lastMark` name,
so there is no compatibility issue with existing deployments.
Author: Matteo Merli <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>, Sijie
Guo <[email protected]>
This closes #1254 from merlimat/fix-multiple-journals and squashes the
following commits:
41c01af63 [Matteo Merli] Fixed LastMarkCommandTest
15ea26806 [Matteo Merli] Merge remote-tracking branch 'apache/master' into
fix-multiple-journals
48346b856 [Matteo Merli] Fixes for multiple journals recovery
---
.../java/org/apache/bookkeeper/bookie/Bookie.java | 2 +-
.../org/apache/bookkeeper/bookie/BookieShell.java | 3 +-
.../bookkeeper/bookie/FileSystemUpgrade.java | 2 +-
.../java/org/apache/bookkeeper/bookie/Journal.java | 22 +++--
.../server/http/service/GetLastLogMarkService.java | 3 +-
.../tools/cli/commands/bookie/LastMarkCommand.java | 17 ++--
.../bookkeeper/bookie/BookieJournalForceTest.java | 8 +-
.../bookie/BookieMultipleJournalsTest.java | 104 +++++++++++++++++++++
.../cli/commands/bookie/LastMarkCommandTest.java | 5 +-
9 files changed, 141 insertions(+), 25 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 8e4157a..7322173 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -676,7 +676,7 @@ public class Bookie extends BookieCriticalThread {
// instantiate the journals
journals = Lists.newArrayList();
for (int i = 0; i < journalDirectories.size(); i++) {
- journals.add(new Journal(journalDirectories.get(i),
+ journals.add(new Journal(i, journalDirectories.get(i),
conf, ledgerDirsManager,
statsLogger.scope(JOURNAL_SCOPE)));
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 149bf24..546a8ba 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -2957,8 +2957,9 @@ public class BookieShell implements Tool {
private synchronized List<Journal> getJournals() throws IOException {
if (null == journals) {
journals =
Lists.newArrayListWithCapacity(bkConf.getJournalDirs().length);
+ int idx = 0;
for (File journalDir : bkConf.getJournalDirs()) {
- journals.add(new Journal(new File(journalDir,
BookKeeperConstants.CURRENT_DIR), bkConf,
+ journals.add(new Journal(idx++, new File(journalDir,
BookKeeperConstants.CURRENT_DIR), bkConf,
new LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
new DiskChecker(bkConf.getDiskUsageThreshold(),
bkConf.getDiskUsageWarnThreshold()))));
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
index 311988b..7600acc 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
@@ -88,7 +88,7 @@ public class FileSystemUpgrade {
public boolean accept(File dir, String name) {
if (name.endsWith(".txn") || name.endsWith(".log")
- || name.equals("lastId") || name.equals("lastMark")) {
+ || name.equals("lastId") || name.startsWith("lastMark")) {
return true;
}
if (containsIndexFiles(dir, name)) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 9e9d055..6173936 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -183,7 +183,7 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
List<File> writableLedgerDirs = ledgerDirsManager
.getWritableLedgerDirs();
for (File dir : writableLedgerDirs) {
- File file = new File(dir, "lastMark");
+ File file = new File(dir, lastMarkFileName);
FileOutputStream fos = null;
try {
fos = new FileOutputStream(file);
@@ -212,7 +212,7 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
ByteBuffer bb = ByteBuffer.wrap(buff);
LogMark mark = new LogMark();
for (File dir: ledgerDirsManager.getAllLedgerDirs()) {
- File file = new File(dir, "lastMark");
+ File file = new File(dir, lastMarkFileName);
try {
try (FileInputStream fis = new FileInputStream(file)) {
int bytesRead = fis.read(buff);
@@ -577,6 +577,10 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
private final LastLogMark lastLogMark = new LastLogMark(0, 0);
+ private static final String LAST_MARK_DEFAULT_NAME = "lastMark";
+
+ private final String lastMarkFileName;
+
/**
* The thread pool used to handle callback.
*/
@@ -607,12 +611,13 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
private final Counter flushEmptyQueueCounter;
private final Counter journalWriteBytes;
- public Journal(File journalDirectory, ServerConfiguration conf,
LedgerDirsManager ledgerDirsManager) {
- this(journalDirectory, conf, ledgerDirsManager,
NullStatsLogger.INSTANCE);
+ public Journal(int journalIndex, File journalDirectory,
ServerConfiguration conf,
+ LedgerDirsManager ledgerDirsManager) {
+ this(journalIndex, journalDirectory, conf, ledgerDirsManager,
NullStatsLogger.INSTANCE);
}
- public Journal(File journalDirectory, ServerConfiguration conf,
LedgerDirsManager ledgerDirsManager,
- StatsLogger statsLogger) {
+ public Journal(int journalIndex, File journalDirectory,
ServerConfiguration conf,
+ LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger) {
super("BookieJournal-" + conf.getBookiePort());
this.ledgerDirsManager = ledgerDirsManager;
this.conf = conf;
@@ -639,6 +644,11 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
this.removePagesFromCache = conf.getJournalRemovePagesFromCache();
// read last log mark
+ if (conf.getJournalDirs().length == 1) {
+ lastMarkFileName = LAST_MARK_DEFAULT_NAME;
+ } else {
+ lastMarkFileName = LAST_MARK_DEFAULT_NAME + "." + journalIndex;
+ }
lastLogMark.readLog();
if (LOG.isDebugEnabled()) {
LOG.debug("Last Log Mark : {}", lastLogMark.getCurMark());
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLastLogMarkService.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLastLogMarkService.java
index 31268cd..4cbd1dc 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLastLogMarkService.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLastLogMarkService.java
@@ -77,8 +77,9 @@ public class GetLastLogMarkService implements
HttpEndpointService {
Map<String, String> output = Maps.newHashMap();
List<Journal> journals =
Lists.newArrayListWithCapacity(conf.getJournalDirs().length);
+ int idx = 0;
for (File journalDir : conf.getJournalDirs()) {
- journals.add(new Journal(journalDir, conf, new
LedgerDirsManager(conf, conf.getLedgerDirs(),
+ journals.add(new Journal(idx++, journalDir, conf, new
LedgerDirsManager(conf, conf.getLedgerDirs(),
new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()))));
}
for (Journal journal : journals) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommand.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommand.java
index a83f195..f31885c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommand.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommand.java
@@ -19,8 +19,9 @@
package org.apache.bookkeeper.tools.cli.commands.bookie;
import com.beust.jcommander.Parameters;
-import com.google.common.collect.Lists;
-import java.util.List;
+
+import java.io.File;
+
import org.apache.bookkeeper.bookie.Journal;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.LogMark;
@@ -44,14 +45,10 @@ public class LastMarkCommand extends BookieCommand {
LedgerDirsManager dirsManager = new LedgerDirsManager(
conf, conf.getJournalDirs(),
new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()));
- List<Journal> journals = Lists.transform(
- Lists.newArrayList(conf.getJournalDirs()),
- dir -> new Journal(
- dir,
- conf,
- dirsManager)
- );
- for (Journal journal : journals) {
+ File[] journalDirs = conf.getJournalDirs();
+
+ for (int idx = 0; idx < journalDirs.length; idx++) {
+ Journal journal = new Journal(idx, journalDirs[idx], conf,
dirsManager);
LogMark lastLogMark = journal.getLastLogMark().getCurMark();
System.out.println("LastLogMark : Journal Id - " +
lastLogMark.getLogFileId() + "("
+ Long.toHexString(lastLogMark.getLogFileId()) + ".txn), Pos -
"
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
index a55bef7..0b022fe 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
@@ -84,7 +84,7 @@ public class BookieJournalForceTest {
whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
- Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+ Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager);
// machinery to suspend ForceWriteThread
CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
@@ -146,7 +146,7 @@ public class BookieJournalForceTest {
whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
- Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+ Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager);
// machinery to suspend ForceWriteThread
CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
@@ -201,7 +201,7 @@ public class BookieJournalForceTest {
whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
- Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+ Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager);
// machinery to suspend ForceWriteThread
CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
@@ -260,7 +260,7 @@ public class BookieJournalForceTest {
whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
- Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+ Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager);
journal.start();
final int numEntries = 100;
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
new file mode 100644
index 0000000..6c0682b
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+
+/**
+ * Test the bookie with multiple journals.
+ */
+public class BookieMultipleJournalsTest extends BookKeeperClusterTestCase {
+
+ public BookieMultipleJournalsTest() {
+ super(1);
+ }
+
+ protected ServerConfiguration newServerConfiguration(int port, String
zkServers, File journalDir,
+ File[] ledgerDirs) {
+ ServerConfiguration conf = super.newServerConfiguration(port,
zkServers, journalDir, ledgerDirs);
+
+ // Use 4 journals
+ String[] journalDirs = new String[4];
+ for (int i = 0; i < 4; i++) {
+ journalDirs[i] = journalDir.getAbsolutePath() + "/journal-" + i;
+ }
+ conf.setJournalDirsName(journalDirs);
+
+ return conf;
+ }
+
+ @Test
+ public void testMultipleWritesAndBookieRestart() throws Exception {
+ // Creates few ledgers so that writes are spread across all journals
+ final int numLedgers = 16;
+ final int numEntriesPerLedger = 30;
+ List<LedgerHandle> writeHandles = new ArrayList<>();
+
+ for (int i = 0; i < numLedgers; i++) {
+ writeHandles.add(bkc.createLedger(1, 1, DigestType.CRC32, new
byte[0]));
+ }
+
+ for (int i = 0; i < numEntriesPerLedger; i++) {
+ for (int j = 0; j < numLedgers; j++) {
+ writeHandles.get(j).addEntry(("entry-" + i).getBytes());
+ }
+ }
+
+ restartBookies();
+
+ // Write another set of entries
+ for (int i = numEntriesPerLedger; i < 2 * numEntriesPerLedger; i++) {
+ for (int j = 0; j < numLedgers; j++) {
+ writeHandles.get(j).addEntry(("entry-" + i).getBytes());
+ }
+ }
+
+ restartBookies();
+
+ List<LedgerHandle> readHandles = new ArrayList<>();
+
+ for (int i = 0; i < numLedgers; i++) {
+ readHandles.add(bkc.openLedger(writeHandles.get(i).getId(),
DigestType.CRC32, new byte[0]));
+ }
+
+ for (int i = 0; i < numLedgers; i++) {
+ Enumeration<LedgerEntry> entries =
readHandles.get(i).readEntries(0, numEntriesPerLedger - 1);
+
+ for (int j = 0; j < numEntriesPerLedger; j++) {
+ LedgerEntry entry = entries.nextElement();
+ assertEquals("entry-" + j, new String(entry.getEntry()));
+ }
+ }
+ }
+
+}
diff --git
a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommandTest.java
b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommandTest.java
index c5c366a..20899b7 100644
---
a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommandTest.java
+++
b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommandTest.java
@@ -19,6 +19,7 @@
package org.apache.bookkeeper.tools.cli.commands.bookie;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -77,10 +78,12 @@ public class LastMarkCommandTest extends
BookieCommandTestBase {
when(journal.getLastLogMark()).thenReturn(lastLogMark);
PowerMockito.whenNew(Journal.class)
.withParameterTypes(
+ int.class,
File.class,
ServerConfiguration.class,
LedgerDirsManager.class)
.withArguments(
+ any(int.class),
any(File.class),
eq(conf),
any(LedgerDirsManager.class))
@@ -96,7 +99,7 @@ public class LastMarkCommandTest extends
BookieCommandTestBase {
PowerMockito.verifyNew(LedgerDirsManager.class, times(1))
.withArguments(eq(conf), any(File[].class),
any(DiskChecker.class));
PowerMockito.verifyNew(Journal.class, times(3))
- .withArguments(any(File.class), eq(conf),
any(LedgerDirsManager.class));
+ .withArguments(any(int.class), any(File.class), eq(conf),
any(LedgerDirsManager.class));
verify(journal, times(3)).getLastLogMark();
verify(lastLogMark, times(3)).getCurMark();
verify(logMark, times(3 * 2)).getLogFileId();
--
To stop receiving notification emails like this one, please contact
[email protected].