This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new adc906f  Fixes for multiple journals recovery
adc906f is described below

commit adc906f60ff74cce33b150f53a3e2a203c0920d4
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Mar 16 09:41:14 2018 -0700

    Fixes for multiple journals recovery
    
    Currently, the implementation of multiple journals in bookie is broken 
because all journal instances are using the same `logMark` file to store the 
journal marker, stepping into each others.
    
    Adding here a suffix for `logMark` to have unique files: `logMark.0`, 
`logMark.1` etc.
    If there's 1 single journal, we'll keep using `logMark` so there's no 
compatibility issue.
    
    Author: Matteo Merli <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>, Sijie 
Guo <[email protected]>
    
    This closes #1254 from merlimat/fix-multiple-journals and squashes the 
following commits:
    
    41c01af63 [Matteo Merli] Fixed LastMarkCommandTest
    15ea26806 [Matteo Merli] Merge remote-tracking branch 'apache/master' into 
fix-multiple-journals
    48346b856 [Matteo Merli] Fixes for multiple journals recovery
---
 .../java/org/apache/bookkeeper/bookie/Bookie.java  |   2 +-
 .../org/apache/bookkeeper/bookie/BookieShell.java  |   3 +-
 .../bookkeeper/bookie/FileSystemUpgrade.java       |   2 +-
 .../java/org/apache/bookkeeper/bookie/Journal.java |  22 +++--
 .../server/http/service/GetLastLogMarkService.java |   3 +-
 .../tools/cli/commands/bookie/LastMarkCommand.java |  17 ++--
 .../bookkeeper/bookie/BookieJournalForceTest.java  |   8 +-
 .../bookie/BookieMultipleJournalsTest.java         | 104 +++++++++++++++++++++
 .../cli/commands/bookie/LastMarkCommandTest.java   |   5 +-
 9 files changed, 141 insertions(+), 25 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 8e4157a..7322173 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -676,7 +676,7 @@ public class Bookie extends BookieCriticalThread {
         // instantiate the journals
         journals = Lists.newArrayList();
         for (int i = 0; i < journalDirectories.size(); i++) {
-            journals.add(new Journal(journalDirectories.get(i),
+            journals.add(new Journal(i, journalDirectories.get(i),
                          conf, ledgerDirsManager, 
statsLogger.scope(JOURNAL_SCOPE)));
         }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 149bf24..546a8ba 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -2957,8 +2957,9 @@ public class BookieShell implements Tool {
     private synchronized List<Journal> getJournals() throws IOException {
         if (null == journals) {
             journals = 
Lists.newArrayListWithCapacity(bkConf.getJournalDirs().length);
+            int idx = 0;
             for (File journalDir : bkConf.getJournalDirs()) {
-                journals.add(new Journal(new File(journalDir, 
BookKeeperConstants.CURRENT_DIR), bkConf,
+                journals.add(new Journal(idx++, new File(journalDir, 
BookKeeperConstants.CURRENT_DIR), bkConf,
                     new LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
                         new DiskChecker(bkConf.getDiskUsageThreshold(), 
bkConf.getDiskUsageWarnThreshold()))));
             }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
index 311988b..7600acc 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
@@ -88,7 +88,7 @@ public class FileSystemUpgrade {
 
             public boolean accept(File dir, String name) {
                 if (name.endsWith(".txn") || name.endsWith(".log")
-                    || name.equals("lastId") || name.equals("lastMark")) {
+                    || name.equals("lastId") || name.startsWith("lastMark")) {
                     return true;
                 }
                 if (containsIndexFiles(dir, name)) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 9e9d055..6173936 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -183,7 +183,7 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
             List<File> writableLedgerDirs = ledgerDirsManager
                     .getWritableLedgerDirs();
             for (File dir : writableLedgerDirs) {
-                File file = new File(dir, "lastMark");
+                File file = new File(dir, lastMarkFileName);
                 FileOutputStream fos = null;
                 try {
                     fos = new FileOutputStream(file);
@@ -212,7 +212,7 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
             ByteBuffer bb = ByteBuffer.wrap(buff);
             LogMark mark = new LogMark();
             for (File dir: ledgerDirsManager.getAllLedgerDirs()) {
-                File file = new File(dir, "lastMark");
+                File file = new File(dir, lastMarkFileName);
                 try {
                     try (FileInputStream fis = new FileInputStream(file)) {
                         int bytesRead = fis.read(buff);
@@ -577,6 +577,10 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
 
     private final LastLogMark lastLogMark = new LastLogMark(0, 0);
 
+    private static final String LAST_MARK_DEFAULT_NAME = "lastMark";
+
+    private final String lastMarkFileName;
+
     /**
      * The thread pool used to handle callback.
      */
@@ -607,12 +611,13 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
     private final Counter flushEmptyQueueCounter;
     private final Counter journalWriteBytes;
 
-    public Journal(File journalDirectory, ServerConfiguration conf, 
LedgerDirsManager ledgerDirsManager) {
-        this(journalDirectory, conf, ledgerDirsManager, 
NullStatsLogger.INSTANCE);
+    public Journal(int journalIndex, File journalDirectory, 
ServerConfiguration conf,
+            LedgerDirsManager ledgerDirsManager) {
+        this(journalIndex, journalDirectory, conf, ledgerDirsManager, 
NullStatsLogger.INSTANCE);
     }
 
-    public Journal(File journalDirectory, ServerConfiguration conf, 
LedgerDirsManager ledgerDirsManager,
-                   StatsLogger statsLogger) {
+    public Journal(int journalIndex, File journalDirectory, 
ServerConfiguration conf,
+            LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger) {
         super("BookieJournal-" + conf.getBookiePort());
         this.ledgerDirsManager = ledgerDirsManager;
         this.conf = conf;
@@ -639,6 +644,11 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
 
         this.removePagesFromCache = conf.getJournalRemovePagesFromCache();
         // read last log mark
+        if (conf.getJournalDirs().length == 1) {
+            lastMarkFileName = LAST_MARK_DEFAULT_NAME;
+        } else {
+            lastMarkFileName = LAST_MARK_DEFAULT_NAME + "." + journalIndex;
+        }
         lastLogMark.readLog();
         if (LOG.isDebugEnabled()) {
             LOG.debug("Last Log Mark : {}", lastLogMark.getCurMark());
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLastLogMarkService.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLastLogMarkService.java
index 31268cd..4cbd1dc 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLastLogMarkService.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLastLogMarkService.java
@@ -77,8 +77,9 @@ public class GetLastLogMarkService implements 
HttpEndpointService {
                 Map<String, String> output = Maps.newHashMap();
 
                 List<Journal> journals = 
Lists.newArrayListWithCapacity(conf.getJournalDirs().length);
+                int idx = 0;
                 for (File journalDir : conf.getJournalDirs()) {
-                    journals.add(new Journal(journalDir, conf, new 
LedgerDirsManager(conf, conf.getLedgerDirs(),
+                    journals.add(new Journal(idx++, journalDir, conf, new 
LedgerDirsManager(conf, conf.getLedgerDirs(),
                       new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()))));
                 }
                 for (Journal journal : journals) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommand.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommand.java
index a83f195..f31885c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommand.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommand.java
@@ -19,8 +19,9 @@
 package org.apache.bookkeeper.tools.cli.commands.bookie;
 
 import com.beust.jcommander.Parameters;
-import com.google.common.collect.Lists;
-import java.util.List;
+
+import java.io.File;
+
 import org.apache.bookkeeper.bookie.Journal;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.bookie.LogMark;
@@ -44,14 +45,10 @@ public class LastMarkCommand extends BookieCommand {
         LedgerDirsManager dirsManager = new LedgerDirsManager(
             conf, conf.getJournalDirs(),
             new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()));
-        List<Journal> journals = Lists.transform(
-            Lists.newArrayList(conf.getJournalDirs()),
-            dir -> new Journal(
-                dir,
-                conf,
-                dirsManager)
-        );
-        for (Journal journal : journals) {
+        File[] journalDirs = conf.getJournalDirs();
+
+        for (int idx = 0; idx < journalDirs.length; idx++) {
+            Journal journal = new Journal(idx, journalDirs[idx], conf, 
dirsManager);
             LogMark lastLogMark = journal.getLastLogMark().getCurMark();
             System.out.println("LastLogMark : Journal Id - " + 
lastLogMark.getLogFileId() + "("
                 + Long.toHexString(lastLogMark.getLogFileId()) + ".txn), Pos - 
"
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
index a55bef7..0b022fe 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
@@ -84,7 +84,7 @@ public class BookieJournalForceTest {
         whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
 
         LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
-        Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+        Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager);
 
         // machinery to suspend ForceWriteThread
         CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
@@ -146,7 +146,7 @@ public class BookieJournalForceTest {
         whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
 
         LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
-        Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+        Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager);
 
         // machinery to suspend ForceWriteThread
         CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
@@ -201,7 +201,7 @@ public class BookieJournalForceTest {
         whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
 
         LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
-        Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+        Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager);
 
         // machinery to suspend ForceWriteThread
         CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
@@ -260,7 +260,7 @@ public class BookieJournalForceTest {
         whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
 
         LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
-        Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+        Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager);
         journal.start();
 
         final int numEntries = 100;
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
new file mode 100644
index 0000000..6c0682b
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+
+/**
+ * Test the bookie with multiple journals.
+ */
+public class BookieMultipleJournalsTest extends BookKeeperClusterTestCase {
+
+    public BookieMultipleJournalsTest() {
+        super(1);
+    }
+
+    protected ServerConfiguration newServerConfiguration(int port, String 
zkServers, File journalDir,
+            File[] ledgerDirs) {
+        ServerConfiguration conf = super.newServerConfiguration(port, 
zkServers, journalDir, ledgerDirs);
+
+        // Use 4 journals
+        String[] journalDirs = new String[4];
+        for (int i = 0; i < 4; i++) {
+            journalDirs[i] = journalDir.getAbsolutePath() + "/journal-" + i;
+        }
+        conf.setJournalDirsName(journalDirs);
+
+        return conf;
+    }
+
+    @Test
+    public void testMultipleWritesAndBookieRestart() throws Exception {
+        // Creates few ledgers so that writes are spread across all journals
+        final int numLedgers = 16;
+        final int numEntriesPerLedger = 30;
+        List<LedgerHandle> writeHandles = new ArrayList<>();
+
+        for (int i = 0; i < numLedgers; i++) {
+            writeHandles.add(bkc.createLedger(1, 1, DigestType.CRC32, new 
byte[0]));
+        }
+
+        for (int i = 0; i < numEntriesPerLedger; i++) {
+            for (int j = 0; j < numLedgers; j++) {
+                writeHandles.get(j).addEntry(("entry-" + i).getBytes());
+            }
+        }
+
+        restartBookies();
+
+        // Write another set of entries
+        for (int i = numEntriesPerLedger; i < 2 * numEntriesPerLedger; i++) {
+            for (int j = 0; j < numLedgers; j++) {
+                writeHandles.get(j).addEntry(("entry-" + i).getBytes());
+            }
+        }
+
+        restartBookies();
+
+        List<LedgerHandle> readHandles = new ArrayList<>();
+
+        for (int i = 0; i < numLedgers; i++) {
+            readHandles.add(bkc.openLedger(writeHandles.get(i).getId(), 
DigestType.CRC32, new byte[0]));
+        }
+
+        for (int i = 0; i < numLedgers; i++) {
+            Enumeration<LedgerEntry> entries = 
readHandles.get(i).readEntries(0, numEntriesPerLedger - 1);
+
+            for (int j = 0; j < numEntriesPerLedger; j++) {
+                LedgerEntry entry = entries.nextElement();
+                assertEquals("entry-" + j, new String(entry.getEntry()));
+            }
+        }
+    }
+
+}
diff --git 
a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommandTest.java
 
b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommandTest.java
index c5c366a..20899b7 100644
--- 
a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommandTest.java
+++ 
b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/LastMarkCommandTest.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.tools.cli.commands.bookie;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyObject;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -77,10 +78,12 @@ public class LastMarkCommandTest extends 
BookieCommandTestBase {
         when(journal.getLastLogMark()).thenReturn(lastLogMark);
         PowerMockito.whenNew(Journal.class)
             .withParameterTypes(
+                int.class,
                 File.class,
                 ServerConfiguration.class,
                 LedgerDirsManager.class)
             .withArguments(
+                any(int.class),
                 any(File.class),
                 eq(conf),
                 any(LedgerDirsManager.class))
@@ -96,7 +99,7 @@ public class LastMarkCommandTest extends 
BookieCommandTestBase {
         PowerMockito.verifyNew(LedgerDirsManager.class, times(1))
             .withArguments(eq(conf), any(File[].class), 
any(DiskChecker.class));
         PowerMockito.verifyNew(Journal.class, times(3))
-            .withArguments(any(File.class), eq(conf), 
any(LedgerDirsManager.class));
+            .withArguments(any(int.class), any(File.class), eq(conf), 
any(LedgerDirsManager.class));
         verify(journal, times(3)).getLastLogMark();
         verify(lastLogMark, times(3)).getCurMark();
         verify(logMark, times(3 * 2)).getLogFileId();

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to