This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 3bd4119 BP14 - forceLedger bookie side implementation
3bd4119 is described below
commit 3bd411910b3651e5dcb9df44d1f77cd6f6cd0ca6
Author: Enrico Olivelli <[email protected]>
AuthorDate: Tue May 8 17:21:41 2018 +0800
BP14 - forceLedger bookie side implementation
This is the bookie side implementation of the force() API as defined in
BP-14, without the wire protocol handler.
We will introduce a new RPC which allows the client to ensure that all data
sent for a given ledger is persisted durably to the bookie; this is useful for
DEFERRED_SYNC writers. In this case we are simulating a regular sync'd write to
the journal chosen for the ledger.
Author: Enrico Olivelli <[email protected]>
Author: eolivelli <[email protected]>
Reviewers: Sijie Guo <[email protected]>, JV Jujjuri
<[email protected]>
This closes #1375 from eolivelli/bp14-simple-forceledger
---
.../bookkeeper/bookie/BookKeeperServerStats.java | 2 +
.../java/org/apache/bookkeeper/bookie/Bookie.java | 19 +++++
.../java/org/apache/bookkeeper/bookie/Journal.java | 36 ++++++----
.../bookkeeper/bookie/BookieJournalForceTest.java | 59 ++++++++++++++++
.../bookie/BookieWriteToJournalTest.java | 82 ++++++++++++++++++++--
5 files changed, 180 insertions(+), 18 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 8b1665e..d2ce94b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -70,6 +70,7 @@ public interface BookKeeperServerStats {
String BOOKIE_ADD_ENTRY = "BOOKIE_ADD_ENTRY";
String BOOKIE_RECOVERY_ADD_ENTRY = "BOOKIE_RECOVERY_ADD_ENTRY";
String BOOKIE_READ_ENTRY = "BOOKIE_READ_ENTRY";
+ String BOOKIE_FORCE_LEDGER = "BOOKIE_FORCE_LEDGER";
String BOOKIE_READ_LAST_CONFIRMED = "BOOKIE_READ_LAST_CONFIRMED";
String BOOKIE_ADD_ENTRY_BYTES = "BOOKIE_ADD_ENTRY_BYTES";
String BOOKIE_READ_ENTRY_BYTES = "BOOKIE_READ_ENTRY_BYTES";
@@ -80,6 +81,7 @@ public interface BookKeeperServerStats {
String JOURNAL_SCOPE = "journal";
String JOURNAL_ADD_ENTRY = "JOURNAL_ADD_ENTRY";
+ String JOURNAL_FORCE_LEDGER = "JOURNAL_FORCE_LEDGER";
String JOURNAL_SYNC = "JOURNAL_SYNC";
String JOURNAL_MEM_ADD_ENTRY = "JOURNAL_MEM_ADD_ENTRY";
String JOURNAL_PREALLOCATION = "JOURNAL_PREALLOCATION";
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index b391c53..76af6eb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.bookie;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY_BYTES;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_FORCE_LEDGER;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_READ_ENTRY;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_READ_ENTRY_BYTES;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_RECOVERY_ADD_ENTRY;
@@ -119,6 +120,7 @@ public class Bookie extends BookieCriticalThread {
static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
static final long METAENTRY_ID_FENCE_KEY = -0x2000;
+ static final long METAENTRY_ID_FORCE_LEDGER = -0x4000;
private final LedgerDirsManager ledgerDirsManager;
private LedgerDirsManager indexDirsManager;
@@ -139,6 +141,7 @@ public class Bookie extends BookieCriticalThread {
private final StatsLogger statsLogger;
private final Counter writeBytes;
private final Counter readBytes;
+ private final Counter forceLedgerOps;
// Bookie Operation Latency Stats
private final OpStatsLogger addEntryStats;
private final OpStatsLogger recoveryAddEntryStats;
@@ -734,6 +737,7 @@ public class Bookie extends BookieCriticalThread {
// Expose Stats
writeBytes = statsLogger.getCounter(WRITE_BYTES);
readBytes = statsLogger.getCounter(READ_BYTES);
+ forceLedgerOps = statsLogger.getCounter(BOOKIE_FORCE_LEDGER);
addEntryStats = statsLogger.getOpStatsLogger(BOOKIE_ADD_ENTRY);
recoveryAddEntryStats =
statsLogger.getOpStatsLogger(BOOKIE_RECOVERY_ADD_ENTRY);
readEntryStats = statsLogger.getOpStatsLogger(BOOKIE_READ_ENTRY);
@@ -1202,6 +1206,21 @@ public class Bookie extends BookieCriticalThread {
}
/**
+ * Force the journal entries of the given 'ledgerId' to be synced to disk.
+ * It works like a regular addEntry with ackBeforeSync=false.
+ * This is useful for ledgers with the DEFERRED_SYNC write flag.
+ */
+ public void forceLedger(long ledgerId, WriteCallback cb,
+ Object ctx) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Forcing ledger {}", ledgerId);
+ }
+ Journal journal = getJournal(ledgerId);
+ journal.forceLedger(ledgerId, cb, ctx);
+ forceLedgerOps.inc();
+ }
+
+ /**
* Add entry to a ledger.
* @throws BookieException.LedgerFencedException if the ledger is fenced
*/
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 94aafad..71477c0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -595,6 +595,7 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
// Expose Stats
private final OpStatsLogger journalAddEntryStats;
+ private final OpStatsLogger journalForceLedgerStats;
private final OpStatsLogger journalSyncStats;
private final OpStatsLogger journalCreationStats;
private final OpStatsLogger journalFlushStats;
@@ -656,6 +657,7 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
// Expose Stats
journalAddEntryStats =
statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_ADD_ENTRY);
+ journalForceLedgerStats =
statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_LEDGER);
journalSyncStats =
statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_SYNC);
journalCreationStats =
statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_CREATION_LATENCY);
journalFlushStats =
statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FLUSH_LATENCY);
@@ -869,6 +871,14 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
journalAddEntryStats, journalQueueSize));
}
+ void forceLedger(long ledgerId, WriteCallback cb, Object ctx) {
+ journalQueueSize.inc();
+ queue.add(QueueEntry.create(
+ null, false /* ackBeforeSync */, ledgerId,
+ Bookie.METAENTRY_ID_FORCE_LEDGER, cb, ctx,
MathUtils.nowInNano(),
+ journalForceLedgerStats, journalQueueSize));
+ }
+
/**
* Get the length of journal entries queue.
*
@@ -1072,22 +1082,23 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
if (qe == null) { // no more queue entry
continue;
}
+ if (qe.entryId != Bookie.METAENTRY_ID_FORCE_LEDGER) {
+ int entrySize = qe.entry.readableBytes();
+ journalWriteBytes.add(entrySize);
+ journalQueueSize.dec();
- int entrySize = qe.entry.readableBytes();
- journalWriteBytes.add(entrySize);
- journalQueueSize.dec();
+ batchSize += (4 + entrySize);
- batchSize += (4 + entrySize);
+ lenBuff.clear();
+ lenBuff.writeInt(entrySize);
- lenBuff.clear();
- lenBuff.writeInt(entrySize);
-
- // preAlloc based on size
- logFile.preAllocIfNeeded(4 + entrySize);
+ // preAlloc based on size
+ logFile.preAllocIfNeeded(4 + entrySize);
- bc.write(lenBuff);
- bc.write(qe.entry);
- qe.entry.release();
+ bc.write(lenBuff);
+ bc.write(qe.entry);
+ qe.entry.release();
+ }
toFlush.add(qe);
numEntriesToFlush++;
@@ -1159,4 +1170,5 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
public void joinThread() throws InterruptedException {
join();
}
+
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
index 6714d9c..3a8ad86 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
@@ -314,4 +314,63 @@ public class BookieJournalForceTest {
return supportQueue;
}
+ @Test
+ public void testForceLedger() throws Exception {
+ File journalDir = tempDir.newFolder();
+ Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+
+ ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ conf.setJournalDirName(journalDir.getPath());
+
+ JournalChannel jc = spy(new JournalChannel(journalDir, 1));
+ whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
+
+ LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
+ Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager);
+
+ // machinery to suspend ForceWriteThread
+ CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
+ LinkedBlockingQueue<ForceWriteRequest> supportQueue =
+
enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal);
+ journal.start();
+
+ LogMark lastLogMarkBeforeWrite =
journal.getLastLogMark().markLog().getCurMark();
+ CountDownLatch latch = new CountDownLatch(1);
+ long ledgerId = 1;
+ journal.forceLedger(ledgerId, new WriteCallback() {
+ @Override
+ public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object ctx) {
+ latch.countDown();
+ }
+ }, null);
+
+ // forceLedger should not complete while ForceWriteThread is suspended
+ // wait until an entry is written to the ForceWriteThread queue
+ while (supportQueue.isEmpty()) {
+ Thread.sleep(100);
+ }
+ assertEquals(1, latch.getCount());
+ assertEquals(1, supportQueue.size());
+
+ // in constructor of JournalChannel we are calling forceWrite(true)
but it is not tracked by PowerMock
+ // because the 'spy' is applied only on return from the constructor
+ verify(jc, times(0)).forceWrite(true);
+
+ // let ForceWriteThread work
+ forceWriteThreadSuspendedLatch.countDown();
+
+ // callback should complete now
+ assertTrue(latch.await(20, TimeUnit.SECONDS));
+
+ verify(jc, atLeast(1)).forceWrite(false);
+
+ assertEquals(0, supportQueue.size());
+
+ // verify that log marker advanced
+ LastLogMark lastLogMarkAfterForceWrite = journal.getLastLogMark();
+
assertTrue(lastLogMarkAfterForceWrite.getCurMark().compare(lastLogMarkBeforeWrite)
> 0);
+
+ journal.shutdown();
+ }
+
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
index b43df80..2d197d3 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
@@ -20,6 +20,8 @@
*/
package org.apache.bookkeeper.bookie;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.complete;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.mockito.ArgumentMatchers.any;
@@ -31,6 +33,7 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.File;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@@ -72,8 +75,8 @@ public class BookieWriteToJournalTest {
Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(ledgerDir));
ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
conf.setJournalDirName(journalDir.getPath())
- .setLedgerDirNames(new String[]{ledgerDir.getPath()})
- .setMetadataServiceUri(null);
+ .setLedgerDirNames(new String[]{ledgerDir.getPath()})
+ .setMetadataServiceUri(null);
BookieSocketAddress bookieAddress = Bookie.getBookieAddress(conf);
CountDownLatch journalJoinLatch = new CountDownLatch(1);
Journal journal = mock(Journal.class);
@@ -108,12 +111,10 @@ public class BookieWriteToJournalTest {
byte[] masterKey = new byte[64];
for (boolean ackBeforeSync : new boolean[]{true, false}) {
CountDownLatch latch = new CountDownLatch(1);
- final ByteBuf data = Unpooled.buffer();
- data.writeLong(ledgerId);
- data.writeLong(entryId);
+ final ByteBuf data = buildEntry(ledgerId, entryId, -1);
final long expectedEntryId = entryId;
b.addEntry(data, ackBeforeSync, (int rc, long ledgerId1, long
entryId1,
- BookieSocketAddress addr, Object
ctx) -> {
+ BookieSocketAddress addr, Object ctx) -> {
assertSame(expectedCtx, ctx);
assertEquals(ledgerId, ledgerId1);
assertEquals(expectedEntryId, entryId1);
@@ -127,4 +128,73 @@ public class BookieWriteToJournalTest {
journalJoinLatch.countDown();
b.shutdown();
}
+
+ /**
+ * Test that Bookie correctly calls Journal.forceLedger and is able to return the correct LastAddPersisted entry id.
+ */
+ @Test
+ public void testForceLedger() throws Exception {
+
+ File journalDir = tempDir.newFolder();
+ Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+ File ledgerDir = tempDir.newFolder();
+ Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(ledgerDir));
+ ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ conf.setJournalDirName(journalDir.getPath())
+ .setLedgerDirNames(new String[]{ledgerDir.getPath()});
+
+ Bookie b = new Bookie(conf);
+ b.start();
+
+ long ledgerId = 1;
+ long entryId = 0;
+ Object expectedCtx = "foo";
+ byte[] masterKey = new byte[64];
+
+ CompletableFuture<Long> latchForceLedger1 = new CompletableFuture<>();
+ CompletableFuture<Long> latchForceLedger2 = new CompletableFuture<>();
+ CompletableFuture<Long> latchAddEntry = new CompletableFuture<>();
+ final ByteBuf data = buildEntry(ledgerId, entryId, -1);
+ final long expectedEntryId = entryId;
+ b.forceLedger(ledgerId, (int rc, long ledgerId1, long entryId1,
+ BookieSocketAddress addr, Object ctx) -> {
+ if (rc != BKException.Code.OK) {
+
latchForceLedger1.completeExceptionally(org.apache.bookkeeper.client.BKException.create(rc));
+ return;
+ }
+ complete(latchForceLedger1, null);
+ }, expectedCtx);
+ result(latchForceLedger1);
+
+ b.addEntry(data, true /* ackBeforesync */, (int rc, long ledgerId1,
long entryId1,
+ BookieSocketAddress addr, Object ctx) -> {
+ if (rc != BKException.Code.OK) {
+
latchAddEntry.completeExceptionally(org.apache.bookkeeper.client.BKException.create(rc));
+ return;
+ }
+ latchAddEntry.complete(entryId);
+ }, expectedCtx, masterKey);
+ assertEquals(expectedEntryId, result(latchAddEntry).longValue());
+
+ // issue a new "forceLedger"
+ b.forceLedger(ledgerId, (int rc, long ledgerId1, long entryId1,
+ BookieSocketAddress addr, Object ctx) -> {
+ if (rc != BKException.Code.OK) {
+
latchForceLedger2.completeExceptionally(org.apache.bookkeeper.client.BKException.create(rc));
+ return;
+ }
+ complete(latchForceLedger2, null);
+ }, expectedCtx);
+ result(latchForceLedger2);
+
+ b.shutdown();
+ }
+
+ private static ByteBuf buildEntry(long ledgerId, long entryId, long
lastAddConfirmed) {
+ final ByteBuf data = Unpooled.buffer();
+ data.writeLong(ledgerId);
+ data.writeLong(entryId);
+ data.writeLong(lastAddConfirmed);
+ return data;
+ }
}
--
To stop receiving notification emails like this one, please contact
[email protected].