This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new a226dbe  BP-14 Implementation of WriteFlag.DEFERRED_SYNC on Journal
a226dbe is described below

commit a226dbedff03102291648df3a60e78a2c85ae65f
Author: Enrico Olivelli <[email protected]>
AuthorDate: Thu Feb 1 11:21:20 2018 -0800

    BP-14 Implementation of WriteFlag.DEFERRED_SYNC on Journal
    
    Implement WriteFlags.DERERRED_SYNC on Bookie side.
    In case of DEFERRED_SYNC write Journal will early acknowledge the write, 
just after flushing the buffer to the OS cache, but before waiting for an fsync.
    
    Author: Enrico Olivelli <[email protected]>
    
    Reviewers: Sijie Guo <[email protected]>
    
    This closes #889 from eolivelli/bp14-writeflags-journal
---
 .../java/org/apache/bookkeeper/bookie/Bookie.java  |  17 +-
 .../java/org/apache/bookkeeper/bookie/Journal.java |  77 +++--
 .../bookkeeper/bookie/LedgerDescriptorImpl.java    |   2 +-
 .../bookkeeper/proto/WriteEntryProcessor.java      |   2 +-
 .../bookkeeper/proto/WriteEntryProcessorV3.java    |  11 +-
 .../bookkeeper/bookie/BookieJournalForceTest.java  | 317 +++++++++++++++++++++
 .../bookie/BookieWriteToJournalTest.java           | 130 +++++++++
 .../apache/bookkeeper/bookie/LedgerCacheTest.java  |   9 +-
 .../bookkeeper/client/BookKeeperCloseTest.java     |   4 +-
 .../apache/bookkeeper/client/LedgerCloseTest.java  |   4 +-
 .../bookkeeper/client/LedgerRecoveryTest.java      |   2 +-
 .../client/ParallelLedgerRecoveryTest.java         |   6 +-
 .../replication/AuditorPeriodicCheckTest.java      |   2 +-
 .../bookkeeper/test/ConcurrentLedgerTest.java      |   2 +-
 14 files changed, 542 insertions(+), 43 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index da0525a..e091199 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -939,7 +939,7 @@ public class Bookie extends BookieCriticalThread {
             // wait until journal quits
             for (Journal journal: journals) {
 
-                journal.join();
+                journal.joinThread();
             }
             LOG.info("Journal thread(s) quit.");
         } catch (InterruptedException ie) {
@@ -1070,7 +1070,7 @@ public class Bookie extends BookieCriticalThread {
                 bb.put(masterKey);
                 bb.flip();
 
-                getJournal(ledgerId).logAddEntry(bb, new NopWriteCallback(), 
null);
+                getJournal(ledgerId).logAddEntry(bb, false /* ackBeforeSync 
*/, new NopWriteCallback(), null);
             }
         }
 
@@ -1084,7 +1084,8 @@ public class Bookie extends BookieCriticalThread {
     /**
      * Add an entry to a ledger as specified by handle.
      */
-    private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry, 
WriteCallback cb, Object ctx)
+    private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry,
+                                  boolean ackBeforeSync, WriteCallback cb, 
Object ctx)
             throws IOException, BookieException {
         long ledgerId = handle.getLedgerId();
         long entryId = handle.addEntry(entry);
@@ -1094,7 +1095,7 @@ public class Bookie extends BookieCriticalThread {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Adding {}@{}", entryId, ledgerId);
         }
-        getJournal(ledgerId).logAddEntry(entry, cb, ctx);
+        getJournal(ledgerId).logAddEntry(entry, ackBeforeSync, cb, ctx);
     }
 
     /**
@@ -1112,7 +1113,7 @@ public class Bookie extends BookieCriticalThread {
             LedgerDescriptor handle = getLedgerForEntry(entry, masterKey);
             synchronized (handle) {
                 entrySize = entry.readableBytes();
-                addEntryInternal(handle, entry, cb, ctx);
+                addEntryInternal(handle, entry, false /* ackBeforeSync */, cb, 
ctx);
             }
             success = true;
         } catch (NoWritableLedgerDirException e) {
@@ -1159,7 +1160,7 @@ public class Bookie extends BookieCriticalThread {
      * Add entry to a ledger.
      * @throws BookieException.LedgerFencedException if the ledger is fenced
      */
-    public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] 
masterKey)
+    public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback 
cb, Object ctx, byte[] masterKey)
             throws IOException, BookieException.LedgerFencedException, 
BookieException {
         long requestNanos = MathUtils.nowInNano();
         boolean success = false;
@@ -1172,7 +1173,7 @@ public class Bookie extends BookieCriticalThread {
                             
.create(BookieException.Code.LedgerFencedException);
                 }
                 entrySize = entry.readableBytes();
-                addEntryInternal(handle, entry, cb, ctx);
+                addEntryInternal(handle, entry, ackBeforeSync, cb, ctx);
             }
             success = true;
         } catch (NoWritableLedgerDirException e) {
@@ -1403,7 +1404,7 @@ public class Bookie extends BookieCriticalThread {
             buff.writeLong(1);
             buff.writeLong(i);
             cb.incCount();
-            b.addEntry(buff, cb, null, new byte[0]);
+            b.addEntry(buff, false /* ackBeforeSync */, cb, null, new byte[0]);
         }
         cb.waitZero();
         long end = MathUtils.now();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 29373d2..a2acf76 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -21,6 +21,7 @@
 
 package org.apache.bookkeeper.bookie;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 
 import io.netty.buffer.ByteBuf;
@@ -283,17 +284,18 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
         WriteCallback cb;
         Object ctx;
         long enqueueTime;
+        boolean ackBeforeSync;
 
         OpStatsLogger journalAddEntryStats;
         Counter journalCbQueueSize;
 
-        static QueueEntry create(
-                ByteBuf entry, long ledgerId, long entryId, WriteCallback cb, 
Object ctx,
-                long enqueueTime,
-                OpStatsLogger journalAddEntryStats,
+
+        static QueueEntry create(ByteBuf entry, boolean ackBeforeSync, long 
ledgerId, long entryId,
+                WriteCallback cb, Object ctx, long enqueueTime, OpStatsLogger 
journalAddEntryStats,
                 Counter journalCbQueueSize) {
             QueueEntry qe = RECYCLER.get();
             qe.entry = entry;
+            qe.ackBeforeSync = ackBeforeSync;
             qe.cb = cb;
             qe.ctx = ctx;
             qe.ledgerId = ledgerId;
@@ -332,7 +334,11 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
         }
     }
 
-    private class ForceWriteRequest {
+    /**
+     * Token which represents the need to force a write to the Journal.
+     */
+    @VisibleForTesting
+    public class ForceWriteRequest {
         private JournalChannel logFile;
         private RecyclableArrayList<QueueEntry> forceWriteWaiters;
         private boolean shouldClose;
@@ -352,11 +358,16 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
                     this.logFile.forceWrite(false);
                     
journalSyncStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), 
TimeUnit.NANOSECONDS);
                 }
+                LOG.info("setCurLogMark at {}", lastLogMark.getCurMark());
+                LOG.info("setCurLogMark at {}, {}", this.logId, 
this.lastFlushedPosition);
                 lastLogMark.setCurLogMark(this.logId, 
this.lastFlushedPosition);
 
                 // Notify the waiters that the force write succeeded
                 for (int i = 0; i < forceWriteWaiters.size(); i++) {
-                    cbThreadPool.execute(forceWriteWaiters.get(i));
+                    QueueEntry qe = forceWriteWaiters.get(i);
+                    if (qe != null) {
+                        cbThreadPool.execute(qe);
+                    }
                     journalCbQueueSize.inc();
                 }
 
@@ -819,22 +830,28 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
         }
     }
 
-    public void logAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx) {
-        logAddEntry(Unpooled.wrappedBuffer(entry), cb, ctx);
+    public void logAddEntry(ByteBuffer entry, boolean ackBeforeSync, 
WriteCallback cb, Object ctx) {
+        logAddEntry(Unpooled.wrappedBuffer(entry), ackBeforeSync, cb, ctx);
     }
 
     /**
      * record an add entry operation in journal.
      */
-    public void logAddEntry(ByteBuf entry, WriteCallback cb, Object ctx) {
+    public void logAddEntry(ByteBuf entry, boolean ackBeforeSync, 
WriteCallback cb, Object ctx) {
         long ledgerId = entry.getLong(entry.readerIndex() + 0);
         long entryId = entry.getLong(entry.readerIndex() + 8);
         journalQueueSize.inc();
+        logAddEntry(ledgerId, entryId, entry, ackBeforeSync, cb, ctx);
+    }
 
+    @VisibleForTesting
+    void logAddEntry(long ledgerId, long entryId, ByteBuf entry,
+                     boolean ackBeforeSync, WriteCallback cb, Object ctx) {
         //Retain entry until it gets written to journal
         entry.retain();
+
         queue.add(QueueEntry.create(
-                entry, ledgerId, entryId, cb, ctx, MathUtils.nowInNano(),
+                entry, ackBeforeSync,  ledgerId, entryId, cb, ctx, 
MathUtils.nowInNano(),
                 journalAddEntryStats, journalQueueSize));
     }
 
@@ -867,6 +884,7 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
         LOG.info("Starting journal on {}", journalDirectory);
 
         RecyclableArrayList<QueueEntry> toFlush = 
entryListRecycler.newInstance();
+        int numEntriesToFlush = 0;
         ByteBuf lenBuff = Unpooled.buffer(4);
         ByteBuf paddingBuff = Unpooled.buffer(2 * 
conf.getJournalAlignmentSize());
         paddingBuff.writeZero(paddingBuff.capacity());
@@ -912,7 +930,7 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
                                 TimeUnit.NANOSECONDS);
                     }
 
-                    if (toFlush.isEmpty()) {
+                    if (numEntriesToFlush == 0) {
                         qe = queue.take();
                         dequeueStartTime = MathUtils.nowInNano();
                         
journalQueueStats.registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime),
@@ -970,6 +988,16 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
                             }
                             journalFlushWatcher.reset().start();
                             bc.flush(false);
+
+                            for (int i = 0; i < toFlush.size(); i++) {
+                                QueueEntry entry = toFlush.get(i);
+                                if (entry != null && (!syncData || 
entry.ackBeforeSync)) {
+                                    toFlush.set(i, null);
+                                    numEntriesToFlush--;
+                                    cbThreadPool.execute(entry);
+                                }
+                            }
+
                             lastFlushPosition = bc.position();
                             journalFlushStats.registerSuccessfulEvent(
                                     
journalFlushWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
@@ -977,12 +1005,14 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
                             // Trace the lifetime of entries through 
persistence
                             if (LOG.isDebugEnabled()) {
                                 for (QueueEntry e : toFlush) {
-                                    LOG.debug("Written and queuing for flush 
Ledger: {}  Entry: {}",
-                                              e.ledgerId, e.entryId);
+                                    if (e != null) {
+                                        LOG.debug("Written and queuing for 
flush Ledger: {}  Entry: {}",
+                                                  e.ledgerId, e.entryId);
+                                    }
                                 }
                             }
 
-                            
forceWriteBatchEntriesStats.registerSuccessfulValue(toFlush.size());
+                            
forceWriteBatchEntriesStats.registerSuccessfulValue(numEntriesToFlush);
                             
forceWriteBatchBytesStats.registerSuccessfulValue(batchSize);
 
                             boolean shouldRolloverJournal = (lastFlushPosition 
> maxJournalSize);
@@ -992,14 +1022,12 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
                                 
forceWriteRequests.put(createForceWriteRequest(logFile, logId, 
lastFlushPosition,
                                                                                
toFlush, shouldRolloverJournal, false));
                                 toFlush = entryListRecycler.newInstance();
+                                numEntriesToFlush = 0;
                             } else {
                                 // Data is already written on the file (though 
it might still be in the OS page-cache)
                                 lastLogMark.setCurLogMark(logId, 
lastFlushPosition);
-                                for (int i = 0; i < toFlush.size(); i++) {
-                                    cbThreadPool.execute(toFlush.get(i));
-                                }
-
                                 toFlush.clear();
+                                numEntriesToFlush = 0;
                                 if (shouldRolloverJournal) {
                                     forceWriteRequests.put(
                                             createForceWriteRequest(
@@ -1044,6 +1072,7 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
                 qe.entry.release();
 
                 toFlush.add(qe);
+                numEntriesToFlush++;
                 qe = null;
             }
             logFile.close();
@@ -1099,4 +1128,16 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
         }
         return total;
     }
+
+    //
+    /**
+     * Wait for the Journal thread to exit.
+     * This is method is needed in order to mock the journal, we can't mock 
final method of java.lang.Thread class
+     *
+     * @throws InterruptedException
+     */
+    @VisibleForTesting
+    public void joinThread() throws InterruptedException {
+        join();
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index 0649bc5..84730cb 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -122,7 +122,7 @@ public class LedgerDescriptorImpl extends LedgerDescriptor {
             result = logFenceResult = SettableFuture.create();
         }
         ByteBuf entry = createLedgerFenceEntry(ledgerId);
-        journal.logAddEntry(entry, (rc, ledgerId, entryId, addr, ctx) -> {
+        journal.logAddEntry(entry, false /* ackBeforeSync */, (rc, ledgerId, 
entryId, addr, ctx) -> {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Record fenced state for ledger {} in journal with 
rc {}",
                         ledgerId, BKException.codeLogger(rc));
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index db2cd5e..34e2c2c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -77,7 +77,7 @@ class WriteEntryProcessor extends PacketProcessorBase 
implements WriteCallback {
             if (add.isRecoveryAdd()) {
                 requestProcessor.bookie.recoveryAddEntry(addData, this, 
channel, add.getMasterKey());
             } else {
-                requestProcessor.bookie.addEntry(addData, this, channel, 
add.getMasterKey());
+                requestProcessor.bookie.addEntry(addData, false, this, 
channel, add.getMasterKey());
             }
         } catch (IOException e) {
             LOG.error("Error writing " + add, e);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index 2cb1238..f227d88 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -25,9 +25,11 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
@@ -101,6 +103,13 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
                 sendResponse(status, resp, requestProcessor.addRequestStats);
             }
         };
+        final EnumSet<WriteFlag> writeFlags;
+        if (addRequest.hasWriteFlags()) {
+            writeFlags = WriteFlag.getWriteFlags(addRequest.getWriteFlags());
+        } else {
+            writeFlags = EnumSet.noneOf(WriteFlag.class);
+        }
+        final boolean ackBeforeSync = 
writeFlags.contains(WriteFlag.DEFERRED_SYNC);
         StatusCode status = null;
         byte[] masterKey = addRequest.getMasterKey().toByteArray();
         ByteBuf entryToAdd = 
Unpooled.wrappedBuffer(addRequest.getBody().asReadOnlyByteBuffer());
@@ -108,7 +117,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
             if (addRequest.hasFlag() && 
addRequest.getFlag().equals(AddRequest.Flag.RECOVERY_ADD)) {
                 requestProcessor.bookie.recoveryAddEntry(entryToAdd, wcb, 
channel, masterKey);
             } else {
-                requestProcessor.bookie.addEntry(entryToAdd, wcb, channel, 
masterKey);
+                requestProcessor.bookie.addEntry(entryToAdd, ackBeforeSync, 
wcb, channel, masterKey);
             }
             status = StatusCode.EOK;
         } catch (IOException e) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
new file mode 100644
index 0000000..a55bef7
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
@@ -0,0 +1,317 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.File;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.Journal.ForceWriteRequest;
+import org.apache.bookkeeper.bookie.Journal.LastLogMark;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+/**
+ * Test the bookie journal.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({JournalChannel.class, Journal.class})
+@Slf4j
+public class BookieJournalForceTest {
+
+    private static final ByteBuf DATA = Unpooled.wrappedBuffer(new byte[]{});
+
+    @Rule
+    public TemporaryFolder tempDir = new TemporaryFolder();
+
+    @Test
+    public void testAckAfterSync() throws Exception {
+        File journalDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(journalDir.getPath())
+            .setZkServers(null);
+
+        JournalChannel jc = spy(new JournalChannel(journalDir, 1));
+        whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
+
+        LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
+        Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+
+        // machinery to suspend ForceWriteThread
+        CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
+        LinkedBlockingQueue<ForceWriteRequest> supportQueue =
+                
enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal);
+
+        journal.start();
+
+        LogMark lastLogMarkBeforeWrite = 
journal.getLastLogMark().markLog().getCurMark();
+        CountDownLatch latch = new CountDownLatch(1);
+        long ledgerId = 1;
+        long entryId = 0;
+        journal.logAddEntry(ledgerId, entryId, DATA, false /* ackBeforeSync 
*/, new WriteCallback() {
+            @Override
+            public void writeComplete(int rc, long ledgerId, long entryId, 
BookieSocketAddress addr, Object ctx) {
+                latch.countDown();
+            }
+        }, null);
+
+        // logAddEntry should not complete even if ForceWriteThread is 
suspended
+        // wait that an entry is written to the ForceWriteThread queue
+        while (supportQueue.isEmpty()) {
+            Thread.sleep(100);
+        }
+        assertEquals(1, latch.getCount());
+        assertEquals(1, supportQueue.size());
+
+        // in constructor of JournalChannel we are calling forceWrite(true) 
but it is not tracked by PowerMock
+        // because the 'spy' is applied only on return from the constructor
+        verify(jc, times(0)).forceWrite(true);
+
+        // let ForceWriteThread work
+        forceWriteThreadSuspendedLatch.countDown();
+
+        // callback should complete now
+        assertTrue(latch.await(20, TimeUnit.SECONDS));
+
+        verify(jc, atLeast(1)).forceWrite(false);
+
+        assertEquals(0, supportQueue.size());
+
+        // verify that log marker advanced
+        LastLogMark lastLogMarkAfterForceWrite = journal.getLastLogMark();
+        
assertTrue(lastLogMarkAfterForceWrite.getCurMark().compare(lastLogMarkBeforeWrite)
 > 0);
+
+        journal.shutdown();
+    }
+
+    @Test
+    public void testAckBeforeSync() throws Exception {
+        File journalDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(journalDir.getPath())
+            .setZkServers(null);
+
+        JournalChannel jc = spy(new JournalChannel(journalDir, 1));
+        whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
+
+        LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
+        Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+
+        // machinery to suspend ForceWriteThread
+        CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
+        enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, 
journal);
+        journal.start();
+
+        LogMark lastLogMarkBeforeWrite = 
journal.getLastLogMark().markLog().getCurMark();
+        CountDownLatch latch = new CountDownLatch(1);
+        long ledgerId = 1;
+        long entryId = 0;
+        journal.logAddEntry(ledgerId, entryId, DATA, true /* ackBeforeSync */, 
new WriteCallback() {
+            @Override
+            public void writeComplete(int rc, long ledgerId, long entryId, 
BookieSocketAddress addr, Object ctx) {
+                latch.countDown();
+            }
+        }, null);
+        // logAddEntry should complete even if ForceWriteThread is suspended
+        latch.await(20, TimeUnit.SECONDS);
+
+        // in constructor of JournalChannel we are calling forceWrite(true) 
but it is not tracked by PowerMock
+        // because the 'spy' is applied only on return from the constructor
+        verify(jc, times(0)).forceWrite(true);
+
+        // we are never calling forceWrite
+        verify(jc, times(0)).forceWrite(false);
+
+        // verify that log marker did not advance
+        LastLogMark lastLogMarkAfterForceWrite = journal.getLastLogMark();
+        assertEquals(0, 
lastLogMarkAfterForceWrite.getCurMark().compare(lastLogMarkBeforeWrite));
+
+        // let the forceWriteThread exit
+        forceWriteThreadSuspendedLatch.countDown();
+
+        journal.shutdown();
+    }
+
+    @Test
+    public void testAckBeforeSyncWithJournalBufferedEntriesThreshold() throws 
Exception {
+        File journalDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+
+        final int journalBufferedEntriesThreshold = 10;
+        // sending a burst of entries, more than 
journalBufferedEntriesThreshold
+        final int numEntries = journalBufferedEntriesThreshold + 50;
+
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(journalDir.getPath())
+            
.setJournalBufferedEntriesThreshold(journalBufferedEntriesThreshold)
+            .setZkServers(null);
+
+        JournalChannel jc = spy(new JournalChannel(journalDir, 1));
+        whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
+
+        LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
+        Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+
+        // machinery to suspend ForceWriteThread
+        CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
+        enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, 
journal);
+
+        TestStatsProvider testStatsProvider = new TestStatsProvider();
+        Counter flushMaxOutstandingBytesCounter = 
testStatsProvider.getStatsLogger("test")
+                                                        
.getCounter("flushMaxOutstandingBytesCounter");
+        Whitebox.setInternalState(journal, "flushMaxOutstandingBytesCounter", 
flushMaxOutstandingBytesCounter);
+
+        journal.start();
+
+        LogMark lastLogMarkBeforeWrite = 
journal.getLastLogMark().markLog().getCurMark();
+        CountDownLatch latch = new CountDownLatch(numEntries);
+        long ledgerId = 1;
+        for (long entryId = 0; entryId < numEntries; entryId++) {
+            journal.logAddEntry(ledgerId, entryId, DATA, true /* ackBeforeSync 
*/, new WriteCallback() {
+                @Override
+                public void writeComplete(int rc, long ledgerId, long entryId, 
BookieSocketAddress addr, Object ctx) {
+                    latch.countDown();
+                }
+            }, null);
+        }
+
+        // logAddEntry should complete even if ForceWriteThread is suspended
+        latch.await(20, TimeUnit.SECONDS);
+
+        // in constructor of JournalChannel we are calling forceWrite(true) 
but it is not tracked by PowerMock
+        // because the 'spy' is applied only on return from the constructor
+        verify(jc, times(0)).forceWrite(true);
+
+        // anyway we are never calling forceWrite
+        verify(jc, times(0)).forceWrite(false);
+
+        // verify that log marker did not advance
+        LastLogMark lastLogMarkAfterForceWrite = journal.getLastLogMark();
+        assertEquals(0, 
lastLogMarkAfterForceWrite.getCurMark().compare(lastLogMarkBeforeWrite));
+
+        // let the forceWriteThread exit
+        forceWriteThreadSuspendedLatch.countDown();
+
+        assertTrue(flushMaxOutstandingBytesCounter.get() > 1);
+        journal.shutdown();
+    }
+
+    @Test
+    public void testInterleavedRequests() throws Exception {
+        File journalDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(journalDir.getPath())
+            .setZkServers(null);
+
+        JournalChannel jc = spy(new JournalChannel(journalDir, 1));
+        whenNew(JournalChannel.class).withAnyArguments().thenReturn(jc);
+
+        LedgerDirsManager ledgerDirsManager = mock(LedgerDirsManager.class);
+        Journal journal = new Journal(journalDir, conf, ledgerDirsManager);
+        journal.start();
+
+        final int numEntries = 100;
+        CountDownLatch latchAckBeforeSynch = new CountDownLatch(numEntries);
+        CountDownLatch latchAckAfterSynch = new CountDownLatch(numEntries);
+
+        long ledgerIdAckBeforeSync = 1;
+        long ledgerIdAckAfterSync = 2;
+        for (long entryId = 0; entryId < numEntries; entryId++) {
+            journal.logAddEntry(ledgerIdAckBeforeSync, entryId, DATA, true, 
new WriteCallback() {
+                @Override
+                public void writeComplete(int rc, long ledgerId, long entryId, 
BookieSocketAddress addr, Object ctx) {
+                    latchAckBeforeSynch.countDown();
+                }
+            }, null);
+            journal.logAddEntry(ledgerIdAckAfterSync, entryId, DATA, false, 
new WriteCallback() {
+                @Override
+                public void writeComplete(int rc, long ledgerId, long entryId, 
BookieSocketAddress addr, Object ctx) {
+                    latchAckAfterSynch.countDown();
+                }
+            }, null);
+        }
+        assertTrue(latchAckBeforeSynch.await(20, TimeUnit.SECONDS));
+        assertTrue(latchAckAfterSynch.await(20, TimeUnit.SECONDS));
+
+        // in constructor of JournalChannel we are calling forceWrite(true) 
but it is not tracked by PowerMock
+        // because the 'spy' is applied only on return from the constructor
+        verify(jc, times(0)).forceWrite(true);
+
+        verify(jc, atLeast(1)).forceWrite(false);
+
+        journal.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    private LinkedBlockingQueue<ForceWriteRequest> 
enableForceWriteThreadSuspension(
+        CountDownLatch forceWriteThreadSuspendedLatch,
+        Journal journal) throws InterruptedException {
+        LinkedBlockingQueue<ForceWriteRequest> supportQueue = new 
LinkedBlockingQueue<>();
+        BlockingQueue<ForceWriteRequest> forceWriteRequests = 
mock(BlockingQueue.class);
+        doAnswer((Answer) (InvocationOnMock iom) -> {
+            supportQueue.put(iom.getArgument(0));
+            return null;
+        }).when(forceWriteRequests).put(any(ForceWriteRequest.class));
+        when(forceWriteRequests.take()).thenAnswer(i -> {
+            // suspend the force write thread
+            forceWriteThreadSuspendedLatch.await();
+            return supportQueue.take();
+        });
+        Whitebox.setInternalState(journal, "forceWriteRequests", 
forceWriteRequests);
+        return supportQueue;
+    }
+
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
new file mode 100644
index 0000000..74e8447
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.BKException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test the bookie journal.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Bookie.class})
+@Slf4j
+public class BookieWriteToJournalTest {
+
+    @Rule
+    public TemporaryFolder tempDir = new TemporaryFolder();
+
+    /**
+     * test that Bookie calls correctly Journal.logAddEntry about 
"ackBeforeSync" parameter.
+     */
+    @Test
+    public void testJournalLogAddEntryCalledCorrectly() throws Exception {
+
+        File journalDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+        File ledgerDir = tempDir.newFolder();
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(ledgerDir));
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(journalDir.getPath())
+            .setLedgerDirNames(new String[]{ledgerDir.getPath()})
+            .setZkServers(null);
+        BookieSocketAddress bookieAddress = Bookie.getBookieAddress(conf);
+        CountDownLatch journalJoinLatch = new CountDownLatch(1);
+        Journal journal = mock(Journal.class);
+        MutableBoolean effectiveAckBeforeSync = new MutableBoolean(false);
+        doAnswer((Answer) (InvocationOnMock iom) -> {
+            ByteBuf entry = iom.getArgument(0);
+            long ledgerId = entry.getLong(entry.readerIndex() + 0);
+            long entryId = entry.getLong(entry.readerIndex() + 8);
+            boolean ackBeforeSync = iom.getArgument(1);
+            WriteCallback callback = iom.getArgument(2);
+            Object ctx = iom.getArgument(3);
+
+            effectiveAckBeforeSync.setValue(ackBeforeSync);
+            callback.writeComplete(BKException.Code.OK, ledgerId, entryId, 
bookieAddress, ctx);
+            return null;
+        }).when(journal).logAddEntry(any(ByteBuf.class), anyBoolean(), 
any(WriteCallback.class), any());
+
+        // bookie will continue to work as soon as the journal thread is alive
+        doAnswer((Answer) (InvocationOnMock iom) -> {
+            journalJoinLatch.await();
+            return null;
+        }).when(journal).joinThread();
+
+        whenNew(Journal.class).withAnyArguments().thenReturn(journal);
+
+        Bookie b = new Bookie(conf);
+        b.start();
+
+        long ledgerId = 1;
+        long entryId = 0;
+        Object expectedCtx = "foo";
+        byte[] masterKey = new byte[64];
+        for (boolean ackBeforeSync : new boolean[]{true, false}) {
+            CountDownLatch latch = new CountDownLatch(1);
+            final ByteBuf data = Unpooled.buffer();
+            data.writeLong(ledgerId);
+            data.writeLong(entryId);
+            final long expectedEntryId = entryId;
+            b.addEntry(data, ackBeforeSync, (int rc, long ledgerId1, long 
entryId1,
+                                             BookieSocketAddress addr, Object 
ctx) -> {
+                assertSame(expectedCtx, ctx);
+                assertEquals(ledgerId, ledgerId1);
+                assertEquals(expectedEntryId, entryId1);
+                latch.countDown();
+            }, expectedCtx, masterKey);
+            latch.await(30, TimeUnit.SECONDS);
+            assertEquals(ackBeforeSync, effectiveAckBeforeSync.booleanValue());
+            entryId++;
+        }
+        // let bookie exit main thread
+        journalJoinLatch.countDown();
+        b.shutdown();
+    }
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 6ae9410..b628244 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -332,7 +332,7 @@ public class LedgerCacheTest {
         b.start();
         for (int i = 1; i <= numLedgers; i++) {
             ByteBuf packet = generateEntry(i, 1);
-            b.addEntry(packet, new Bookie.NopWriteCallback(), null, 
"passwd".getBytes());
+            b.addEntry(packet, false, new Bookie.NopWriteCallback(), null, 
"passwd".getBytes());
         }
 
         conf = TestBKConfiguration.newServerConfiguration();
@@ -539,7 +539,8 @@ public class LedgerCacheTest {
         // this bookie.addEntry call is required. FileInfo for Ledger 1 would 
be created with this call.
         // without the fileinfo, 'flushTestSortedLedgerStorage.addEntry' calls 
will fail
         // because of BOOKKEEPER-965 change.
-        bookie.addEntry(generateEntry(1, 1), new Bookie.NopWriteCallback(), 
null, "passwd".getBytes());
+        bookie.addEntry(generateEntry(1, 1), false, new 
Bookie.NopWriteCallback(), null, "passwd".getBytes());
+
         flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
         assertFalse("Bookie is expected to be in ReadWrite mode", 
bookie.isReadOnly());
         assertTrue("EntryMemTable SnapShot is expected to be empty", 
memTable.snapshot.isEmpty());
@@ -583,7 +584,7 @@ public class LedgerCacheTest {
         FlushTestSortedLedgerStorage flushTestSortedLedgerStorage = 
(FlushTestSortedLedgerStorage) bookie.ledgerStorage;
         EntryMemTable memTable = flushTestSortedLedgerStorage.memTable;
 
-        bookie.addEntry(generateEntry(1, 1), new Bookie.NopWriteCallback(), 
null, "passwd".getBytes());
+        bookie.addEntry(generateEntry(1, 1), false, new 
Bookie.NopWriteCallback(), null, "passwd".getBytes());
         flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
         assertFalse("Bookie is expected to be in ReadWrite mode", 
bookie.isReadOnly());
         assertTrue("EntryMemTable SnapShot is expected to be empty", 
memTable.snapshot.isEmpty());
@@ -598,7 +599,7 @@ public class LedgerCacheTest {
         // after flush failure, the bookie is set to readOnly
         assertTrue("Bookie is expected to be in Read mode", 
bookie.isReadOnly());
         // write fail
-        bookie.addEntry(generateEntry(1, 3), new 
BookkeeperInternalCallbacks.WriteCallback(){
+        bookie.addEntry(generateEntry(1, 3), false, new 
BookkeeperInternalCallbacks.WriteCallback(){
             public void writeComplete(int rc, long ledgerId, long entryId, 
BookieSocketAddress addr, Object ctx){
                 LOG.info("fail write to bk");
                 assertTrue(rc != OK);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
index 69f8f6c..850fe5d 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
@@ -89,7 +89,7 @@ public class BookKeeperCloseTest extends 
BookKeeperClusterTestCase {
                 }
 
                 @Override
-                public void addEntry(ByteBuf entry, WriteCallback cb,
+                public void addEntry(ByteBuf entry, boolean ackBeforeSync, 
WriteCallback cb,
                                      Object ctx, byte[] masterKey)
                         throws IOException, BookieException {
                     try {
@@ -99,7 +99,7 @@ public class BookKeeperCloseTest extends 
BookKeeperClusterTestCase {
                         // and an exception would spam the logs
                         Thread.currentThread().interrupt();
                     }
-                    super.addEntry(entry, cb, ctx, masterKey);
+                    super.addEntry(entry, ackBeforeSync, cb, ctx, masterKey);
                 }
 
                 @Override
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
index ad9c352..900a7c2 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
@@ -197,7 +197,7 @@ public class LedgerCloseTest extends 
BookKeeperClusterTestCase {
             throws Exception {
         Bookie sBookie = new Bookie(conf) {
             @Override
-            public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, 
byte[] masterKey)
+            public void addEntry(ByteBuf entry, boolean ackBeforeSync, 
WriteCallback cb, Object ctx, byte[] masterKey)
                     throws IOException, BookieException {
                 try {
                     latch.await();
@@ -221,7 +221,7 @@ public class LedgerCloseTest extends 
BookKeeperClusterTestCase {
     private void startDeadBookie(ServerConfiguration conf, final 
CountDownLatch latch) throws Exception {
         Bookie dBookie = new Bookie(conf) {
             @Override
-            public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, 
byte[] masterKey)
+            public void addEntry(ByteBuf entry, boolean ackBeforeSync, 
WriteCallback cb, Object ctx, byte[] masterKey)
                     throws IOException, BookieException {
                 try {
                     latch.await();
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
index cf205c1..1091475 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
@@ -193,7 +193,7 @@ public class LedgerRecoveryTest extends 
BookKeeperClusterTestCase {
 
         Bookie fakeBookie = new Bookie(conf) {
             @Override
-            public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, 
byte[] masterKey)
+            public void addEntry(ByteBuf entry, boolean ackBeforeSync, 
WriteCallback cb, Object ctx, byte[] masterKey)
                     throws IOException, BookieException {
                 // drop request to simulate a slow and failed bookie
             }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index 085a248..8e87921 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -455,9 +455,9 @@ public class ParallelLedgerRecoveryTest extends 
BookKeeperClusterTestCase {
         }
 
         @Override
-        public void addEntry(ByteBuf entry, final WriteCallback cb, Object 
ctx, byte[] masterKey)
-                throws IOException, BookieException {
-            super.addEntry(entry, new WriteCallback() {
+        public void addEntry(ByteBuf entry, boolean ackBeforeSync, final 
WriteCallback cb,
+                             Object ctx, byte[] masterKey) throws IOException, 
BookieException {
+            super.addEntry(entry, ackBeforeSync, new WriteCallback() {
                 @Override
                 public void writeComplete(int rc, long ledgerId, long entryId,
                                           BookieSocketAddress addr, Object 
ctx) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index a3b0b0d..35eb958 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -376,7 +376,7 @@ public class AuditorPeriodicCheckTest extends 
BookKeeperClusterTestCase {
         ServerConfiguration conf = killBookie(bookieIdx);
         Bookie writeFailingBookie = new Bookie(conf) {
             @Override
-            public void addEntry(ByteBuf entry, WriteCallback cb,
+            public void addEntry(ByteBuf entry, boolean ackBeforeSync, 
WriteCallback cb,
                              Object ctx, byte[] masterKey)
                              throws IOException, BookieException {
                 try {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
index 6e55139..b11d336 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
@@ -187,7 +187,7 @@ public class ConcurrentLedgerTest {
                 bytes.position(0);
                 bytes.limit(bytes.capacity());
                 throttle.acquire();
-                bookie.addEntry(Unpooled.wrappedBuffer(bytes), cb, counter, 
zeros);
+                bookie.addEntry(Unpooled.wrappedBuffer(bytes), false, cb, 
counter, zeros);
             }
         }
         long finish = System.currentTimeMillis();

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to