This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new a226dbe BP-14 Implementation of WriteFlag.DEFERRED_SYNC on Journal
a226dbe is described below
commit a226dbedff03102291648df3a60e78a2c85ae65f
Author: Enrico Olivelli <[email protected]>
AuthorDate: Thu Feb 1 11:21:20 2018 -0800
BP-14 Implementation of WriteFlag.DEFERRED_SYNC on Journal
Implement WriteFlag.DEFERRED_SYNC on the Bookie side.
For a DEFERRED_SYNC write, the Journal acknowledges the write early:
just after flushing the buffer to the OS cache, without waiting for an fsync.
Author: Enrico Olivelli <[email protected]>
Reviewers: Sijie Guo <[email protected]>
This closes #889 from eolivelli/bp14-writeflags-journal
---
.../java/org/apache/bookkeeper/bookie/Bookie.java | 17 +-
.../java/org/apache/bookkeeper/bookie/Journal.java | 77 +++--
.../bookkeeper/bookie/LedgerDescriptorImpl.java | 2 +-
.../bookkeeper/proto/WriteEntryProcessor.java | 2 +-
.../bookkeeper/proto/WriteEntryProcessorV3.java | 11 +-
.../bookkeeper/bookie/BookieJournalForceTest.java | 317 +++++++++++++++++++++
.../bookie/BookieWriteToJournalTest.java | 130 +++++++++
.../apache/bookkeeper/bookie/LedgerCacheTest.java | 9 +-
.../bookkeeper/client/BookKeeperCloseTest.java | 4 +-
.../apache/bookkeeper/client/LedgerCloseTest.java | 4 +-
.../bookkeeper/client/LedgerRecoveryTest.java | 2 +-
.../client/ParallelLedgerRecoveryTest.java | 6 +-
.../replication/AuditorPeriodicCheckTest.java | 2 +-
.../bookkeeper/test/ConcurrentLedgerTest.java | 2 +-
14 files changed, 542 insertions(+), 43 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index da0525a..e091199 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -939,7 +939,7 @@ public class Bookie extends BookieCriticalThread {
// wait until journal quits
for (Journal journal: journals) {
- journal.join();
+ journal.joinThread();
}
LOG.info("Journal thread(s) quit.");
} catch (InterruptedException ie) {
@@ -1070,7 +1070,7 @@ public class Bookie extends BookieCriticalThread {
bb.put(masterKey);
bb.flip();
- getJournal(ledgerId).logAddEntry(bb, new NopWriteCallback(),
null);
+ getJournal(ledgerId).logAddEntry(bb, false /* ackBeforeSync
*/, new NopWriteCallback(), null);
}
}
@@ -1084,7 +1084,8 @@ public class Bookie extends BookieCriticalThread {
/**
* Add an entry to a ledger as specified by handle.
*/
- private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry,
WriteCallback cb, Object ctx)
+ private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry,
+ boolean ackBeforeSync, WriteCallback cb,
Object ctx)
throws IOException, BookieException {
long ledgerId = handle.getLedgerId();
long entryId = handle.addEntry(entry);
@@ -1094,7 +1095,7 @@ public class Bookie extends BookieCriticalThread {
if (LOG.isTraceEnabled()) {
LOG.trace("Adding {}@{}", entryId, ledgerId);
}
- getJournal(ledgerId).logAddEntry(entry, cb, ctx);
+ getJournal(ledgerId).logAddEntry(entry, ackBeforeSync, cb, ctx);
}
/**
@@ -1112,7 +1113,7 @@ public class Bookie extends BookieCriticalThread {
LedgerDescriptor handle = getLedgerForEntry(entry, masterKey);
synchronized (handle) {
entrySize = entry.readableBytes();
- addEntryInternal(handle, entry, cb, ctx);
+ addEntryInternal(handle, entry, false /* ackBeforeSync */, cb,
ctx);
}
success = true;
} catch (NoWritableLedgerDirException e) {
@@ -1159,7 +1160,7 @@ public class Bookie extends BookieCriticalThread {
* Add entry to a ledger.
* @throws BookieException.LedgerFencedException if the ledger is fenced
*/
- public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[]
masterKey)
+ public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback
cb, Object ctx, byte[] masterKey)
throws IOException, BookieException.LedgerFencedException,
BookieException {
long requestNanos = MathUtils.nowInNano();
boolean success = false;
@@ -1172,7 +1173,7 @@ public class Bookie extends BookieCriticalThread {
.create(BookieException.Code.LedgerFencedException);
}
entrySize = entry.readableBytes();
- addEntryInternal(handle, entry, cb, ctx);
+ addEntryInternal(handle, entry, ackBeforeSync, cb, ctx);
}
success = true;
} catch (NoWritableLedgerDirException e) {
@@ -1403,7 +1404,7 @@ public class Bookie extends BookieCriticalThread {
buff.writeLong(1);
buff.writeLong(i);
cb.incCount();
- b.addEntry(buff, cb, null, new byte[0]);
+ b.addEntry(buff, false /* ackBeforeSync */, cb, null, new byte[0]);
}
cb.waitZero();
long end = MathUtils.now();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 29373d2..a2acf76 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -21,6 +21,7 @@
package org.apache.bookkeeper.bookie;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import io.netty.buffer.ByteBuf;
@@ -283,17 +284,18 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
WriteCallback cb;
Object ctx;
long enqueueTime;
+ boolean ackBeforeSync;
OpStatsLogger journalAddEntryStats;
Counter journalCbQueueSize;
- static QueueEntry create(
- ByteBuf entry, long ledgerId, long entryId, WriteCallback cb,
Object ctx,
- long enqueueTime,
- OpStatsLogger journalAddEntryStats,
+
+ static QueueEntry create(ByteBuf entry, boolean ackBeforeSync, long
ledgerId, long entryId,
+ WriteCallback cb, Object ctx, long enqueueTime, OpStatsLogger
journalAddEntryStats,
Counter journalCbQueueSize) {
QueueEntry qe = RECYCLER.get();
qe.entry = entry;
+ qe.ackBeforeSync = ackBeforeSync;
qe.cb = cb;
qe.ctx = ctx;
qe.ledgerId = ledgerId;
@@ -332,7 +334,11 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
}
}
- private class ForceWriteRequest {
+ /**
+ * Token which represents the need to force a write to the Journal.
+ */
+ @VisibleForTesting
+ public class ForceWriteRequest {
private JournalChannel logFile;
private RecyclableArrayList<QueueEntry> forceWriteWaiters;
private boolean shouldClose;
@@ -352,11 +358,16 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
this.logFile.forceWrite(false);
journalSyncStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime),
TimeUnit.NANOSECONDS);
}
+ LOG.info("setCurLogMark at {}", lastLogMark.getCurMark());
+ LOG.info("setCurLogMark at {}, {}", this.logId,
this.lastFlushedPosition);
lastLogMark.setCurLogMark(this.logId,
this.lastFlushedPosition);
// Notify the waiters that the force write succeeded
for (int i = 0; i < forceWriteWaiters.size(); i++) {
- cbThreadPool.execute(forceWriteWaiters.get(i));
+ QueueEntry qe = forceWriteWaiters.get(i);
+ if (qe != null) {
+ cbThreadPool.execute(qe);
+ }
journalCbQueueSize.inc();
}
@@ -819,22 +830,28 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
}
}
- public void logAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx) {
- logAddEntry(Unpooled.wrappedBuffer(entry), cb, ctx);
+ public void logAddEntry(ByteBuffer entry, boolean ackBeforeSync,
WriteCallback cb, Object ctx) {
+ logAddEntry(Unpooled.wrappedBuffer(entry), ackBeforeSync, cb, ctx);
}
/**
* record an add entry operation in journal.
*/
- public void logAddEntry(ByteBuf entry, WriteCallback cb, Object ctx) {
+ public void logAddEntry(ByteBuf entry, boolean ackBeforeSync,
WriteCallback cb, Object ctx) {
long ledgerId = entry.getLong(entry.readerIndex() + 0);
long entryId = entry.getLong(entry.readerIndex() + 8);
journalQueueSize.inc();
+ logAddEntry(ledgerId, entryId, entry, ackBeforeSync, cb, ctx);
+ }
+ @VisibleForTesting
+ void logAddEntry(long ledgerId, long entryId, ByteBuf entry,
+ boolean ackBeforeSync, WriteCallback cb, Object ctx) {
//Retain entry until it gets written to journal
entry.retain();
+
queue.add(QueueEntry.create(
- entry, ledgerId, entryId, cb, ctx, MathUtils.nowInNano(),
+ entry, ackBeforeSync, ledgerId, entryId, cb, ctx,
MathUtils.nowInNano(),
journalAddEntryStats, journalQueueSize));
}
@@ -867,6 +884,7 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
LOG.info("Starting journal on {}", journalDirectory);
RecyclableArrayList<QueueEntry> toFlush =
entryListRecycler.newInstance();
+ int numEntriesToFlush = 0;
ByteBuf lenBuff = Unpooled.buffer(4);
ByteBuf paddingBuff = Unpooled.buffer(2 *
conf.getJournalAlignmentSize());
paddingBuff.writeZero(paddingBuff.capacity());
@@ -912,7 +930,7 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
TimeUnit.NANOSECONDS);
}
- if (toFlush.isEmpty()) {
+ if (numEntriesToFlush == 0) {
qe = queue.take();
dequeueStartTime = MathUtils.nowInNano();
journalQueueStats.registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime),
@@ -970,6 +988,16 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
}
journalFlushWatcher.reset().start();
bc.flush(false);
+
+ for (int i = 0; i < toFlush.size(); i++) {
+ QueueEntry entry = toFlush.get(i);
+ if (entry != null && (!syncData ||
entry.ackBeforeSync)) {
+ toFlush.set(i, null);
+ numEntriesToFlush--;
+ cbThreadPool.execute(entry);
+ }
+ }
+
lastFlushPosition = bc.position();
journalFlushStats.registerSuccessfulEvent(
journalFlushWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
@@ -977,12 +1005,14 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
// Trace the lifetime of entries through
persistence
if (LOG.isDebugEnabled()) {
for (QueueEntry e : toFlush) {
- LOG.debug("Written and queuing for flush
Ledger: {} Entry: {}",
- e.ledgerId, e.entryId);
+ if (e != null) {
+ LOG.debug("Written and queuing for
flush Ledger: {} Entry: {}",
+ e.ledgerId, e.entryId);
+ }
}
}
-
forceWriteBatchEntriesStats.registerSuccessfulValue(toFlush.size());
+
forceWriteBatchEntriesStats.registerSuccessfulValue(numEntriesToFlush);
forceWriteBatchBytesStats.registerSuccessfulValue(batchSize);
boolean shouldRolloverJournal = (lastFlushPosition
> maxJournalSize);
@@ -992,14 +1022,12 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
forceWriteRequests.put(createForceWriteRequest(logFile, logId,
lastFlushPosition,
toFlush, shouldRolloverJournal, false));
toFlush = entryListRecycler.newInstance();
+ numEntriesToFlush = 0;
} else {
// Data is already written on the file (though
it might still be in the OS page-cache)
lastLogMark.setCurLogMark(logId,
lastFlushPosition);
- for (int i = 0; i < toFlush.size(); i++) {
- cbThreadPool.execute(toFlush.get(i));
- }
-
toFlush.clear();
+ numEntriesToFlush = 0;
if (shouldRolloverJournal) {
forceWriteRequests.put(
createForceWriteRequest(
@@ -1044,6 +1072,7 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
qe.entry.release();
toFlush.add(qe);
+ numEntriesToFlush++;
qe = null;
}
logFile.close();
@@ -1099,4 +1128,16 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
}
return total;
}
+
+ //
+ /**
+ * Wait for the Journal thread to exit.
+ * This method is needed in order to mock the journal, since we can't mock
final methods of the java.lang.Thread class
+ *
+ * @throws InterruptedException
+ */
+ @VisibleForTesting
+ public void joinThread() throws InterruptedException {
+ join();
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index 0649bc5..84730cb 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -122,7 +122,7 @@ public class LedgerDescriptorImpl extends LedgerDescriptor {
result = logFenceResult = SettableFuture.create();
}
ByteBuf entry = createLedgerFenceEntry(ledgerId);
- journal.logAddEntry(entry, (rc, ledgerId, entryId, addr, ctx) -> {
+ journal.logAddEntry(entry, false /* ackBeforeSync */, (rc, ledgerId,
entryId, addr, ctx) -> {
if (LOG.isDebugEnabled()) {
LOG.debug("Record fenced state for ledger {} in journal with
rc {}",
ledgerId, BKException.codeLogger(rc));
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index db2cd5e..34e2c2c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -77,7 +77,7 @@ class WriteEntryProcessor extends PacketProcessorBase
implements WriteCallback {
if (add.isRecoveryAdd()) {
requestProcessor.bookie.recoveryAddEntry(addData, this,
channel, add.getMasterKey());
} else {
- requestProcessor.bookie.addEntry(addData, this, channel,
add.getMasterKey());
+ requestProcessor.bookie.addEntry(addData, false, this,
channel, add.getMasterKey());
}
} catch (IOException e) {
LOG.error("Error writing " + add, e);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index 2cb1238..f227d88 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -25,9 +25,11 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import java.io.IOException;
+import java.util.EnumSet;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
@@ -101,6 +103,13 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
sendResponse(status, resp, requestProcessor.addRequestStats);
}
};
+ final EnumSet<WriteFlag> writeFlags;
+ if (addRequest.hasWriteFlags()) {
+ writeFlags = WriteFlag.getWriteFlags(addRequest.getWriteFlags());
+ } else {
+ writeFlags = EnumSet.noneOf(WriteFlag.class);
+ }
+ final boolean ackBeforeSync =
writeFlags.contains(WriteFlag.DEFERRED_SYNC);
StatusCode status = null;
byte[] masterKey = addRequest.getMasterKey().toByteArray();
ByteBuf entryToAdd =
Unpooled.wrappedBuffer(addRequest.getBody().asReadOnlyByteBuffer());
@@ -108,7 +117,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
if (addRequest.hasFlag() &&
addRequest.getFlag().equals(AddRequest.Flag.RECOVERY_ADD)) {
requestProcessor.bookie.recoveryAddEntry(entryToAdd, wcb,
channel, masterKey);
} else {
- requestProcessor.bookie.addEntry(entryToAdd, wcb, channel,
masterKey);
+ requestProcessor.bookie.addEntry(entryToAdd, ackBeforeSync,
wcb, channel, masterKey);
}
status = StatusCode.EOK;
} catch (IOException e) {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
new file mode 100644
index 0000000..a55bef7
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
@@ -0,0 +1,317 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.File;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.Journal.ForceWriteRequest;
+import org.apache.bookkeeper.bookie.Journal.LastLogMark;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+/**
+ * Test the bookie journal.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({JournalChannel.class, Journal.class})
+@Slf4j
+public class BookieJournalForceTest {
+
+ private static final ByteBuf DATA = Unpooled.wrappedBuffer(new byte[]{});
+
+ @Rule
+ public TemporaryFolder tempDir = new TemporaryFolder();
+
+ @Test
+ public void testAckAfterSync() throws Exception {
+ File journalDir = tempDir.newFolder();
+ Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+
+ ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ conf.setJournalDirName(journalDir.getPath())
+ .setZkServers(null);
+
+ JournalChannel jc = spy(new JournalChannel(journalDir, 1));
+ whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
+
+ LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
+ Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+
+ // machinery to suspend ForceWriteThread
+ CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
+ LinkedBlockingQueue<ForceWriteRequest> supportQueue =
+
enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal);
+
+ journal.start();
+
+ LogMark lastLogMarkBeforeWrite =
journal.getLastLogMark().markLog().getCurMark();
+ CountDownLatch latch = new CountDownLatch(1);
+ long ledgerId = 1;
+ long entryId = 0;
+ journal.logAddEntry(ledgerId, entryId, DATA, false /* ackBeforeSync
*/, new WriteCallback() {
+ @Override
+ public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object ctx) {
+ latch.countDown();
+ }
+ }, null);
+
+ // logAddEntry should not complete while ForceWriteThread is
suspended
+ // wait that an entry is written to the ForceWriteThread queue
+ while (supportQueue.isEmpty()) {
+ Thread.sleep(100);
+ }
+ assertEquals(1, latch.getCount());
+ assertEquals(1, supportQueue.size());
+
+ // in constructor of JournalChannel we are calling forceWrite(true)
but it is not tracked by PowerMock
+ // because the 'spy' is applied only on return from the constructor
+ verify(jc, times(0)).forceWrite(true);
+
+ // let ForceWriteThread work
+ forceWriteThreadSuspendedLatch.countDown();
+
+ // callback should complete now
+ assertTrue(latch.await(20, TimeUnit.SECONDS));
+
+ verify(jc, atLeast(1)).forceWrite(false);
+
+ assertEquals(0, supportQueue.size());
+
+ // verify that log marker advanced
+ LastLogMark lastLogMarkAfterForceWrite = journal.getLastLogMark();
+
assertTrue(lastLogMarkAfterForceWrite.getCurMark().compare(lastLogMarkBeforeWrite)
> 0);
+
+ journal.shutdown();
+ }
+
+ @Test
+ public void testAckBeforeSync() throws Exception {
+ File journalDir = tempDir.newFolder();
+ Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+
+ ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ conf.setJournalDirName(journalDir.getPath())
+ .setZkServers(null);
+
+ JournalChannel jc = spy(new JournalChannel(journalDir, 1));
+ whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
+
+ LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
+ Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+
+ // machinery to suspend ForceWriteThread
+ CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
+ enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch,
journal);
+ journal.start();
+
+ LogMark lastLogMarkBeforeWrite =
journal.getLastLogMark().markLog().getCurMark();
+ CountDownLatch latch = new CountDownLatch(1);
+ long ledgerId = 1;
+ long entryId = 0;
+ journal.logAddEntry(ledgerId, entryId, DATA, true /* ackBeforeSync */,
new WriteCallback() {
+ @Override
+ public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object ctx) {
+ latch.countDown();
+ }
+ }, null);
+ // logAddEntry should complete even if ForceWriteThread is suspended
+ latch.await(20, TimeUnit.SECONDS);
+
+ // in constructor of JournalChannel we are calling forceWrite(true)
but it is not tracked by PowerMock
+ // because the 'spy' is applied only on return from the constructor
+ verify(jc, times(0)).forceWrite(true);
+
+ // we are never calling forceWrite
+ verify(jc, times(0)).forceWrite(false);
+
+ // verify that log marker did not advance
+ LastLogMark lastLogMarkAfterForceWrite = journal.getLastLogMark();
+ assertEquals(0,
lastLogMarkAfterForceWrite.getCurMark().compare(lastLogMarkBeforeWrite));
+
+ // let the forceWriteThread exit
+ forceWriteThreadSuspendedLatch.countDown();
+
+ journal.shutdown();
+ }
+
+ @Test
+ public void testAckBeforeSyncWithJournalBufferedEntriesThreshold() throws
Exception {
+ File journalDir = tempDir.newFolder();
+ Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+
+ final int journalBufferedEntriesThreshold = 10;
+ // sending a burst of entries, more than
journalBufferedEntriesThreshold
+ final int numEntries = journalBufferedEntriesThreshold + 50;
+
+ ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ conf.setJournalDirName(journalDir.getPath())
+
.setJournalBufferedEntriesThreshold(journalBufferedEntriesThreshold)
+ .setZkServers(null);
+
+ JournalChannel jc = spy(new JournalChannel(journalDir, 1));
+ whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
+
+ LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
+ Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+
+ // machinery to suspend ForceWriteThread
+ CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
+ enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch,
journal);
+
+ TestStatsProvider testStatsProvider = new TestStatsProvider();
+ Counter flushMaxOutstandingBytesCounter =
testStatsProvider.getStatsLogger("test")
+
.getCounter("flushMaxOutstandingBytesCounter");
+ Whitebox.setInternalState(journal, "flushMaxOutstandingBytesCounter",
flushMaxOutstandingBytesCounter);
+
+ journal.start();
+
+ LogMark lastLogMarkBeforeWrite =
journal.getLastLogMark().markLog().getCurMark();
+ CountDownLatch latch = new CountDownLatch(numEntries);
+ long ledgerId = 1;
+ for (long entryId = 0; entryId < numEntries; entryId++) {
+ journal.logAddEntry(ledgerId, entryId, DATA, true /* ackBeforeSync
*/, new WriteCallback() {
+ @Override
+ public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object ctx) {
+ latch.countDown();
+ }
+ }, null);
+ }
+
+ // logAddEntry should complete even if ForceWriteThread is suspended
+ latch.await(20, TimeUnit.SECONDS);
+
+ // in constructor of JournalChannel we are calling forceWrite(true)
but it is not tracked by PowerMock
+ // because the 'spy' is applied only on return from the constructor
+ verify(jc, times(0)).forceWrite(true);
+
+ // anyway we are never calling forceWrite
+ verify(jc, times(0)).forceWrite(false);
+
+ // verify that log marker did not advance
+ LastLogMark lastLogMarkAfterForceWrite = journal.getLastLogMark();
+ assertEquals(0,
lastLogMarkAfterForceWrite.getCurMark().compare(lastLogMarkBeforeWrite));
+
+ // let the forceWriteThread exit
+ forceWriteThreadSuspendedLatch.countDown();
+
+ assertTrue(flushMaxOutstandingBytesCounter.get() > 1);
+ journal.shutdown();
+ }
+
+ @Test
+ public void testInterleavedRequests() throws Exception {
+ File journalDir = tempDir.newFolder();
+ Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+
+ ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ conf.setJournalDirName(journalDir.getPath())
+ .setZkServers(null);
+
+ JournalChannel jc = spy(new JournalChannel(journalDir, 1));
+ whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
+
+ LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
+ Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+ journal.start();
+
+ final int numEntries = 100;
+ CountDownLatch latchAckBeforeSynch = new CountDownLatch(numEntries);
+ CountDownLatch latchAckAfterSynch = new CountDownLatch(numEntries);
+
+ long ledgerIdAckBeforeSync = 1;
+ long ledgerIdAckAfterSync = 2;
+ for (long entryId = 0; entryId < numEntries; entryId++) {
+ journal.logAddEntry(ledgerIdAckBeforeSync, entryId, DATA, true,
new WriteCallback() {
+ @Override
+ public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object ctx) {
+ latchAckBeforeSynch.countDown();
+ }
+ }, null);
+ journal.logAddEntry(ledgerIdAckAfterSync, entryId, DATA, false,
new WriteCallback() {
+ @Override
+ public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object ctx) {
+ latchAckAfterSynch.countDown();
+ }
+ }, null);
+ }
+ assertTrue(latchAckBeforeSynch.await(20, TimeUnit.SECONDS));
+ assertTrue(latchAckAfterSynch.await(20, TimeUnit.SECONDS));
+
+ // in constructor of JournalChannel we are calling forceWrite(true)
but it is not tracked by PowerMock
+ // because the 'spy' is applied only on return from the constructor
+ verify(jc, times(0)).forceWrite(true);
+
+ verify(jc, atLeast(1)).forceWrite(false);
+
+ journal.shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ private LinkedBlockingQueue<ForceWriteRequest>
enableForceWriteThreadSuspension(
+ CountDownLatch forceWriteThreadSuspendedLatch,
+ Journal journal) throws InterruptedException {
+ LinkedBlockingQueue<ForceWriteRequest> supportQueue = new
LinkedBlockingQueue<>();
+ BlockingQueue<ForceWriteRequest> forceWriteRequests =
mock(BlockingQueue.class);
+ doAnswer((Answer) (InvocationOnMock iom) -> {
+ supportQueue.put(iom.getArgument(0));
+ return null;
+ }).when(forceWriteRequests).put(any(ForceWriteRequest.class));
+ when(forceWriteRequests.take()).thenAnswer(i -> {
+ // suspend the force write thread
+ forceWriteThreadSuspendedLatch.await();
+ return supportQueue.take();
+ });
+ Whitebox.setInternalState(journal, "forceWriteRequests",
forceWriteRequests);
+ return supportQueue;
+ }
+
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
new file mode 100644
index 0000000..74e8447
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.BKException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test the bookie journal.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Bookie.class})
+@Slf4j
+public class BookieWriteToJournalTest {
+
+ @Rule
+ public TemporaryFolder tempDir = new TemporaryFolder();
+
+ /**
+ * test that Bookie calls correctly Journal.logAddEntry about
"ackBeforeSync" parameter.
+ */
+ @Test
+ public void testJournalLogAddEntryCalledCorrectly() throws Exception {
+
+ File journalDir = tempDir.newFolder();
+ Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+ File ledgerDir = tempDir.newFolder();
+ Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(ledgerDir));
+ ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ conf.setJournalDirName(journalDir.getPath())
+ .setLedgerDirNames(new String[]{ledgerDir.getPath()})
+ .setZkServers(null);
+ BookieSocketAddress bookieAddress = Bookie.getBookieAddress(conf);
+ CountDownLatch journalJoinLatch = new CountDownLatch(1);
+ Journal journal = mock(Journal.class);
+ MutableBoolean effectiveAckBeforeSync = new MutableBoolean(false);
+ doAnswer((Answer) (InvocationOnMock iom) -> {
+ ByteBuf entry = iom.getArgument(0);
+ long ledgerId = entry.getLong(entry.readerIndex() + 0);
+ long entryId = entry.getLong(entry.readerIndex() + 8);
+ boolean ackBeforeSync = iom.getArgument(1);
+ WriteCallback callback = iom.getArgument(2);
+ Object ctx = iom.getArgument(3);
+
+ effectiveAckBeforeSync.setValue(ackBeforeSync);
+ callback.writeComplete(BKException.Code.OK, ledgerId, entryId,
bookieAddress, ctx);
+ return null;
+ }).when(journal).logAddEntry(any(ByteBuf.class), anyBoolean(),
any(WriteCallback.class), any());
+
+ // bookie will continue to work as long as the journal thread is alive
+ doAnswer((Answer) (InvocationOnMock iom) -> {
+ journalJoinLatch.await();
+ return null;
+ }).when(journal).joinThread();
+
+ whenNew(Journal.class).withAnyArguments().thenReturn(journal);
+
+ Bookie b = new Bookie(conf);
+ b.start();
+
+ long ledgerId = 1;
+ long entryId = 0;
+ Object expectedCtx = "foo";
+ byte[] masterKey = new byte[64];
+ for (boolean ackBeforeSync : new boolean[]{true, false}) {
+ CountDownLatch latch = new CountDownLatch(1);
+ final ByteBuf data = Unpooled.buffer();
+ data.writeLong(ledgerId);
+ data.writeLong(entryId);
+ final long expectedEntryId = entryId;
+ b.addEntry(data, ackBeforeSync, (int rc, long ledgerId1, long
entryId1,
+ BookieSocketAddress addr, Object
ctx) -> {
+ assertSame(expectedCtx, ctx);
+ assertEquals(ledgerId, ledgerId1);
+ assertEquals(expectedEntryId, entryId1);
+ latch.countDown();
+ }, expectedCtx, masterKey);
+ latch.await(30, TimeUnit.SECONDS);
+ assertEquals(ackBeforeSync, effectiveAckBeforeSync.booleanValue());
+ entryId++;
+ }
+ // let bookie exit main thread
+ journalJoinLatch.countDown();
+ b.shutdown();
+ }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 6ae9410..b628244 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -332,7 +332,7 @@ public class LedgerCacheTest {
b.start();
for (int i = 1; i <= numLedgers; i++) {
ByteBuf packet = generateEntry(i, 1);
- b.addEntry(packet, new Bookie.NopWriteCallback(), null,
"passwd".getBytes());
+ b.addEntry(packet, false, new Bookie.NopWriteCallback(), null,
"passwd".getBytes());
}
conf = TestBKConfiguration.newServerConfiguration();
@@ -539,7 +539,8 @@ public class LedgerCacheTest {
// this bookie.addEntry call is required. FileInfo for Ledger 1 would
be created with this call.
// without the fileinfo, 'flushTestSortedLedgerStorage.addEntry' calls
will fail
// because of BOOKKEEPER-965 change.
- bookie.addEntry(generateEntry(1, 1), new Bookie.NopWriteCallback(),
null, "passwd".getBytes());
+ bookie.addEntry(generateEntry(1, 1), false, new
Bookie.NopWriteCallback(), null, "passwd".getBytes());
+
flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
assertFalse("Bookie is expected to be in ReadWrite mode",
bookie.isReadOnly());
assertTrue("EntryMemTable SnapShot is expected to be empty",
memTable.snapshot.isEmpty());
@@ -583,7 +584,7 @@ public class LedgerCacheTest {
FlushTestSortedLedgerStorage flushTestSortedLedgerStorage =
(FlushTestSortedLedgerStorage) bookie.ledgerStorage;
EntryMemTable memTable = flushTestSortedLedgerStorage.memTable;
- bookie.addEntry(generateEntry(1, 1), new Bookie.NopWriteCallback(),
null, "passwd".getBytes());
+ bookie.addEntry(generateEntry(1, 1), false, new
Bookie.NopWriteCallback(), null, "passwd".getBytes());
flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
assertFalse("Bookie is expected to be in ReadWrite mode",
bookie.isReadOnly());
assertTrue("EntryMemTable SnapShot is expected to be empty",
memTable.snapshot.isEmpty());
@@ -598,7 +599,7 @@ public class LedgerCacheTest {
// after flush failure, the bookie is set to readOnly
assertTrue("Bookie is expected to be in Read mode",
bookie.isReadOnly());
// write fail
- bookie.addEntry(generateEntry(1, 3), new
BookkeeperInternalCallbacks.WriteCallback(){
+ bookie.addEntry(generateEntry(1, 3), false, new
BookkeeperInternalCallbacks.WriteCallback(){
public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object ctx){
LOG.info("fail write to bk");
assertTrue(rc != OK);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
index 69f8f6c..850fe5d 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
@@ -89,7 +89,7 @@ public class BookKeeperCloseTest extends
BookKeeperClusterTestCase {
}
@Override
- public void addEntry(ByteBuf entry, WriteCallback cb,
+ public void addEntry(ByteBuf entry, boolean ackBeforeSync,
WriteCallback cb,
Object ctx, byte[] masterKey)
throws IOException, BookieException {
try {
@@ -99,7 +99,7 @@ public class BookKeeperCloseTest extends
BookKeeperClusterTestCase {
// and an exception would spam the logs
Thread.currentThread().interrupt();
}
- super.addEntry(entry, cb, ctx, masterKey);
+ super.addEntry(entry, ackBeforeSync, cb, ctx, masterKey);
}
@Override
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
index ad9c352..900a7c2 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
@@ -197,7 +197,7 @@ public class LedgerCloseTest extends
BookKeeperClusterTestCase {
throws Exception {
Bookie sBookie = new Bookie(conf) {
@Override
- public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx,
byte[] masterKey)
+ public void addEntry(ByteBuf entry, boolean ackBeforeSync,
WriteCallback cb, Object ctx, byte[] masterKey)
throws IOException, BookieException {
try {
latch.await();
@@ -221,7 +221,7 @@ public class LedgerCloseTest extends
BookKeeperClusterTestCase {
private void startDeadBookie(ServerConfiguration conf, final
CountDownLatch latch) throws Exception {
Bookie dBookie = new Bookie(conf) {
@Override
- public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx,
byte[] masterKey)
+ public void addEntry(ByteBuf entry, boolean ackBeforeSync,
WriteCallback cb, Object ctx, byte[] masterKey)
throws IOException, BookieException {
try {
latch.await();
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
index cf205c1..1091475 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
@@ -193,7 +193,7 @@ public class LedgerRecoveryTest extends
BookKeeperClusterTestCase {
Bookie fakeBookie = new Bookie(conf) {
@Override
- public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx,
byte[] masterKey)
+ public void addEntry(ByteBuf entry, boolean ackBeforeSync,
WriteCallback cb, Object ctx, byte[] masterKey)
throws IOException, BookieException {
// drop request to simulate a slow and failed bookie
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index 085a248..8e87921 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -455,9 +455,9 @@ public class ParallelLedgerRecoveryTest extends
BookKeeperClusterTestCase {
}
@Override
- public void addEntry(ByteBuf entry, final WriteCallback cb, Object
ctx, byte[] masterKey)
- throws IOException, BookieException {
- super.addEntry(entry, new WriteCallback() {
+ public void addEntry(ByteBuf entry, boolean ackBeforeSync, final
WriteCallback cb,
+ Object ctx, byte[] masterKey) throws IOException,
BookieException {
+ super.addEntry(entry, ackBeforeSync, new WriteCallback() {
@Override
public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object
ctx) {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index a3b0b0d..35eb958 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -376,7 +376,7 @@ public class AuditorPeriodicCheckTest extends
BookKeeperClusterTestCase {
ServerConfiguration conf = killBookie(bookieIdx);
Bookie writeFailingBookie = new Bookie(conf) {
@Override
- public void addEntry(ByteBuf entry, WriteCallback cb,
+ public void addEntry(ByteBuf entry, boolean ackBeforeSync,
WriteCallback cb,
Object ctx, byte[] masterKey)
throws IOException, BookieException {
try {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
index 6e55139..b11d336 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
@@ -187,7 +187,7 @@ public class ConcurrentLedgerTest {
bytes.position(0);
bytes.limit(bytes.capacity());
throttle.acquire();
- bookie.addEntry(Unpooled.wrappedBuffer(bytes), cb, counter,
zeros);
+ bookie.addEntry(Unpooled.wrappedBuffer(bytes), false, cb,
counter, zeros);
}
}
long finish = System.currentTimeMillis();
--
To stop receiving notification emails like this one, please contact
[email protected].