This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bd4119  BP14 - forceLedger bookie side implementation
3bd4119 is described below

commit 3bd411910b3651e5dcb9df44d1f77cd6f6cd0ca6
Author: Enrico Olivelli <[email protected]>
AuthorDate: Tue May 8 17:21:41 2018 +0800

    BP14 - forceLedger bookie side implementation
    
    This is the bookie side implementation of the force() API as defined in 
BP-14, without the wire protocol handler.
    We will introduce a new RPC which allows the client to ensure that all data 
sent for a  given ledger is persisted durably to the bookie, this is useful for 
DEFERRED_SYNC writers. In this case we are simulating a regular sync'd write to 
the journal choosen for the ledger.
    
    Author: Enrico Olivelli <[email protected]>
    Author: eolivelli <[email protected]>
    
    Reviewers: Sijie Guo <[email protected]>, JV Jujjuri 
<[email protected]>
    
    This closes #1375 from eolivelli/bp14-simple-forceledger
---
 .../bookkeeper/bookie/BookKeeperServerStats.java   |  2 +
 .../java/org/apache/bookkeeper/bookie/Bookie.java  | 19 +++++
 .../java/org/apache/bookkeeper/bookie/Journal.java | 36 ++++++----
 .../bookkeeper/bookie/BookieJournalForceTest.java  | 59 ++++++++++++++++
 .../bookie/BookieWriteToJournalTest.java           | 82 ++++++++++++++++++++--
 5 files changed, 180 insertions(+), 18 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 8b1665e..d2ce94b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -70,6 +70,7 @@ public interface BookKeeperServerStats {
     String BOOKIE_ADD_ENTRY = "BOOKIE_ADD_ENTRY";
     String BOOKIE_RECOVERY_ADD_ENTRY = "BOOKIE_RECOVERY_ADD_ENTRY";
     String BOOKIE_READ_ENTRY = "BOOKIE_READ_ENTRY";
+    String BOOKIE_FORCE_LEDGER = "BOOKIE_FORCE_LEDGER";
     String BOOKIE_READ_LAST_CONFIRMED = "BOOKIE_READ_LAST_CONFIRMED";
     String BOOKIE_ADD_ENTRY_BYTES = "BOOKIE_ADD_ENTRY_BYTES";
     String BOOKIE_READ_ENTRY_BYTES = "BOOKIE_READ_ENTRY_BYTES";
@@ -80,6 +81,7 @@ public interface BookKeeperServerStats {
 
     String JOURNAL_SCOPE = "journal";
     String JOURNAL_ADD_ENTRY = "JOURNAL_ADD_ENTRY";
+    String JOURNAL_FORCE_LEDGER = "JOURNAL_FORCE_LEDGER";
     String JOURNAL_SYNC = "JOURNAL_SYNC";
     String JOURNAL_MEM_ADD_ENTRY = "JOURNAL_MEM_ADD_ENTRY";
     String JOURNAL_PREALLOCATION = "JOURNAL_PREALLOCATION";
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index b391c53..76af6eb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.bookie;
 
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY_BYTES;
+import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_FORCE_LEDGER;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_READ_ENTRY;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_READ_ENTRY_BYTES;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_RECOVERY_ADD_ENTRY;
@@ -119,6 +120,7 @@ public class Bookie extends BookieCriticalThread {
 
     static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
     static final long METAENTRY_ID_FENCE_KEY  = -0x2000;
+    static final long METAENTRY_ID_FORCE_LEDGER  = -0x4000;
 
     private final LedgerDirsManager ledgerDirsManager;
     private LedgerDirsManager indexDirsManager;
@@ -139,6 +141,7 @@ public class Bookie extends BookieCriticalThread {
     private final StatsLogger statsLogger;
     private final Counter writeBytes;
     private final Counter readBytes;
+    private final Counter forceLedgerOps;
     // Bookie Operation Latency Stats
     private final OpStatsLogger addEntryStats;
     private final OpStatsLogger recoveryAddEntryStats;
@@ -734,6 +737,7 @@ public class Bookie extends BookieCriticalThread {
         // Expose Stats
         writeBytes = statsLogger.getCounter(WRITE_BYTES);
         readBytes = statsLogger.getCounter(READ_BYTES);
+        forceLedgerOps = statsLogger.getCounter(BOOKIE_FORCE_LEDGER);
         addEntryStats = statsLogger.getOpStatsLogger(BOOKIE_ADD_ENTRY);
         recoveryAddEntryStats = 
statsLogger.getOpStatsLogger(BOOKIE_RECOVERY_ADD_ENTRY);
         readEntryStats = statsLogger.getOpStatsLogger(BOOKIE_READ_ENTRY);
@@ -1202,6 +1206,21 @@ public class Bookie extends BookieCriticalThread {
     }
 
     /**
+     * Force sync given 'ledgerId' entries on the journal to the disk.
+     * It works like a regular addEntry with ackBeforeSync=false.
+     * This is useful for ledgers with DEFERRED_SYNC write flag.
+     */
+    public void forceLedger(long ledgerId, WriteCallback cb,
+                            Object ctx) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Forcing ledger {}", ledgerId);
+        }
+        Journal journal = getJournal(ledgerId);
+        journal.forceLedger(ledgerId, cb, ctx);
+        forceLedgerOps.inc();
+    }
+
+    /**
      * Add entry to a ledger.
      * @throws BookieException.LedgerFencedException if the ledger is fenced
      */
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 94aafad..71477c0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -595,6 +595,7 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
 
     // Expose Stats
     private final OpStatsLogger journalAddEntryStats;
+    private final OpStatsLogger journalForceLedgerStats;
     private final OpStatsLogger journalSyncStats;
     private final OpStatsLogger journalCreationStats;
     private final OpStatsLogger journalFlushStats;
@@ -656,6 +657,7 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
 
         // Expose Stats
         journalAddEntryStats = 
statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_ADD_ENTRY);
+        journalForceLedgerStats = 
statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_LEDGER);
         journalSyncStats = 
statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_SYNC);
         journalCreationStats = 
statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_CREATION_LATENCY);
         journalFlushStats = 
statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FLUSH_LATENCY);
@@ -869,6 +871,14 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
                 journalAddEntryStats, journalQueueSize));
     }
 
+    void forceLedger(long ledgerId, WriteCallback cb, Object ctx) {
+        journalQueueSize.inc();
+        queue.add(QueueEntry.create(
+                null, false /* ackBeforeSync */,  ledgerId,
+                Bookie.METAENTRY_ID_FORCE_LEDGER, cb, ctx, 
MathUtils.nowInNano(),
+                journalForceLedgerStats, journalQueueSize));
+    }
+
     /**
      * Get the length of journal entries queue.
      *
@@ -1072,22 +1082,23 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
                 if (qe == null) { // no more queue entry
                     continue;
                 }
+                if (qe.entryId != Bookie.METAENTRY_ID_FORCE_LEDGER) {
+                    int entrySize = qe.entry.readableBytes();
+                    journalWriteBytes.add(entrySize);
+                    journalQueueSize.dec();
 
-                int entrySize = qe.entry.readableBytes();
-                journalWriteBytes.add(entrySize);
-                journalQueueSize.dec();
+                    batchSize += (4 + entrySize);
 
-                batchSize += (4 + entrySize);
+                    lenBuff.clear();
+                    lenBuff.writeInt(entrySize);
 
-                lenBuff.clear();
-                lenBuff.writeInt(entrySize);
-
-                // preAlloc based on size
-                logFile.preAllocIfNeeded(4 + entrySize);
+                    // preAlloc based on size
+                    logFile.preAllocIfNeeded(4 + entrySize);
 
-                bc.write(lenBuff);
-                bc.write(qe.entry);
-                qe.entry.release();
+                    bc.write(lenBuff);
+                    bc.write(qe.entry);
+                    qe.entry.release();
+                }
 
                 toFlush.add(qe);
                 numEntriesToFlush++;
@@ -1159,4 +1170,5 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
     public void joinThread() throws InterruptedException {
         join();
     }
+
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
index 6714d9c..3a8ad86 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
@@ -314,4 +314,63 @@ public class BookieJournalForceTest {
         return supportQueue;
     }
 
+    @Test
+    public void testForceLedger() throws Exception {
+        File journalDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(journalDir.getPath());
+
+        JournalChannel jc = spy(new JournalChannel(journalDir, 1));
+        whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
+
+        LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
+        Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager);
+
+        // machinery to suspend ForceWriteThread
+        CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
+        LinkedBlockingQueue<ForceWriteRequest> supportQueue =
+                
enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal);
+        journal.start();
+
+        LogMark lastLogMarkBeforeWrite = 
journal.getLastLogMark().markLog().getCurMark();
+        CountDownLatch latch = new CountDownLatch(1);
+        long ledgerId = 1;
+        journal.forceLedger(ledgerId, new WriteCallback() {
+            @Override
+            public void writeComplete(int rc, long ledgerId, long entryId, 
BookieSocketAddress addr, Object ctx) {
+                latch.countDown();
+            }
+        }, null);
+
+        // forceLedger should not complete even if ForceWriteThread is 
suspended
+        // wait that an entry is written to the ForceWriteThread queue
+        while (supportQueue.isEmpty()) {
+            Thread.sleep(100);
+        }
+        assertEquals(1, latch.getCount());
+        assertEquals(1, supportQueue.size());
+
+        // in constructor of JournalChannel we are calling forceWrite(true) 
but it is not tracked by PowerMock
+        // because the 'spy' is applied only on return from the constructor
+        verify(jc, times(0)).forceWrite(true);
+
+        // let ForceWriteThread work
+        forceWriteThreadSuspendedLatch.countDown();
+
+        // callback should complete now
+        assertTrue(latch.await(20, TimeUnit.SECONDS));
+
+        verify(jc, atLeast(1)).forceWrite(false);
+
+        assertEquals(0, supportQueue.size());
+
+        // verify that log marker advanced
+        LastLogMark lastLogMarkAfterForceWrite = journal.getLastLogMark();
+        
assertTrue(lastLogMarkAfterForceWrite.getCurMark().compare(lastLogMarkBeforeWrite)
 > 0);
+
+        journal.shutdown();
+    }
+
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
index b43df80..2d197d3 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
@@ -20,6 +20,8 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.complete;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertSame;
 import static org.mockito.ArgumentMatchers.any;
@@ -31,6 +33,7 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.io.File;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
@@ -72,8 +75,8 @@ public class BookieWriteToJournalTest {
         Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(ledgerDir));
         ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
         conf.setJournalDirName(journalDir.getPath())
-            .setLedgerDirNames(new String[]{ledgerDir.getPath()})
-            .setMetadataServiceUri(null);
+                .setLedgerDirNames(new String[]{ledgerDir.getPath()})
+                .setMetadataServiceUri(null);
         BookieSocketAddress bookieAddress = Bookie.getBookieAddress(conf);
         CountDownLatch journalJoinLatch = new CountDownLatch(1);
         Journal journal = mock(Journal.class);
@@ -108,12 +111,10 @@ public class BookieWriteToJournalTest {
         byte[] masterKey = new byte[64];
         for (boolean ackBeforeSync : new boolean[]{true, false}) {
             CountDownLatch latch = new CountDownLatch(1);
-            final ByteBuf data = Unpooled.buffer();
-            data.writeLong(ledgerId);
-            data.writeLong(entryId);
+            final ByteBuf data = buildEntry(ledgerId, entryId, -1);
             final long expectedEntryId = entryId;
             b.addEntry(data, ackBeforeSync, (int rc, long ledgerId1, long 
entryId1,
-                                             BookieSocketAddress addr, Object 
ctx) -> {
+                    BookieSocketAddress addr, Object ctx) -> {
                 assertSame(expectedCtx, ctx);
                 assertEquals(ledgerId, ledgerId1);
                 assertEquals(expectedEntryId, entryId1);
@@ -127,4 +128,73 @@ public class BookieWriteToJournalTest {
         journalJoinLatch.countDown();
         b.shutdown();
     }
+
+    /**
+     * test that Bookie calls correctly Journal.forceLedger and is able to 
return the correct LastAddPersisted entry id.
+     */
+    @Test
+    public void testForceLedger() throws Exception {
+
+        File journalDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+        File ledgerDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(ledgerDir));
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(journalDir.getPath())
+                .setLedgerDirNames(new String[]{ledgerDir.getPath()});
+
+        Bookie b = new Bookie(conf);
+        b.start();
+
+        long ledgerId = 1;
+        long entryId = 0;
+        Object expectedCtx = "foo";
+        byte[] masterKey = new byte[64];
+
+        CompletableFuture<Long> latchForceLedger1 = new CompletableFuture<>();
+        CompletableFuture<Long> latchForceLedger2 = new CompletableFuture<>();
+        CompletableFuture<Long> latchAddEntry = new CompletableFuture<>();
+        final ByteBuf data = buildEntry(ledgerId, entryId, -1);
+        final long expectedEntryId = entryId;
+        b.forceLedger(ledgerId, (int rc, long ledgerId1, long entryId1,
+                BookieSocketAddress addr, Object ctx) -> {
+            if (rc != BKException.Code.OK) {
+                
latchForceLedger1.completeExceptionally(org.apache.bookkeeper.client.BKException.create(rc));
+                return;
+            }
+            complete(latchForceLedger1, null);
+        }, expectedCtx);
+        result(latchForceLedger1);
+
+        b.addEntry(data, true /* ackBeforesync */, (int rc, long ledgerId1, 
long entryId1,
+                        BookieSocketAddress addr, Object ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        
latchAddEntry.completeExceptionally(org.apache.bookkeeper.client.BKException.create(rc));
+                        return;
+                    }
+                    latchAddEntry.complete(entryId);
+                }, expectedCtx, masterKey);
+        assertEquals(expectedEntryId, result(latchAddEntry).longValue());
+
+        // issue a new "forceLedger"
+        b.forceLedger(ledgerId, (int rc, long ledgerId1, long entryId1,
+                BookieSocketAddress addr, Object ctx) -> {
+            if (rc != BKException.Code.OK) {
+                
latchForceLedger2.completeExceptionally(org.apache.bookkeeper.client.BKException.create(rc));
+                return;
+            }
+            complete(latchForceLedger2, null);
+        }, expectedCtx);
+        result(latchForceLedger2);
+
+        b.shutdown();
+    }
+
+    private static ByteBuf buildEntry(long ledgerId, long entryId, long 
lastAddConfirmed) {
+        final ByteBuf data = Unpooled.buffer();
+        data.writeLong(ledgerId);
+        data.writeLong(entryId);
+        data.writeLong(lastAddConfirmed);
+        return data;
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to