This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.6
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.6 by this push:
     new 3adacf0  ISSUE #1255: Bookie should not advance the journal marker before creating the index file
3adacf0 is described below

commit 3adacf06f9ab8254b6ba6b45a6dcdd523e5030b8
Author: Sijie Guo <[email protected]>
AuthorDate: Wed Mar 14 11:03:05 2018 -0700

    ISSUE #1255: Bookie should not advance the journal marker before creating the index file
    
    Descriptions of the changes in this PR:
    
    *Problem*
    
    Currently the Bookie journals a 'new ledger' entry if a ledger doesn't exist in ledger storage. This 'new ledger' entry is journaled before the entry is added to ledger storage, which can cause a problem on checkpointing.
    
    - journal 'new ledger' entry
    
    ```
        /**
         * Retrieve the ledger descriptor for the ledger which entry should be added to.
         * The LedgerDescriptor returned from this method should be eventually freed with
         * #putHandle().
         *
         * @throws BookieException if masterKey does not match the master key of the ledger
         */
        private LedgerDescriptor getLedgerForEntry(ByteBuf entry, final byte[] masterKey)
                throws IOException, BookieException {
            final long ledgerId = entry.getLong(entry.readerIndex());
    
            LedgerDescriptor l = handles.getHandle(ledgerId, masterKey);
            if (masterKeyCache.get(ledgerId) == null) {
                // Force the load into masterKey cache
                byte[] oldValue = masterKeyCache.putIfAbsent(ledgerId, masterKey);
                if (oldValue == null) {
                    // new handle, we should add the key to journal ensure we can rebuild
                    ByteBuffer bb = ByteBuffer.allocate(8 + 8 + 4 + masterKey.length);
                    bb.putLong(ledgerId);
                    bb.putLong(METAENTRY_ID_LEDGER_KEY);
                    bb.putInt(masterKey.length);
                    bb.put(masterKey);
                    bb.flip();
    
                    getJournal(ledgerId).logAddEntry(bb, false /* ackBeforeSync */, new NopWriteCallback(), null);
                }
            }
    
            return l;
        }
    ```
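    The 'new ledger' record built above is a small buffer: the ledger id, a reserved entry id (`METAENTRY_ID_LEDGER_KEY`) marking it as metadata, the key length, and the master key bytes. The following is a minimal sketch of encoding and decoding that layout with plain `java.nio.ByteBuffer`, for illustration only: `LedgerKeyMetaEntry` is a hypothetical class, and the constant's value here is a placeholder assumption rather than the one defined in `Bookie`.

    ```java
    import java.nio.ByteBuffer;

    // Hypothetical helper illustrating the layout of the 'new ledger' journal entry:
    // [ledgerId:8][METAENTRY_ID_LEDGER_KEY:8][keyLength:4][masterKey:keyLength]
    final class LedgerKeyMetaEntry {
        // Placeholder assumption: a reserved negative id that no real entry can use.
        static final long METAENTRY_ID_LEDGER_KEY = -0x1000L;

        static ByteBuffer encode(long ledgerId, byte[] masterKey) {
            ByteBuffer bb = ByteBuffer.allocate(8 + 8 + 4 + masterKey.length);
            bb.putLong(ledgerId);
            bb.putLong(METAENTRY_ID_LEDGER_KEY);
            bb.putInt(masterKey.length);
            bb.put(masterKey);
            bb.flip(); // switch from writing to reading, as the original code does
            return bb;
        }

        // Returns the master key if bb is a ledger-key meta entry for expectedLedgerId, else null.
        static byte[] decodeKey(ByteBuffer bb, long expectedLedgerId) {
            ByteBuffer dup = bb.duplicate(); // independent position: the caller's buffer is untouched
            long ledgerId = dup.getLong();
            long entryId = dup.getLong();
            if (ledgerId != expectedLedgerId || entryId != METAENTRY_ID_LEDGER_KEY) {
                return null;
            }
            byte[] key = new byte[dup.getInt()];
            dup.get(key);
            return key;
        }
    }
    ```

    `decodeKey` reads from a duplicate, so the encoded buffer stays positioned at its start and could still be handed to a journal afterwards.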
    
    - add entry to ledger storage
    ```
        /**
         * Add an entry to a ledger as specified by handle.
         */
        private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry,
                                      boolean ackBeforeSync, WriteCallback cb, Object ctx)
                throws IOException, BookieException {
            long ledgerId = handle.getLedgerId();
            long entryId = handle.addEntry(entry);
    
            writeBytes.add(entry.readableBytes());
    
            if (LOG.isTraceEnabled()) {
                LOG.trace("Adding {}@{}", entryId, ledgerId);
            }
            getJournal(ledgerId).logAddEntry(entry, ackBeforeSync, cb, ctx);
        }
    
    ```
    
    The problematic sequence can be described as below:
    
    - thread t1 is adding the first entry of ledger L1
    - thread t2 is adding entries of ledger L2
    - t1 adds a 'new ledger L1' journal entry
    - t2 adds entries after t1 has added that journal entry but before t1 adds its entry to ledger storage
    - after t2 has added its entries, a checkpoint happens in ledger storage. It rolls the journal mark, which treats the 'new ledger L1' journal entry as persisted even though L1 has not yet been flushed to ledger storage.
    - if the bookie restarts, journal replay starts from the rolled mark and never sees the 'new ledger L1' entry, so the bookie fails with a 'no such ledger' exception.
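    The sequence above can be replayed in a toy model where ledger storage is a set of ledger ids, a checkpoint flushes that set and rolls the journal mark, and a restart replays only the journal records after the mark. This is a hypothetical, single-threaded simplification (`ToyBookie` and its methods are illustrative names, not BookKeeper APIs); `buggyInterleaving()` follows the steps listed above and `fixedInterleaving()` applies the ordering from the solution.

    ```java
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Hypothetical single-threaded model of the journal mark / checkpoint interaction.
    // Not BookKeeper code: "ledger storage" is just a set of known ledger ids.
    final class ToyBookie {
        static final class JournalRecord {
            final long ledgerId;
            final boolean newLedgerMeta; // true for a 'new ledger' record, false for a data entry

            JournalRecord(long ledgerId, boolean newLedgerMeta) {
                this.ledgerId = ledgerId;
                this.newLedgerMeta = newLedgerMeta;
            }
        }

        final List<JournalRecord> journal = new ArrayList<>();
        int mark = 0;                                     // replay starts here after a restart
        final Set<Long> memStorage = new HashSet<>();     // ledgers added to storage, not yet flushed
        final Set<Long> durableStorage = new HashSet<>(); // ledgers flushed by a checkpoint

        void journalNewLedger(long ledgerId) { journal.add(new JournalRecord(ledgerId, true)); }
        void journalEntry(long ledgerId) { journal.add(new JournalRecord(ledgerId, false)); }
        void addToStorage(long ledgerId) { memStorage.add(ledgerId); }

        // Checkpoint: flush ledger storage, then roll the mark past everything journaled so far.
        void checkpoint() {
            durableStorage.addAll(memStorage);
            mark = journal.size();
        }

        // Simulated crash + restart: start from flushed storage, replay the journal from the
        // mark, and return the ledgers whose entries cannot be replayed ("no such ledger").
        Set<Long> restartAndReplay() {
            Set<Long> known = new HashSet<>(durableStorage);
            Set<Long> missing = new HashSet<>();
            for (JournalRecord r : journal.subList(mark, journal.size())) {
                if (r.newLedgerMeta) {
                    known.add(r.ledgerId);
                } else if (!known.contains(r.ledgerId)) {
                    missing.add(r.ledgerId);
                }
            }
            return missing;
        }

        // Old ordering: 'new ledger' is journaled before the entry reaches storage.
        static Set<Long> buggyInterleaving() {
            ToyBookie b = new ToyBookie();
            b.journalNewLedger(1L); // t1 journals 'new ledger L1', then stalls
            b.journalNewLedger(2L); // t2 writes L2 with the same ordering
            b.addToStorage(2L);
            b.journalEntry(2L);
            b.checkpoint();         // flushes only L2, but rolls the mark past 'new ledger L1'
            b.addToStorage(1L);     // t1 resumes
            b.journalEntry(1L);
            return b.restartAndReplay();
        }

        // Fixed ordering: storage first, then 'new ledger', then the entry.
        static Set<Long> fixedInterleaving() {
            ToyBookie b = new ToyBookie();
            // t1 stalls before journaling anything for L1
            b.addToStorage(2L);
            b.journalNewLedger(2L);
            b.journalEntry(2L);
            b.checkpoint();
            b.addToStorage(1L);     // t1 resumes
            b.journalNewLedger(1L); // lands after the mark, so replay sees it
            b.journalEntry(1L);
            return b.restartAndReplay();
        }
    }
    ```

    In this model `buggyInterleaving()` returns `[1]` (L1's entry cannot be replayed) and `fixedInterleaving()` returns an empty set: once storage comes first, any checkpoint that rolls the mark past 'new ledger L1' has already flushed L1.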
    
    The problem can be reproduced using a unit test: https://github.com/sijie/bookkeeper/commit/5053a717cd578aeb88236d373553d7494501b801
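    The test added in this commit (`CheckpointOnNewLedgersTest`, below) forces this interleaving deterministically: its spy on `getLedgerForEntry` counts down `getLedgerDescCalledLatch` and then blocks on `getLedgerDescWaitLatch` for odd ledger ids, while the main thread writes L2 and waits for the checkpoint. A minimal plain-Java sketch of the same two-latch pattern, without Mockito; `LatchInterleaving` and its event strings are hypothetical.

    ```java
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.CountDownLatch;

    // Hypothetical plain-Java version of the two-latch pause pattern (no Mockito).
    final class LatchInterleaving {
        static List<String> run() {
            List<String> events = new CopyOnWriteArrayList<>();
            CountDownLatch reachedPausePoint = new CountDownLatch(1); // role of getLedgerDescCalledLatch
            CountDownLatch resume = new CountDownLatch(1);            // role of getLedgerDescWaitLatch

            Thread t1 = new Thread(() -> {
                events.add("t1: journaled 'new ledger L1'");
                reachedPausePoint.countDown(); // signal: t1 is at the pause point
                try {
                    resume.await();            // park until the main thread releases t1
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                events.add("t1: added L1 entry to storage");
            }, "ledger-1-writer");

            t1.start();
            try {
                reachedPausePoint.await();     // t1 has done step 1 and is parked
                events.add("main: wrote L2 entries, checkpoint rolled the mark");
                resume.countDown();            // let t1 finish
                t1.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while driving the interleaving", e);
            }
            return events;
        }
    }
    ```

    Because t1 cannot pass `resume.await()` until the main thread counts down, the event order is fixed regardless of thread scheduling.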
    
    *Solution*
    
    The fix is simple: make sure the 'new ledger' journal entry is added after the entry is added to ledger storage. This ensures that when a checkpoint happens, it flushes and creates the ledger before moving the journal mark.
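    A minimal sketch of the corrected ordering, using hypothetical `Storage` and `JournalLog` interfaces in place of `LedgerDescriptor` and `Journal` (the real `addEntryInternal` in the diff below also builds the binary key record, updates stats, traces, and passes callbacks). Only the sequence matters here: storage first, then the one-time 'new ledger' record, then the entry.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Hypothetical sketch of the corrected add path; not the real Bookie code.
    final class FixedAddPath {
        interface Storage { void addEntry(long ledgerId, long entryId); } // stands in for LedgerDescriptor
        interface JournalLog { void append(String record); }              // stands in for Journal

        private final Storage storage;
        private final JournalLog journal;
        // Like masterKeyCache: only the first writer of a ledger journals its key.
        private final ConcurrentMap<Long, byte[]> masterKeyCache = new ConcurrentHashMap<>();

        FixedAddPath(Storage storage, JournalLog journal) {
            this.storage = storage;
            this.journal = journal;
        }

        void addEntry(long ledgerId, long entryId, byte[] masterKey) {
            // 1. Ledger storage first: a checkpoint that later rolls the journal mark
            //    past the records below will also flush this ledger.
            storage.addEntry(ledgerId, entryId);
            // 2. Then the one-time 'new ledger' record (the real code checks get() first as a fast path).
            if (masterKeyCache.putIfAbsent(ledgerId, masterKey) == null) {
                journal.append("new-ledger " + ledgerId);
            }
            // 3. Finally the entry itself.
            journal.append("entry " + ledgerId + ":" + entryId);
        }

        // Records the order of operations for two adds to the same ledger.
        static List<String> traceTwoAdds() {
            List<String> ops = new ArrayList<>();
            FixedAddPath path = new FixedAddPath(
                    (ledgerId, entryId) -> ops.add("storage " + ledgerId + ":" + entryId),
                    ops::add);
            path.addEntry(1L, 0L, new byte[0]);
            path.addEntry(1L, 1L, new byte[0]);
            return ops;
        }
    }
    ```

    `traceTwoAdds()` records `storage 1:0`, `new-ledger 1`, `entry 1:0`, `storage 1:1`, `entry 1:1`: the key is journaled once per ledger and never before the ledger has reached storage.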
    
    Author: Sijie Guo <[email protected]>
    
    Reviewers: Ivan Kelly <[email protected]>, Charan Reddy Guttapalem <[email protected]>, Jia Zhai <None>, Yiming Zang <[email protected]>, Matteo Merli <[email protected]>
    
    This closes #1256 from sijie/fix_db_ledger_storage_checkpoint, closes #1255
    
    (cherry picked from commit 7e8980fff096a9111e9365d0dc959af0625a87db)
    Signed-off-by: Sijie Guo <[email protected]>
---
 .../java/org/apache/bookkeeper/bookie/Bookie.java  |  45 ++---
 .../org/apache/bookkeeper/bookie/SyncThread.java   |   4 +
 .../bookie/CheckpointOnNewLedgersTest.java         | 194 +++++++++++++++++++++
 3 files changed, 222 insertions(+), 21 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 711b2b8..a65ce55 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -1230,11 +1230,31 @@ public class Bookie extends BookieCriticalThread {
      *
      * @throws BookieException if masterKey does not match the master key of the ledger
      */
-    private LedgerDescriptor getLedgerForEntry(ByteBuf entry, final byte[] masterKey)
+    @VisibleForTesting
+    LedgerDescriptor getLedgerForEntry(ByteBuf entry, final byte[] masterKey)
             throws IOException, BookieException {
         final long ledgerId = entry.getLong(entry.readerIndex());
 
-        LedgerDescriptor l = handles.getHandle(ledgerId, masterKey);
+        return handles.getHandle(ledgerId, masterKey);
+    }
+
+    private Journal getJournal(long ledgerId) {
+        return journals.get(MathUtils.signSafeMod(ledgerId, journals.size()));
+    }
+
+    /**
+     * Add an entry to a ledger as specified by handle.
+     */
+    private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry,
+                                  WriteCallback cb, Object ctx, byte[] masterKey)
+            throws IOException, BookieException {
+        long ledgerId = handle.getLedgerId();
+        long entryId = handle.addEntry(entry);
+
+        writeBytes.add(entry.readableBytes());
+
+        // journal `addEntry` should happen after the entry is added to ledger storage.
+        // otherwise the journal entry can potentially be rolled before the ledger is created in ledger storage.
         if (masterKeyCache.get(ledgerId) == null) {
             // Force the load into masterKey cache
             byte[] oldValue = masterKeyCache.putIfAbsent(ledgerId, masterKey);
@@ -1251,23 +1271,6 @@ public class Bookie extends BookieCriticalThread {
             }
         }
 
-        return l;
-    }
-
-    private Journal getJournal(long ledgerId) {
-        return journals.get(MathUtils.signSafeMod(ledgerId, journals.size()));
-    }
-
-    /**
-     * Add an entry to a ledger as specified by handle.
-     */
-    private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry, WriteCallback cb, Object ctx)
-            throws IOException, BookieException {
-        long ledgerId = handle.getLedgerId();
-        long entryId = handle.addEntry(entry);
-
-        writeBytes.add(entry.readableBytes());
-
         if (LOG.isTraceEnabled()) {
             LOG.trace("Adding {}@{}", entryId, ledgerId);
         }
@@ -1289,7 +1292,7 @@ public class Bookie extends BookieCriticalThread {
             LedgerDescriptor handle = getLedgerForEntry(entry, masterKey);
             synchronized (handle) {
                 entrySize = entry.readableBytes();
-                addEntryInternal(handle, entry, cb, ctx);
+                addEntryInternal(handle, entry, cb, ctx, masterKey);
             }
             success = true;
         } catch (NoWritableLedgerDirException e) {
@@ -1349,7 +1352,7 @@ public class Bookie extends BookieCriticalThread {
                             .create(BookieException.Code.LedgerFencedException);
                 }
                 entrySize = entry.readableBytes();
-                addEntryInternal(handle, entry, cb, ctx);
+                addEntryInternal(handle, entry, cb, ctx, masterKey);
             }
             success = true;
         } catch (NoWritableLedgerDirException e) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
index 44bf6ae..0d3ab3c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
@@ -28,6 +28,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+
+import lombok.AccessLevel;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
@@ -55,6 +58,7 @@ import org.apache.bookkeeper.util.MathUtils;
 @Slf4j
 class SyncThread implements Checkpointer {
 
+    @Getter(AccessLevel.PACKAGE)
     final ScheduledExecutorService executor;
     final int flushInterval;
     final LedgerStorage ledgerStorage;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CheckpointOnNewLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CheckpointOnNewLedgersTest.java
new file mode 100644
index 0000000..908cd59
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CheckpointOnNewLedgersTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.Journal.LastLogMark;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Test the checkpoint logic of ledger storage.
+ */
+@Slf4j
+public class CheckpointOnNewLedgersTest {
+
+    @Rule
+    public final TemporaryFolder testDir = new TemporaryFolder();
+
+    private ServerConfiguration conf;
+    private Bookie bookie;
+    private CountDownLatch getLedgerDescCalledLatch;
+    private CountDownLatch getLedgerDescWaitLatch;
+
+    @Before
+    public void setup() throws Exception {
+        File bkDir = testDir.newFolder("dbLedgerStorageCheckpointTest");
+        File curDir = Bookie.getCurrentDirectory(bkDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        int gcWaitTime = 1000;
+        conf = TestBKConfiguration.newServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+        conf.setJournalDirsName(new String[] { bkDir.toString() });
+        conf.setLedgerDirNames(new String[] { bkDir.toString() });
+        conf.setEntryLogSizeLimit(10 * 1024);
+
+        bookie = spy(new Bookie(conf));
+        bookie.start();
+
+        getLedgerDescCalledLatch = new CountDownLatch(1);
+        getLedgerDescWaitLatch = new CountDownLatch(1);
+
+        // spy `getLedgerForEntry`
+        doAnswer(invocationOnMock -> {
+            ByteBuf entry = invocationOnMock.getArgument(0);
+            long ledgerId = entry.getLong(entry.readerIndex());
+
+            LedgerDescriptor ld = (LedgerDescriptor) invocationOnMock.callRealMethod();
+
+            if (ledgerId % 2 == 1) {
+                getLedgerDescCalledLatch.countDown();
+                getLedgerDescWaitLatch.await();
+            }
+
+            return ld;
+        }).when(bookie).getLedgerForEntry(
+            any(ByteBuf.class),
+            any(byte[].class));
+    }
+
+    @After
+    public void teardown() throws Exception {
+        if (null != bookie) {
+            bookie.shutdown();
+        }
+    }
+
+    private static ByteBuf createByteBuf(long ledgerId, long entryId, int entrySize) {
+        byte[] data = new byte[entrySize];
+        ThreadLocalRandom.current().nextBytes(data);
+        ByteBuf buffer = Unpooled.wrappedBuffer(data);
+        buffer.writerIndex(0);
+        buffer.writeLong(ledgerId);
+        buffer.writeLong(entryId);
+        buffer.writeLong(entryId - 1); // lac
+        buffer.writerIndex(entrySize);
+        return buffer;
+    }
+
+    @Test
+    public void testCheckpoint() throws Exception {
+        int entrySize = 1024;
+        long l1 = 1L;
+        long l2 = 2L;
+
+        final CountDownLatch writeL1Latch = new CountDownLatch(1);
+
+        Thread t1 = new Thread(() -> {
+
+            ByteBuf entry = createByteBuf(l1, 0L, entrySize);
+            try {
+                bookie.addEntry(
+                    entry,
+                    (rc, ledgerId, entryId, addr, ctx) -> writeL1Latch.countDown(),
+                    null,
+                    new byte[0]
+                );
+            } catch (Exception e) {
+                log.info("Failed to write entry to l1", e);
+            }
+
+        }, "ledger-1-writer");
+
+        t1.start();
+
+        // wait until the ledger desc is opened
+        getLedgerDescCalledLatch.await();
+
+        LastLogMark logMark = bookie.journals.get(0).getLastLogMark().markLog();
+
+        // keep write entries to l2 to trigger entry log rolling to checkpoint
+        int numEntries = 10;
+        final CountDownLatch writeL2Latch = new CountDownLatch(numEntries);
+        for (int i = 0; i < numEntries; i++) {
+            ByteBuf entry = createByteBuf(l2, i, entrySize);
+            bookie.addEntry(
+                entry,
+                (rc, ledgerId, entryId, addr, ctx) -> writeL2Latch.countDown(),
+                null,
+                new byte[0]);
+        }
+        writeL2Latch.await();
+
+        // wait until checkpoint to complete and journal marker is rolled.
+        bookie.syncThread.getExecutor().submit(() -> {}).get();
+
+        log.info("Wait until checkpoint is completed");
+
+        // the journal mark is rolled.
+        LastLogMark newLogMark = bookie.journals.get(0).getLastLogMark().markLog();
+        assertTrue(newLogMark.getCurMark().compare(logMark.getCurMark()) > 0);
+
+        // resume l1-writer to continue writing the entries
+        getLedgerDescWaitLatch.countDown();
+
+        // wait until the l1 entry is written
+        writeL1Latch.await();
+        t1.join();
+
+        // construct a new bookie to simulate "bookie restart from crash"
+        Bookie newBookie = new Bookie(conf);
+        newBookie.start();
+
+        for (int i = 0; i < numEntries; i++) {
+            ByteBuf entry = newBookie.readEntry(l2, i);
+            assertNotNull(entry);
+            assertEquals(l2, entry.readLong());
+            assertEquals((long) i, entry.readLong());
+            entry.release();
+        }
+
+        ByteBuf entry = newBookie.readEntry(l1, 0L);
+        assertNotNull(entry);
+        assertEquals(l1, entry.readLong());
+        assertEquals(0L, entry.readLong());
+        entry.release();
+        newBookie.shutdown();
+    }
+
+}
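
The test waits for the checkpoint by submitting an empty task to the `SyncThread` executor (reachable through the package-private getter that Lombok generates from `@Getter(AccessLevel.PACKAGE)`) and blocking on the returned `Future`. A minimal sketch of that barrier, assuming the executor is single-threaded as with `Executors.newSingleThreadScheduledExecutor()`: tasks submitted without a delay run one at a time in submission order, so when the no-op completes, every task queued before it has finished. `ExecutorBarrier` is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical demo of the "submit a no-op and wait" barrier on a single-threaded executor.
final class ExecutorBarrier {
    // Blocks until every task submitted to `executor` before this call has run.
    static void drain(ScheduledExecutorService executor) {
        try {
            executor.submit(() -> { }).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    static List<String> demo() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        List<String> done = new CopyOnWriteArrayList<>();
        try {
            executor.submit(() -> {
                sleepQuietly(50);      // stand-in for a slow checkpoint
                done.add("checkpoint");
            });
            drain(executor);           // returns only after the checkpoint task has run
            done.add("after drain");
            return done;
        } finally {
            executor.shutdown();
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

`demo()` returns `[checkpoint, after drain]`. With a multi-threaded pool the no-op could finish first, so the trick relies on the single worker thread.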
